Skip to content
11 changes: 8 additions & 3 deletions services/apps/packages_worker/src/npm/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,23 @@ async function ingestOne(qx: QueryExecutor, purl: string, dispatcher?: Dispatche
}

// 429 / 5xx / network → bubble up so Temporal retries the activity with exponential backoff.
if (!isClientError(packumentResult.statusCode, packumentResult.kind)) {
// MALFORMED is permanent (a 200 body that isn't a packument — retrying won't change it),
// so it takes the quick-retry-then-skip path below instead of poisoning the lane forever.
if (
!isClientError(packumentResult.statusCode, packumentResult.kind) &&
packumentResult.kind !== 'MALFORMED'
) {
throw new Error(`Failed to fetch packument for ${name}: ${packumentResult.message}`)
}

// 4xx → quick retry a few times (1s, 2s); give up and skip after the last attempt.
// 4xx/MALFORMED → quick retry a few times (1s, 2s); give up and skip after the last attempt.
if (attempt < INGEST_4XX_ATTEMPTS) {
await sleep(attempt * INGEST_4XX_BACKOFF_MS)
continue
}
log.warn(
{ purl, statusCode: packumentResult.statusCode, kind: packumentResult.kind },
'npm packument 4xx after fast retries — marking scanned and skipping',
'npm packument 4xx/malformed after fast retries — marking scanned and skipping',
)
await markNpmPackageScanned(qx, purl, {
status: 'error',
Expand Down
25 changes: 24 additions & 1 deletion services/apps/packages_worker/src/npm/fetchPackument.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,34 @@ export async function fetchPackument(
return { kind: 'MALFORMED', message: 'invalid JSON' }
}

if (!isPackument(json)) return { kind: 'MALFORMED', message: 'unexpected shape' }
if (!isPackument(json)) {
const stub = asUnpublishedStub(json)
if (stub) return stub
return { kind: 'MALFORMED', message: 'unexpected shape' }
}
delete (json as unknown as Record<string, unknown>).readme
return json
}

function isPackument(v: unknown): v is Packument {
return typeof v === 'object' && v !== null && 'name' in v && 'versions' in v && 'dist-tags' in v
}

// A fully unpublished package returns HTTP 200 with a stub document — just name + time,
// where time.unpublished records the unpublish event; there are no versions/dist-tags keys,
// so isPackument rejects it. Normalize the stub into an empty packument with `unpublished`
// set, so ingest stores status='unpublished' instead of erroring on shape.
function asUnpublishedStub(v: unknown): Packument | null {
if (typeof v !== 'object' || v === null) return null
const o = v as Record<string, unknown>
if (typeof o.name !== 'string' || typeof o.time !== 'object' || o.time === null) return null
const t = o.time as Record<string, unknown>
const unpublished = t.unpublished
if (typeof unpublished !== 'object' || unpublished === null) return null
if (typeof (unpublished as Record<string, unknown>).time !== 'string') return null
const time: Record<string, string> = {}
for (const [key, value] of Object.entries(t)) {
if (typeof value === 'string') time[key] = value
}
return { name: o.name, 'dist-tags': {}, versions: {}, time, unpublished }
}
27 changes: 27 additions & 0 deletions services/apps/packages_worker/src/npm/normalize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,33 @@ export function normalizeLicenses(packument: Packument): string[] {
)
}

// A version's `license` field comes in several shapes: a plain SPDX string ("MIT"),
// an object ({ type, url }), or the legacy array form ([{ type, file }, ...]). Passing those
// raw into a text column would persist objects/arrays, so collapse every shape to a single
// string (OR-joined for the array form) or null. Non-string or blank `type` values are dropped.
export function versionLicense(raw: unknown): string | null {
if (raw == null) return null
if (typeof raw === 'string') return blankToNull(raw)
if (Array.isArray(raw)) {
const types = raw
.map((l) => (typeof l === 'string' ? blankToNull(l) : licenseType(l)))
.filter((t): t is string => t !== null)
return types.length ? types.join(' OR ') : null
}
return licenseType(raw)
}

function licenseType(v: unknown): string | null {
if (typeof v !== 'object' || v === null) return null
const type = (v as { type?: unknown }).type
return typeof type === 'string' ? blankToNull(type) : null
}

function blankToNull(s: string): string | null {
const trimmed = s.trim()
return trimmed || null
}

function clean(s: string): string {
return s.replace(/[()]/g, '').trim()
}
Expand Down
3 changes: 2 additions & 1 deletion services/apps/packages_worker/src/npm/upsertPackage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
normalizeLicenses,
parseNpmName,
stripNullBytesDeep,
versionLicense,
} from './normalize'
import type { FundingEntry, Packument } from './types'

Expand Down Expand Up @@ -98,7 +99,7 @@ export async function upsertPackage(
publishedAt: time[number] ?? null,
isLatest: number === latestVersion,
isPrerelease: isPrerelease(number),
license: v.license ?? licenses[0] ?? null,
license: versionLicense(v.license) ?? licenses[0] ?? null,
Comment thread
cursor[bot] marked this conversation as resolved.
})),
)
verChanged.forEach((f) => changed.add(f))
Expand Down
2 changes: 1 addition & 1 deletion services/apps/packages_worker/src/npm/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const heartbeatingActs = proxyActivities<typeof activities>({

// Per lane, per round. Total purls fetched per round = lanes × INGEST_PER_LANE.
const INGEST_PER_LANE = 50
const INGEST_ROUNDS_PER_RUN = 25
const INGEST_ROUNDS_PER_RUN = 5

interface IngestState {
unscannedCursor: string
Expand Down
8 changes: 6 additions & 2 deletions services/libs/data-access-layer/src/packages/maintainers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ export async function upsertNpmMaintainers(
): Promise<string[]> {
const changed = new Set<string>()

for (const m of maintainers) {
const ordered = [...maintainers].sort((a, b) =>
a.username < b.username ? -1 : a.username > b.username ? 1 : 0,
)

for (const m of ordered) {
const row: { changed_fields: string[] } = await qx.selectOne(
`WITH old AS (
SELECT display_name, email FROM maintainers WHERE ecosystem = 'npm' AND username = $(username)
Expand Down Expand Up @@ -50,7 +54,7 @@ export async function upsertNpmMaintainers(
})

const afterMap = new Map<string, string | null>()
for (const m of maintainers) {
for (const m of ordered) {
const row: { maintainer_id: string } | null = await qx.selectOneOrNone(
`INSERT INTO package_maintainers (package_id, maintainer_id, role, created_at, updated_at)
SELECT $(packageId)::bigint, id, $(role), NOW(), NOW() FROM maintainers WHERE ecosystem = 'npm' AND username = $(username)
Expand Down
6 changes: 5 additions & 1 deletion services/libs/data-access-layer/src/packages/packages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ export async function upsertNpmPackage(
dist_tags_latest = EXCLUDED.dist_tags_latest,
dist_tags_next = EXCLUDED.dist_tags_next,
dist_tags_beta = EXCLUDED.dist_tags_beta,
versions_count = EXCLUDED.versions_count,
-- An unpublished stub reports 0 versions; don't clobber a previously-known count
-- (keep it consistent with the version rows that are retained on unpublish).
versions_count = CASE WHEN EXCLUDED.versions_count = 0
THEN packages.versions_count
ELSE EXCLUDED.versions_count END,
Comment thread
epipav marked this conversation as resolved.
latest_version = EXCLUDED.latest_version,
first_release_at = EXCLUDED.first_release_at,
latest_release_at = EXCLUDED.latest_release_at,
Expand Down
37 changes: 23 additions & 14 deletions services/libs/data-access-layer/src/packages/repos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,31 @@ export async function getOrCreateRepoByUrl(
url: string,
host: string,
): Promise<{ id: string; changedFields: string[] }> {
const row: { id: string; created: boolean } = await qx.selectOne(
`
WITH ins AS (
INSERT INTO repos (url, host) VALUES ($(url), $(host))
ON CONFLICT (url) DO NOTHING
RETURNING id
)
SELECT id::text AS id, true AS created FROM ins
UNION ALL
SELECT id::text AS id, false AS created
FROM repos
WHERE url = $(url) AND NOT EXISTS (SELECT 1 FROM ins)
`,
// Repos are shared across packages (every package in a monorepo points at one repo)
// so this is by far the common case
const existing: { id: string } | null = await qx.selectOneOrNone(
`SELECT id::text AS id FROM repos WHERE url = $(url)`,
{ url },
)
if (existing) return { id: existing.id, changedFields: [] }

// Not seen yet — try to create it. ON CONFLICT DO NOTHING so a concurrent ingest lane creating
// the same shared repo doesn't raise a unique violation.
const inserted: { id: string } | null = await qx.selectOneOrNone(
`INSERT INTO repos (url, host) VALUES ($(url), $(host))
ON CONFLICT (url) DO NOTHING
RETURNING id::text AS id`,
{ url, host },
)
return { id: row.id, changedFields: row.created ? ['repos.url', 'repos.host'] : [] }
if (inserted) return { id: inserted.id, changedFields: ['repos.url', 'repos.host'] }

// Lost the race: another lane committed the same url between our SELECT and INSERT, so
// ON CONFLICT DO NOTHING returned no row. Re-read in a fresh statement — under READ COMMITTED
const row: { id: string } = await qx.selectOne(
`SELECT id::text AS id FROM repos WHERE url = $(url)`,
{ url },
)
return { id: row.id, changedFields: [] }
}

export async function upsertPackageRepo(
Expand Down
Loading