diff --git a/services/apps/packages_worker/src/npm/activities.ts b/services/apps/packages_worker/src/npm/activities.ts index 0149bd14cc..1fa7c52645 100644 --- a/services/apps/packages_worker/src/npm/activities.ts +++ b/services/apps/packages_worker/src/npm/activities.ts @@ -124,18 +124,23 @@ async function ingestOne(qx: QueryExecutor, purl: string, dispatcher?: Dispatche } // 429 / 5xx / network → bubble up so Temporal retries the activity with exponential backoff. - if (!isClientError(packumentResult.statusCode, packumentResult.kind)) { + // MALFORMED is permanent (a 200 body that isn't a packument — retrying won't change it), + // so it takes the quick-retry-then-skip path below instead of poisoning the lane forever. + if ( + !isClientError(packumentResult.statusCode, packumentResult.kind) && + packumentResult.kind !== 'MALFORMED' + ) { throw new Error(`Failed to fetch packument for ${name}: ${packumentResult.message}`) } - // 4xx → quick retry a few times (1s, 2s); give up and skip after the last attempt. + // 4xx/MALFORMED → quick retry a few times (1s, 2s); give up and skip after the last attempt. 
if (attempt < INGEST_4XX_ATTEMPTS) { await sleep(attempt * INGEST_4XX_BACKOFF_MS) continue } log.warn( { purl, statusCode: packumentResult.statusCode, kind: packumentResult.kind }, - 'npm packument 4xx after fast retries — marking scanned and skipping', + 'npm packument 4xx/malformed after fast retries — marking scanned and skipping', ) await markNpmPackageScanned(qx, purl, { status: 'error', diff --git a/services/apps/packages_worker/src/npm/fetchPackument.ts b/services/apps/packages_worker/src/npm/fetchPackument.ts index f5ff0bfdbd..d79fb0d6d0 100644 --- a/services/apps/packages_worker/src/npm/fetchPackument.ts +++ b/services/apps/packages_worker/src/npm/fetchPackument.ts @@ -48,7 +48,11 @@ export async function fetchPackument( return { kind: 'MALFORMED', message: 'invalid JSON' } } - if (!isPackument(json)) return { kind: 'MALFORMED', message: 'unexpected shape' } + if (!isPackument(json)) { + const stub = asUnpublishedStub(json) + if (stub) return stub + return { kind: 'MALFORMED', message: 'unexpected shape' } + } delete (json as unknown as Record).readme return json } @@ -56,3 +60,22 @@ export async function fetchPackument( function isPackument(v: unknown): v is Packument { return typeof v === 'object' && v !== null && 'name' in v && 'versions' in v && 'dist-tags' in v } + +// A fully unpublished package returns HTTP 200 with a stub document — just name + time, +// where time.unpublished records the unpublish event; there are no versions/dist-tags keys, +// so isPackument rejects it. Normalize the stub into an empty packument with `unpublished` +// set, so ingest stores status='unpublished' instead of erroring on shape. 
// Recognizes the stub the registry serves for a fully unpublished package (only `name` and
// `time`, with `time.unpublished` describing the unpublish event) and converts it into an
// empty packument that carries `unpublished`.
//
// v: parsed JSON of unknown shape.
// Returns the normalized packument, or null when `v` is not such a stub: not an object, `name`
// is not a string, `time` is not an object, or `time.unpublished` is not an object whose own
// `time` is a string.
function asUnpublishedStub(v: unknown): Packument | null {
  if (typeof v !== 'object' || v === null) return null
  const o = v as Record<string, unknown>
  if (typeof o.name !== 'string' || typeof o.time !== 'object' || o.time === null) return null
  const t = o.time as Record<string, unknown>
  const unpublished = t.unpublished
  if (typeof unpublished !== 'object' || unpublished === null) return null
  if (typeof (unpublished as Record<string, unknown>).time !== 'string') return null
  // `time` also contains the `unpublished` object itself; only string-valued entries are
  // copied, so that object (and any other non-string value) is left out of the result's `time`.
  const time: Record<string, string> = {}
  for (const [key, value] of Object.entries(t)) {
    if (typeof value === 'string') time[key] = value
  }
  return { name: o.name, 'dist-tags': {}, versions: {}, time, unpublished }
}

// A version's `license` field comes in several shapes: a plain string ("MIT"), an object
// ({ type, url }), or the legacy array form ([{ type, file }, ...]). Collapse every shape to a
// single string or null so objects/arrays are never handed on as the license value.
//
// raw: the version's `license` value, of unknown shape.
// Returns the trimmed license string; for the array form, the distinct entries joined with
// ' OR ' in first-seen order; null when nothing usable remains. Non-string or blank `type`
// values are dropped.
export function versionLicense(raw: unknown): string | null {
  if (raw == null) return null
  if (typeof raw === 'string') return blankToNull(raw)
  if (Array.isArray(raw)) {
    const types = raw
      .map((l: unknown) => (typeof l === 'string' ? blankToNull(l) : licenseType(l)))
      .filter((t): t is string => t !== null)
    // A Set iterates in insertion order, so repeats collapse ("MIT OR MIT" -> "MIT") without
    // reordering the remaining entries.
    const unique = [...new Set(types)]
    return unique.length ? unique.join(' OR ') : null
  }
  return licenseType(raw)
}

// Reads the `type` property of an object-shaped license entry.
// Returns the trimmed `type` when it is a non-blank string; null for non-objects, a missing or
// non-string `type`, or a blank one.
function licenseType(v: unknown): string | null {
  if (typeof v !== 'object' || v === null) return null
  const type = (v as { type?: unknown }).type
  return typeof type === 'string' ? blankToNull(type) : null
}

// Trims `s`; returns null when nothing is left after trimming.
function blankToNull(s: string): string | null {
  const trimmed = s.trim()
  return trimmed || null
}

// Removes every '(' and ')' from `s`, then trims surrounding whitespace.
function clean(s: string): string {
  return s.replace(/[()]/g, '').trim()
}
const INGEST_PER_LANE = 50 -const INGEST_ROUNDS_PER_RUN = 25 +const INGEST_ROUNDS_PER_RUN = 5 interface IngestState { unscannedCursor: string diff --git a/services/libs/data-access-layer/src/packages/maintainers.ts b/services/libs/data-access-layer/src/packages/maintainers.ts index 6cd5680e53..43a828c08a 100644 --- a/services/libs/data-access-layer/src/packages/maintainers.ts +++ b/services/libs/data-access-layer/src/packages/maintainers.ts @@ -14,7 +14,11 @@ export async function upsertNpmMaintainers( ): Promise { const changed = new Set() - for (const m of maintainers) { + const ordered = [...maintainers].sort((a, b) => + a.username < b.username ? -1 : a.username > b.username ? 1 : 0, + ) + + for (const m of ordered) { const row: { changed_fields: string[] } = await qx.selectOne( `WITH old AS ( SELECT display_name, email FROM maintainers WHERE ecosystem = 'npm' AND username = $(username) @@ -50,7 +54,7 @@ export async function upsertNpmMaintainers( }) const afterMap = new Map() - for (const m of maintainers) { + for (const m of ordered) { const row: { maintainer_id: string } | null = await qx.selectOneOrNone( `INSERT INTO package_maintainers (package_id, maintainer_id, role, created_at, updated_at) SELECT $(packageId)::bigint, id, $(role), NOW(), NOW() FROM maintainers WHERE ecosystem = 'npm' AND username = $(username) diff --git a/services/libs/data-access-layer/src/packages/packages.ts b/services/libs/data-access-layer/src/packages/packages.ts index 4e2d5585ea..fc8f258d76 100644 --- a/services/libs/data-access-layer/src/packages/packages.ts +++ b/services/libs/data-access-layer/src/packages/packages.ts @@ -66,7 +66,11 @@ export async function upsertNpmPackage( dist_tags_latest = EXCLUDED.dist_tags_latest, dist_tags_next = EXCLUDED.dist_tags_next, dist_tags_beta = EXCLUDED.dist_tags_beta, - versions_count = EXCLUDED.versions_count, + -- An unpublished stub reports 0 versions; don't clobber a previously-known count + -- (keep it consistent with the version rows 
that are retained on unpublish). + versions_count = CASE WHEN EXCLUDED.versions_count = 0 + THEN packages.versions_count + ELSE EXCLUDED.versions_count END, latest_version = EXCLUDED.latest_version, first_release_at = EXCLUDED.first_release_at, latest_release_at = EXCLUDED.latest_release_at, diff --git a/services/libs/data-access-layer/src/packages/repos.ts b/services/libs/data-access-layer/src/packages/repos.ts index 678bcc9532..469ce883a8 100644 --- a/services/libs/data-access-layer/src/packages/repos.ts +++ b/services/libs/data-access-layer/src/packages/repos.ts @@ -5,22 +5,31 @@ export async function getOrCreateRepoByUrl( url: string, host: string, ): Promise<{ id: string; changedFields: string[] }> { - const row: { id: string; created: boolean } = await qx.selectOne( - ` - WITH ins AS ( - INSERT INTO repos (url, host) VALUES ($(url), $(host)) - ON CONFLICT (url) DO NOTHING - RETURNING id - ) - SELECT id::text AS id, true AS created FROM ins - UNION ALL - SELECT id::text AS id, false AS created - FROM repos - WHERE url = $(url) AND NOT EXISTS (SELECT 1 FROM ins) - `, + // Repos are shared across packages (every package in a monorepo points at one repo) + // so this is by far the common case + const existing: { id: string } | null = await qx.selectOneOrNone( + `SELECT id::text AS id FROM repos WHERE url = $(url)`, + { url }, + ) + if (existing) return { id: existing.id, changedFields: [] } + + // Not seen yet — try to create it. ON CONFLICT DO NOTHING so a concurrent ingest lane creating + // the same shared repo doesn't raise a unique violation. + const inserted: { id: string } | null = await qx.selectOneOrNone( + `INSERT INTO repos (url, host) VALUES ($(url), $(host)) + ON CONFLICT (url) DO NOTHING + RETURNING id::text AS id`, { url, host }, ) - return { id: row.id, changedFields: row.created ? 
['repos.url', 'repos.host'] : [] } + if (inserted) return { id: inserted.id, changedFields: ['repos.url', 'repos.host'] } + + // Lost the race: another lane committed the same url between our SELECT and INSERT, so + // ON CONFLICT DO NOTHING returned no row. Re-read in a fresh statement — under READ COMMITTED it sees the row the other lane committed. + const row: { id: string } = await qx.selectOne( + `SELECT id::text AS id FROM repos WHERE url = $(url)`, + { url }, + ) + return { id: row.id, changedFields: [] } } export async function upsertPackageRepo(