From 0e47a06d55268a7eaf4f4682ed93a7ed065c4cb6 Mon Sep 17 00:00:00 2001 From: anilb Date: Tue, 9 Jun 2026 18:25:51 +0200 Subject: [PATCH 1/8] fix: order npm maintainer upserts to avoid deadlock Signed-off-by: anilb --- .../libs/data-access-layer/src/packages/maintainers.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/services/libs/data-access-layer/src/packages/maintainers.ts b/services/libs/data-access-layer/src/packages/maintainers.ts index 6cd5680e53..2230245b5a 100644 --- a/services/libs/data-access-layer/src/packages/maintainers.ts +++ b/services/libs/data-access-layer/src/packages/maintainers.ts @@ -14,7 +14,11 @@ export async function upsertNpmMaintainers( ): Promise { const changed = new Set() - for (const m of maintainers) { + // Lock maintainers rows in a deterministic (username) order so concurrent ingest lanes + // enriching different packages that share maintainers can't acquire the row locks in cyclic order + const ordered = [...maintainers].sort((a, b) => a.username.localeCompare(b.username)) + + for (const m of ordered) { const row: { changed_fields: string[] } = await qx.selectOne( `WITH old AS ( SELECT display_name, email FROM maintainers WHERE ecosystem = 'npm' AND username = $(username) @@ -50,7 +54,7 @@ export async function upsertNpmMaintainers( }) const afterMap = new Map() - for (const m of maintainers) { + for (const m of ordered) { const row: { maintainer_id: string } | null = await qx.selectOneOrNone( `INSERT INTO package_maintainers (package_id, maintainer_id, role, created_at, updated_at) SELECT $(packageId)::bigint, id, $(role), NOW(), NOW() FROM maintainers WHERE ecosystem = 'npm' AND username = $(username) From b13542db0b074dcd2fa99734c2769e2017b35a53 Mon Sep 17 00:00:00 2001 From: anilb Date: Tue, 9 Jun 2026 18:42:13 +0200 Subject: [PATCH 2/8] fix: avoid empty result race in repo upsert Signed-off-by: anilb --- .../data-access-layer/src/packages/repos.ts | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/services/libs/data-access-layer/src/packages/repos.ts b/services/libs/data-access-layer/src/packages/repos.ts index 678bcc9532..469ce883a8 100644 --- a/services/libs/data-access-layer/src/packages/repos.ts +++ b/services/libs/data-access-layer/src/packages/repos.ts @@ -5,22 +5,31 @@ export async function getOrCreateRepoByUrl( url: string, host: string, ): Promise<{ id: string; changedFields: string[] }> { - const row: { id: string; created: boolean } = await qx.selectOne( - ` - WITH ins AS ( - INSERT INTO repos (url, host) VALUES ($(url), $(host)) - ON CONFLICT (url) DO NOTHING - RETURNING id - ) - SELECT id::text AS id, true AS created FROM ins - UNION ALL - SELECT id::text AS id, false AS created - FROM repos - WHERE url = $(url) AND NOT EXISTS (SELECT 1 FROM ins) - `, + // Repos are shared across packages (every package in a monorepo points at one repo) + // so this is by far the common case + const existing: { id: string } | null = await qx.selectOneOrNone( + `SELECT id::text AS id FROM repos WHERE url = $(url)`, + { url }, + ) + if (existing) return { id: existing.id, changedFields: [] } + + // Not seen yet — try to create it. ON CONFLICT DO NOTHING so a concurrent ingest lane creating + // the same shared repo doesn't raise a unique violation. + const inserted: { id: string } | null = await qx.selectOneOrNone( + `INSERT INTO repos (url, host) VALUES ($(url), $(host)) + ON CONFLICT (url) DO NOTHING + RETURNING id::text AS id`, { url, host }, ) - return { id: row.id, changedFields: row.created ? ['repos.url', 'repos.host'] : [] } + if (inserted) return { id: inserted.id, changedFields: ['repos.url', 'repos.host'] } + + // Lost the race: another lane committed the same url between our SELECT and INSERT, so + // ON CONFLICT DO NOTHING returned no row. Re-read in a fresh statement — under READ COMMITTED + const row: { id: string } = await qx.selectOne( + `SELECT id::text AS id FROM repos WHERE url = $(url)`, + { url }, + ) + return { id: row.id, changedFields: [] } } export async function upsertPackageRepo( From 05d1faf596b500da6ef3e9c25c38aa8e97c6e355 Mon Sep 17 00:00:00 2001 From: anilb Date: Tue, 9 Jun 2026 19:36:03 +0200 Subject: [PATCH 3/8] chore: lower npm ingest rounds per run Signed-off-by: anilb --- services/apps/packages_worker/src/npm/workflows.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/packages_worker/src/npm/workflows.ts b/services/apps/packages_worker/src/npm/workflows.ts index e638aff394..4f6512d408 100644 --- a/services/apps/packages_worker/src/npm/workflows.ts +++ b/services/apps/packages_worker/src/npm/workflows.ts @@ -27,7 +27,7 @@ const heartbeatingActs = proxyActivities({ // Per lane, per round. Total purls fetched per round = lanes × INGEST_PER_LANE. const INGEST_PER_LANE = 50 -const INGEST_ROUNDS_PER_RUN = 25 +const INGEST_ROUNDS_PER_RUN = 5 interface IngestState { unscannedCursor: string From fec45d4aa8d6ab711266c5e74b0255d70ec7d5cf Mon Sep 17 00:00:00 2001 From: anilb Date: Tue, 9 Jun 2026 21:45:06 +0200 Subject: [PATCH 4/8] fix: coerce npm version license to string Signed-off-by: anilb --- .../apps/packages_worker/src/npm/normalize.ts | 19 +++++++++++++++++++ .../packages_worker/src/npm/upsertPackage.ts | 3 ++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/services/apps/packages_worker/src/npm/normalize.ts b/services/apps/packages_worker/src/npm/normalize.ts index f82ad38e56..c797740ec4 100644 --- a/services/apps/packages_worker/src/npm/normalize.ts +++ b/services/apps/packages_worker/src/npm/normalize.ts @@ -56,6 +56,25 @@ export function normalizeLicenses(packument: Packument): string[] { ) } +// A version's license is sometimes an object ({ type, url }) +// or the legacy array form ([{ type, file }, ...]). Passing those raw +export function versionLicense(raw: unknown): string | null { + if (raw == null) return null + if (typeof raw === 'string') return raw || null + if (Array.isArray(raw)) { + const types = raw + .map((l) => (typeof l === 'string' ? l : isLicenseObject(l) ? l.type : null)) + .filter((t): t is string => Boolean(t)) + return types.length ? types.join(' OR ') : null + } + if (isLicenseObject(raw)) return raw.type ?? null + return null +} + +function isLicenseObject(v: unknown): v is { type?: string } { + return typeof v === 'object' && v !== null +} + function clean(s: string): string { return s.replace(/[()]/g, '').trim() } diff --git a/services/apps/packages_worker/src/npm/upsertPackage.ts b/services/apps/packages_worker/src/npm/upsertPackage.ts index a3d5f5027c..13cfa7a851 100644 --- a/services/apps/packages_worker/src/npm/upsertPackage.ts +++ b/services/apps/packages_worker/src/npm/upsertPackage.ts @@ -15,6 +15,7 @@ import { normalizeLicenses, parseNpmName, stripNullBytesDeep, + versionLicense, } from './normalize' import type { FundingEntry, Packument } from './types' @@ -98,7 +99,7 @@ export async function upsertPackage( publishedAt: time[number] ?? null, isLatest: number === latestVersion, isPrerelease: isPrerelease(number), - license: v.license ?? licenses[0] ?? null, + license: versionLicense(v.license) ?? licenses[0] ?? null, })), ) verChanged.forEach((f) => changed.add(f)) From c2e8d6dd4aa73a2738b24345387093035dbf5847 Mon Sep 17 00:00:00 2001 From: anilb Date: Tue, 9 Jun 2026 22:17:48 +0200 Subject: [PATCH 5/8] fix: handle unpublished npm packument stubs Signed-off-by: anilb --- .../packages_worker/src/npm/activities.ts | 11 ++++++--- .../packages_worker/src/npm/fetchPackument.ts | 23 ++++++++++++++++++- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/services/apps/packages_worker/src/npm/activities.ts b/services/apps/packages_worker/src/npm/activities.ts index 0149bd14cc..1fa7c52645 100644 --- a/services/apps/packages_worker/src/npm/activities.ts +++ b/services/apps/packages_worker/src/npm/activities.ts @@ -124,18 +124,23 @@ async function ingestOne(qx: QueryExecutor, purl: string, dispatcher?: Dispatche } // 429 / 5xx / network → bubble up so Temporal retries the activity with exponential backoff. - if (!isClientError(packumentResult.statusCode, packumentResult.kind)) { + // MALFORMED is permanent (a 200 body that isn't a packument — retrying won't change it), + // so it takes the quick-retry-then-skip path below instead of poisoning the lane forever. + if ( + !isClientError(packumentResult.statusCode, packumentResult.kind) && + packumentResult.kind !== 'MALFORMED' + ) { throw new Error(`Failed to fetch packument for ${name}: ${packumentResult.message}`) } - // 4xx → quick retry a few times (1s, 2s); give up and skip after the last attempt. + // 4xx/MALFORMED → quick retry a few times (1s, 2s); give up and skip after the last attempt. if (attempt < INGEST_4XX_ATTEMPTS) { await sleep(attempt * INGEST_4XX_BACKOFF_MS) continue } log.warn( { purl, statusCode: packumentResult.statusCode, kind: packumentResult.kind }, - 'npm packument 4xx after fast retries — marking scanned and skipping', + 'npm packument 4xx/malformed after fast retries — marking scanned and skipping', ) await markNpmPackageScanned(qx, purl, { status: 'error', diff --git a/services/apps/packages_worker/src/npm/fetchPackument.ts b/services/apps/packages_worker/src/npm/fetchPackument.ts index f5ff0bfdbd..1fa4b7f465 100644 --- a/services/apps/packages_worker/src/npm/fetchPackument.ts +++ b/services/apps/packages_worker/src/npm/fetchPackument.ts @@ -48,7 +48,11 @@ export async function fetchPackument( return { kind: 'MALFORMED', message: 'invalid JSON' } } - if (!isPackument(json)) return { kind: 'MALFORMED', message: 'unexpected shape' } + if (!isPackument(json)) { + const stub = asUnpublishedStub(json) + if (stub) return stub + return { kind: 'MALFORMED', message: 'unexpected shape' } + } delete (json as unknown as Record).readme return json } @@ -56,3 +60,20 @@ export async function fetchPackument( function isPackument(v: unknown): v is Packument { return typeof v === 'object' && v !== null && 'name' in v && 'versions' in v && 'dist-tags' in v } + +// A fully unpublished package returns HTTP 200 with a stub document — just name + time, +// where time.unpublished records the unpublish event; there are no versions/dist-tags keys, +// so isPackument rejects it. Normalize the stub into an empty packument with `unpublished` +// set, so ingest stores status='unpublished' instead of erroring on shape. +function asUnpublishedStub(v: unknown): Packument | null { + if (typeof v !== 'object' || v === null) return null + const o = v as Record + if (typeof o.name !== 'string' || typeof o.time !== 'object' || o.time === null) return null + const t = o.time as Record + if (!('unpublished' in t)) return null + const time: Record = {} + for (const [key, value] of Object.entries(t)) { + if (typeof value === 'string') time[key] = value + } + return { name: o.name, 'dist-tags': {}, versions: {}, time, unpublished: t.unpublished } +} From ecbc91c2120ed396281a802896d6a0c2c45b45db Mon Sep 17 00:00:00 2001 From: anilb Date: Thu, 18 Jun 2026 17:35:25 +0200 Subject: [PATCH 6/8] fix: address npm ingest PR review Signed-off-by: anilb --- .../apps/packages_worker/src/npm/normalize.ts | 18 +++++++++++------- .../src/packages/maintainers.ts | 6 +++--- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/services/apps/packages_worker/src/npm/normalize.ts b/services/apps/packages_worker/src/npm/normalize.ts index c797740ec4..0658888a51 100644 --- a/services/apps/packages_worker/src/npm/normalize.ts +++ b/services/apps/packages_worker/src/npm/normalize.ts @@ -56,23 +56,27 @@ export function normalizeLicenses(packument: Packument): string[] { ) } -// A version's license is sometimes an object ({ type, url }) -// or the legacy array form ([{ type, file }, ...]). Passing those raw +// A version's `license` field comes in several shapes: a plain SPDX string ("MIT"), +// an object ({ type, url }), or the legacy array form ([{ type, file }, ...]). Passing those +// raw into a text column would persist objects/arrays, so collapse every shape to a single +// string (OR-joined for the array form) or null. Non-string `type` values are dropped. export function versionLicense(raw: unknown): string | null { if (raw == null) return null if (typeof raw === 'string') return raw || null if (Array.isArray(raw)) { const types = raw - .map((l) => (typeof l === 'string' ? l : isLicenseObject(l) ? l.type : null)) + .map((l) => (typeof l === 'string' ? l : licenseType(l))) .filter((t): t is string => Boolean(t)) return types.length ? types.join(' OR ') : null } - if (isLicenseObject(raw)) return raw.type ?? null - return null + return licenseType(raw) } -function isLicenseObject(v: unknown): v is { type?: string } { - return typeof v === 'object' && v !== null +// Extract a string `type` from a license object, or null if absent/non-string. +function licenseType(v: unknown): string | null { + if (typeof v !== 'object' || v === null) return null + const type = (v as { type?: unknown }).type + return typeof type === 'string' ? type : null } function clean(s: string): string { diff --git a/services/libs/data-access-layer/src/packages/maintainers.ts b/services/libs/data-access-layer/src/packages/maintainers.ts index 2230245b5a..43a828c08a 100644 --- a/services/libs/data-access-layer/src/packages/maintainers.ts +++ b/services/libs/data-access-layer/src/packages/maintainers.ts @@ -14,9 +14,9 @@ export async function upsertNpmMaintainers( ): Promise { const changed = new Set() - // Lock maintainers rows in a deterministic (username) order so concurrent ingest lanes - // enriching different packages that share maintainers can't acquire the row locks in cyclic order - const ordered = [...maintainers].sort((a, b) => a.username.localeCompare(b.username)) + const ordered = [...maintainers].sort((a, b) => + a.username < b.username ? -1 : a.username > b.username ? 1 : 0, + ) for (const m of ordered) { const row: { changed_fields: string[] } = await qx.selectOne( From 6b7bbe7f755a08113578075c7af6c247cc16ab41 Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 19 Jun 2026 10:47:06 +0200 Subject: [PATCH 7/8] fix: harden npm unpublished stub ingest Signed-off-by: anilb --- services/apps/packages_worker/src/npm/fetchPackument.ts | 6 ++++-- services/libs/data-access-layer/src/packages/packages.ts | 6 +++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/services/apps/packages_worker/src/npm/fetchPackument.ts b/services/apps/packages_worker/src/npm/fetchPackument.ts index 1fa4b7f465..d79fb0d6d0 100644 --- a/services/apps/packages_worker/src/npm/fetchPackument.ts +++ b/services/apps/packages_worker/src/npm/fetchPackument.ts @@ -70,10 +70,12 @@ function asUnpublishedStub(v: unknown): Packument | null { const o = v as Record if (typeof o.name !== 'string' || typeof o.time !== 'object' || o.time === null) return null const t = o.time as Record - if (!('unpublished' in t)) return null + const unpublished = t.unpublished + if (typeof unpublished !== 'object' || unpublished === null) return null + if (typeof (unpublished as Record).time !== 'string') return null const time: Record = {} for (const [key, value] of Object.entries(t)) { if (typeof value === 'string') time[key] = value } - return { name: o.name, 'dist-tags': {}, versions: {}, time, unpublished: t.unpublished } + return { name: o.name, 'dist-tags': {}, versions: {}, time, unpublished } } diff --git a/services/libs/data-access-layer/src/packages/packages.ts b/services/libs/data-access-layer/src/packages/packages.ts index 4e2d5585ea..fc8f258d76 100644 --- a/services/libs/data-access-layer/src/packages/packages.ts +++ b/services/libs/data-access-layer/src/packages/packages.ts @@ -66,7 +66,11 @@ export async function upsertNpmPackage( dist_tags_latest = EXCLUDED.dist_tags_latest, dist_tags_next = EXCLUDED.dist_tags_next, dist_tags_beta = EXCLUDED.dist_tags_beta, - versions_count = EXCLUDED.versions_count, + -- An unpublished stub reports 0 versions; don't clobber a previously-known count + -- (keep it consistent with the version rows that are retained on unpublish). + versions_count = CASE WHEN EXCLUDED.versions_count = 0 + THEN packages.versions_count + ELSE EXCLUDED.versions_count END, latest_version = EXCLUDED.latest_version, first_release_at = EXCLUDED.first_release_at, latest_release_at = EXCLUDED.latest_release_at, From ffcd8fb270cd7ef70330638d49ced924014dc61f Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 19 Jun 2026 16:32:28 +0200 Subject: [PATCH 8/8] fix: drop blank npm version licenses Signed-off-by: anilb --- .../apps/packages_worker/src/npm/normalize.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/services/apps/packages_worker/src/npm/normalize.ts b/services/apps/packages_worker/src/npm/normalize.ts index 0658888a51..8b2f2eec96 100644 --- a/services/apps/packages_worker/src/npm/normalize.ts +++ b/services/apps/packages_worker/src/npm/normalize.ts @@ -59,24 +59,28 @@ export function normalizeLicenses(packument: Packument): string[] { // A version's `license` field comes in several shapes: a plain SPDX string ("MIT"), // an object ({ type, url }), or the legacy array form ([{ type, file }, ...]). Passing those // raw into a text column would persist objects/arrays, so collapse every shape to a single -// string (OR-joined for the array form) or null. Non-string `type` values are dropped. +// string (OR-joined for the array form) or null. Non-string or blank `type` values are dropped. export function versionLicense(raw: unknown): string | null { if (raw == null) return null - if (typeof raw === 'string') return raw || null + if (typeof raw === 'string') return blankToNull(raw) if (Array.isArray(raw)) { const types = raw - .map((l) => (typeof l === 'string' ? l : licenseType(l))) - .filter((t): t is string => Boolean(t)) + .map((l) => (typeof l === 'string' ? blankToNull(l) : licenseType(l))) + .filter((t): t is string => t !== null) return types.length ? types.join(' OR ') : null } return licenseType(raw) } -// Extract a string `type` from a license object, or null if absent/non-string. function licenseType(v: unknown): string | null { if (typeof v !== 'object' || v === null) return null const type = (v as { type?: unknown }).type - return typeof type === 'string' ? type : null + return typeof type === 'string' ? blankToNull(type) : null +} + +function blankToNull(s: string): string | null { + const trimmed = s.trim() + return trimmed || null } function clean(s: string): string {