diff --git a/doc/rfc/submitqueue/extension-contract.md b/doc/rfc/submitqueue/extension-contract.md index 21a51535..007b4a47 100644 --- a/doc/rfc/submitqueue/extension-contract.md +++ b/doc/rfc/submitqueue/extension-contract.md @@ -37,12 +37,12 @@ Two facts this grounds: `conflict` already resolves nothing (the baseline), and | `scorer.Scorer` | score | flat `Change`, per request | `entity.Batch` — resolve + reduce internally | one batch score (`float64`) — unchanged | request store + change provider | | `changeprovider.ChangeProvider` | validate | `Change` | `entity.Request` | per-URI change info (`[]ChangeInfo`, `URI`-tagged) — unchanged | none — it *is* the resolver | | `buildrunner.BuildRunner` | build | base/head `[]Change` | base `[]entity.Batch` + head `entity.Batch` | build id, then status/cancel (`BuildID`, `BuildStatus`) — unchanged | request store + change provider | -| `pusher.Pusher` | merge | `[]Change` | ordered `[]entity.Batch` | **per-batch** outcomes (`Result` grouped by `BatchID`) — **changed** | request store + change provider | +| `pusher.Pusher` *(removed)* | merge | — | **moved out-of-process to runway** (`merger` / `merger-signal`); see the note below the table | — | — | | `storage`, `changestore`, `queueconfig` | — | keys + entities | unchanged — resolution targets | entities | — | -**Outputs are unchanged except `pusher`.** This RFC moves the *input* toward identity; four of the five return contracts — conflicts, score, change info, build id/status — are exactly what they are today. `pusher` is the lone exception: because its input becomes a *list* of independently-landed batches, its result regroups per batch (`BatchID`-tagged, per-change commit detail kept underneath) so each batch's outcome stays correlatable — the "output mirrors the input unit" principle above. No other output shape changes. +**Outputs are unchanged.** This RFC moves the *input* toward identity; the four live return contracts — conflicts, score, change info, build id/status — are exactly what they are today. (The `pusher` row is not an in-process extension: merge runs out-of-process in runway, so its output is not part of this catalog — see the note below.) No other output shape changes. -The validate-time mergeability check runs **asynchronously and out-of-process** in runway rather than as an in-process extension: `validate` hands off to the `mergeconflict` controller, which publishes a full check request to the runway-owned `merge-conflict-checker` queue, and `mergeconflictsignal` consumes runway's result (see [workflow.md](workflow.md)). The in-process `mergechecker` package is unused on the validate path. +The validate-time mergeability **check** and the **merge** itself both run **asynchronously and out-of-process** in runway rather than as in-process extensions, over the one shared `MergeRequest`/`MergeResult` contract — a check is a dry run of a merge. `validate` hands off directly to runway (→ `merge-conflict-checker`, result back via `mergeconflictsignal`); `merge` hands the batch to runway (→ `merger`, result back via `mergesignal`) rather than calling an in-process `pusher`. See [workflow.md](workflow.md). The in-process `mergechecker` and `pusher` packages are unused on the pipeline path. Non-obvious points: diff --git a/doc/rfc/submitqueue/workflow.md b/doc/rfc/submitqueue/workflow.md index 544d4714..0f1c938b 100644 --- a/doc/rfc/submitqueue/workflow.md +++ b/doc/rfc/submitqueue/workflow.md @@ -1,81 +1,85 @@ # Orchestrator Workflow -The orchestrator processes land requests through a queue-driven pipeline of small, single-purpose controllers. The gateway accepts a request over RPC and hands it off asynchronously; from there each controller consumes one topic, advances the request or batch, and publishes to the next topic. Most hops carry only an ID — the controller fetches the entity from storage — while a few entry points (`start`, `buildsignal`, `log`) carry the full payload because there is no row to fetch yet. A stage that crosses a service boundary is the exception: it publishes a full payload to the other service's queue and consumes a full payload back, because neither service can read the other's storage. (The `validate`→`mergeconflictsignal` hop is one such stage: `validate` hands a check to runway and `mergeconflictsignal` consumes the result.) See the queue-payload-boundary rule in [CLAUDE.md](../../../CLAUDE.md). +The orchestrator processes land requests through a queue-driven pipeline of small, single-purpose controllers. The gateway accepts a request over RPC and hands it off asynchronously; from there each controller consumes one topic, advances the request or batch, and publishes to the next topic. Most hops carry only an ID — the controller fetches the entity from storage — while a few entry points (`start`, `buildsignal`, `log`) carry the full payload because there is no row to fetch yet. Some stages cross a service boundary: they publish a full payload to the other service's queue and consume a full payload back, because neither service can read the other's storage. (The `validate` and `merge` stages both hand work to runway — a merge-conflict check and the merge itself — and consume its result on `mergeconflictsignal` / `mergesignal`.) See the queue-payload-boundary rule in [CLAUDE.md](../../../CLAUDE.md). -The pipeline has two cycles: `speculate → build → buildsignal → speculate` (CI feedback loop) and `merge → speculate` (advance the next batch). `conclude` is the only stage that transitions a request to a terminal state; `log` is an append-only sink that any controller can publish to via `submitqueue/core/request.PublishLog`. +The pipeline has two cycles: `speculate → build → buildsignal → speculate` (CI feedback loop) and `merge → runway → mergesignal → speculate` (land the batch out of process, then advance the next). `conclude` is the only stage that transitions a request to a terminal state; `log` is an append-only sink that any controller can publish to via `submitqueue/core/request.PublishLog`. ## Diagram ``` - ┌──────────────────────────────────┐ - │ gateway:Land (RPC entry) │ - │ Accept, mint ID, hand off async │ - └────────────────┬─────────────────┘ - │ LandRequest - ▼ - ┌──────────────────────┐ ┌──────────────────────────────────┐ - │ gateway: log │◄───│ start │ - │ Persist request log │ │ Persist Request, emit Started │ - └──────────────────────┘ └────────────────┬─────────────────┘ - ▲ │ RequestID - │ ▼ - │ ┌──────────────────────────────────┐ - │ │ validate │ - │ │ Dedup, fetch metadata, publish │ - │ │ check request to runway │ - │ └────────────────┬─────────────────┘ - │ MergeRequest - │ ▼ - │ ╔══════════════════════════════════╗ - │ ║ runway (separate service) ║ - │ ║ Attempt merge, emit result ║ - │ ╚════════════════┬═════════════════╝ - │ MergeResult - │ ▼ - │ ┌──────────────────────────────────┐ - │ │ mergeconflictsignal │ - │ │ Correlate result, gate request │ - │ └────────────────┬─────────────────┘ - │ │ RequestID - │ ▼ - │ ┌──────────────────────────────────┐ - │ │ batch │ - │ │ Group request into a Batch │ - │ └────────────────┬─────────────────┘ - │ │ BatchID - │ ▼ - │ ┌──────────────────────────────────┐ - ├─────────────────│ score │ - │ RequestLog×N │ Score the batch, persist score │ - │ └────────────────┬─────────────────┘ - │ │ BatchID - │ ▼ - │ ┌──────────────────────────────────┐ - │ ┌───►│ speculate (stub) │◄────┐ - │ │ │ Decide CI verify vs. land │ │ - │ │ └──────┬─────────────────┬─────────┘ │ - │ │ BatchID │ │ BatchID │ - │ │ ▼ ▼ │ - │ │ ┌──────────────────┐ ┌──────────────────┐ │ - │ │ │ build │ │ merge │ │ - │ │ │ Trigger CI build │ │ Merge + advance │─┤ - │ │ └────────┬─────────┘ └────────┬─────────┘ │ - │ │ Build │ │ BatchID │ - │ │ ▼ │ │ - │ │ ┌──────────────────┐ │ │ - │ └──│ buildsignal │ │ │ - │ BatchID │ Feed CI result │ │ │ - │ │ back to spec. │ │ │ - │ └──────────────────┘ │ │ - │ ▲ │ BatchID │ - │ │ Build (ext. CI) ▼ │ - │ │ ┌──────────────────┐ │ - │ │ │ conclude │ │ - │ │ │ Map batch state │ │ - │ │ │ → request state │ │ - │ │ └──────────────────┘ │ - │ │ │ - └─── any controller via submitqueue/core/request.PublishLog ──────┘ + +----------------------------------+ + | gateway:Land (RPC entry) | + | Accept, mint ID, hand off async | + +----------------+-----------------+ + | LandRequest + v + +----------------------+ +----------------------------------+ + | gateway: log |<---| start | + | Persist request log | | Persist Request, emit Started | + +----------------------+ +----------------+-----------------+ + ^ | RequestID + | v + | +----------------------------------+ + | | validate | + | | Dedup, fetch metadata, publish | + | | check request to runway | + | +----------------+-----------------+ + | MergeRequest + | v + | #################################### + | # runway (separate service) # + | # Dry-run merge, emit result # + | ####################+############### + | MergeResult + | v + | +----------------------------------+ + | | mergeconflictsignal | + | | Correlate result, gate request | + | +----------------+-----------------+ + | | RequestID + | v + | +----------------------------------+ + | | batch | + | | Group request into a Batch | + | +----------------+-----------------+ + | | BatchID + | v + | +----------------------------------+ + +-----------------| score | + | RequestLog*N | Score the batch, persist score | + | +----------------+-----------------+ + | | BatchID + | v + | +----------------------------------+ + | +--->| speculate (stub) |<----+ + | | | Decide CI verify vs. land | | + | | +------+-----------------+---------+ | + | | BatchID | | BatchID | + | | v v | + | | +------------------+ +------------------+ | + | | | build | | merge | | + | | | Trigger CI build | | Publish to runway| | + | | +--------+---------+ +--------+---------+ | + | | Build | MergeRequest | + | | v v | + | | +------------------+ #################### | + | +--| buildsignal | # runway (sep.) # | + | BatchID | Feed CI result | # Merge, emit res. # | + | | back to spec. | ########+########### | + | +------------------+ MergeResult | + | ^ v | + | Build (ext.CI) | +------------------+ | + | | | mergesignal |--+ + | | | Gate batch + fan | | + | | +--------+---------+ | + | | | BatchID | + | | +------------------+ | + | | | conclude | | + | | | Map batch state | | + | | | -> request state | | + | | +------------------+ | + | | + +--- any controller via submitqueue/core/request.PublishLog ------+ ``` ## Per-controller summary @@ -91,7 +95,8 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` | **speculate** | BatchID | build, merge | (stub) Decide whether to verify via CI or land | | **build** | BatchID | buildsignal | Trigger CI build for the batch | | **buildsignal** | Build | speculate | Feed CI result back into speculation | -| **merge** | BatchID | conclude, speculate | Merge the batch and advance the queue | +| **merge** | BatchID | merger (runway) | Build the full merge request from the batch's member requests and publish to runway, keyed by the batch id (the correlation id) | +| **mergesignal** | MergeResult | conclude, speculate | Correlate runway's result; mark the batch Succeeded/Failed and fan out | | **conclude** | BatchID | — | Map terminal batch state to request state | | **log** | RequestLog | — | Gateway-owned sink: persists request log events to storage | @@ -99,7 +104,7 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` Every *consumed* primary pipeline topic above is paired with a `{topic}_dlq` subscription consumed by a dedicated DLQ controller. The `log` topic is the exception: the orchestrator only publishes to it (the gateway is the sole consumer that persists the request log), so it has no orchestrator-side subscription and therefore no DLQ. The consumer framework moves a message to its DLQ once the primary controller returns a non-retryable error or exhausts retries on a retryable one; without the DLQ side the affected request would stay in a non-terminal state forever and the gateway would still report it as "in progress". -The DLQ controllers do not re-attempt the failed work. They decode the payload to recover the affected request (`RequestID`) or batch (`BatchID`) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches, with fan-out to the member requests. A DLQ whose topic carries a full payload rather than a bare ID recovers the id from that payload instead — for example the `mergeconflictsignal` DLQ reads it from the runway `MergeResult` the producer echoed back. State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch is asked back for redelivery. +The DLQ controllers do not re-attempt the failed work. They decode the payload to recover the affected request (`RequestID`) or batch (`BatchID`) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches, with fan-out to the member requests. A DLQ whose topic carries a full payload rather than a bare ID recovers the id from that payload instead — the `mergeconflictsignal` and `mergesignal` DLQs read it from the runway `MergeResult` the producer echoed back. State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch is asked back for redelivery. DLQ consumers are wired with `errs.AlwaysRetryableProcessor` and a very high `Retry.MaxAttempts`, with their own DLQ disabled. That combination makes reconciliation effectively non-droppable: any failure is forced retryable rather than escalating to a second-level dead-letter that nobody consumes. The trade-off is that a genuinely unprocessable DLQ message — typically a malformed payload — must be removed by an operator. diff --git a/example/submitqueue/orchestrator/server/BUILD.bazel b/example/submitqueue/orchestrator/server/BUILD.bazel index 829f8658..efb7645d 100644 --- a/example/submitqueue/orchestrator/server/BUILD.bazel +++ b/example/submitqueue/orchestrator/server/BUILD.bazel @@ -35,9 +35,6 @@ go_library( "//submitqueue/extension/conflict/fake", "//submitqueue/extension/conflict/fileoverlap", "//submitqueue/extension/conflict/none", - "//submitqueue/extension/pusher", - "//submitqueue/extension/pusher/fake", - "//submitqueue/extension/pusher/git", "//submitqueue/extension/scorer", "//submitqueue/extension/scorer/composite", "//submitqueue/extension/scorer/fake", @@ -53,6 +50,7 @@ go_library( "//submitqueue/orchestrator/controller/dlq", "//submitqueue/orchestrator/controller/merge", "//submitqueue/orchestrator/controller/mergeconflictsignal", + "//submitqueue/orchestrator/controller/mergesignal", "//submitqueue/orchestrator/controller/score", "//submitqueue/orchestrator/controller/speculate", "//submitqueue/orchestrator/controller/start", @@ -67,7 +65,7 @@ go_library( ) go_binary( - name = "orchestrator", + name = "server", embed = [":orchestrator_lib"], visibility = ["//visibility:public"], ) diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index e854ff17..2df34af0 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -54,9 +54,6 @@ import ( conflictfake "github.com/uber/submitqueue/submitqueue/extension/conflict/fake" "github.com/uber/submitqueue/submitqueue/extension/conflict/fileoverlap" "github.com/uber/submitqueue/submitqueue/extension/conflict/none" - "github.com/uber/submitqueue/submitqueue/extension/pusher" - pushfake "github.com/uber/submitqueue/submitqueue/extension/pusher/fake" - gitpusher "github.com/uber/submitqueue/submitqueue/extension/pusher/git" "github.com/uber/submitqueue/submitqueue/extension/scorer" "github.com/uber/submitqueue/submitqueue/extension/scorer/composite" scorerfake "github.com/uber/submitqueue/submitqueue/extension/scorer/fake" @@ -72,6 +69,7 @@ import ( "github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergeconflictsignal" + "github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergesignal" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/score" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/speculate" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/start" @@ -239,13 +237,12 @@ func run() error { // Per-extension factories all resolve against the registry by queue name. cpf := changeProviderFactory{queues} - pshf := pusherFactory{queues} brf := buildRunnerFactory{queues} scf := scorerFactory{queues} cof := analyzerFactory{queues} // Register controllers - primaryCount, err := registerPrimaryControllers(primaryConsumer, logger.Sugar(), scope, registry, cpf, pshf, brf, scf, cof, cnt, store) + primaryCount, err := registerPrimaryControllers(primaryConsumer, logger.Sugar(), scope, registry, cpf, brf, scf, cof, cnt, store) if err != nil { return err } @@ -382,6 +379,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe {topickey.TopicKeyBuild, "build", "orchestrator-build"}, {topickey.TopicKeyBuildSignal, "buildsignal", "orchestrator-buildsignal"}, {topickey.TopicKeyMerge, "merge", "orchestrator-merge"}, + {runwaymq.TopicKeyMergeSignal, "merger-signal", "orchestrator-mergesignal"}, {topickey.TopicKeyConclude, "conclude", "orchestrator-conclude"}, } @@ -441,6 +439,17 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe Queue: q, }) + // Publish-only: the orchestrator hands merge requests to runway via the + // runway-owned merger queue. Runway is the sole consumer, so the + // orchestrator registers no consuming subscription (and no DLQ) here; the + // inbound result arrives on the separate merger-signal queue, which is a + // consumed primary topic above. + configs = append(configs, consumer.TopicConfig{ + Key: runwaymq.TopicKeyMerge, + Name: "merger", + Queue: q, + }) + return consumer.NewTopicRegistry(configs) } @@ -473,11 +482,10 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // // queueExtensions is the full set of extension implementations for a single // queue. Grouping them per queue (rather than per extension) lets the wiring -// read as "for this queue, here are its scorer, analyzer, pusher, …", and lets +// read as "for this queue, here are its scorer, analyzer, change provider, …", and lets // a queue profile start from a baseline and override only what differs. type queueExtensions struct { changeProvider changeprovider.ChangeProvider - pusher pusher.Pusher buildRunner buildrunner.BuildRunner scorer scorer.Scorer analyzer conflict.Analyzer @@ -508,12 +516,6 @@ func (f changeProviderFactory) For(cfg changeprovider.Config) (changeprovider.Ch return f.reg.get(cfg.QueueName).changeProvider, nil } -type pusherFactory struct{ reg queueRegistry } - -func (f pusherFactory) For(cfg pusher.Config) (pusher.Pusher, error) { - return f.reg.get(cfg.QueueName).pusher, nil -} - type buildRunnerFactory struct{ reg queueRegistry } func (f buildRunnerFactory) For(cfg buildrunner.Config) (buildrunner.BuildRunner, error) { @@ -532,7 +534,7 @@ func (f analyzerFactory) For(cfg conflict.Config) (conflict.Analyzer, error) { return f.reg.get(cfg.QueueName).analyzer, nil } -func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, cpf changeprovider.Factory, pshf pusher.Factory, brf buildrunner.Factory, scf scorer.Factory, cof conflict.Factory, cnt counter.Counter, store storage.Storage) (int, error) { +func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, cpf changeprovider.Factory, brf buildrunner.Factory, scf scorer.Factory, cof conflict.Factory, cnt counter.Counter, store storage.Storage) (int, error) { var count int requestController := start.NewController( logger, @@ -663,7 +665,7 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope, store, registry, - pshf, + runwaymq.TopicKeyMerge, topickey.TopicKeyMerge, "orchestrator-merge", ) @@ -672,6 +674,19 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, } count++ + mergesignalController := mergesignal.NewController( + logger, + scope, + store, + registry, + runwaymq.TopicKeyMergeSignal, + "orchestrator-mergesignal", + ) + if err := c.Register(mergesignalController); err != nil { + return count, fmt.Errorf("failed to register mergesignal controller: %w", err) + } + count++ + concludeController := conclude.NewController( logger, scope, @@ -708,6 +723,7 @@ func registerDLQControllers(c consumer.Consumer, logger *zap.SugaredLogger, scop {"build_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyBuild), "orchestrator-build-dlq")}, {"buildsignal_dlq", dlq.NewDLQBuildSignalController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq")}, {"merge_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyMerge), "orchestrator-merge-dlq")}, + {"mergesignal_dlq", dlq.NewDLQMergeSignalController(logger, dlqScope, store, dlq.TopicKey(runwaymq.TopicKeyMergeSignal), "orchestrator-mergesignal-dlq")}, {"conclude_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyConclude), "orchestrator-conclude-dlq")}, } var count int @@ -768,29 +784,8 @@ func newChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.Ch }), nil } -// newPusher creates a git-backed Pusher bound to the configured checkout path, -// remote, and target branch (PUSHER_CHECKOUT_PATH, PUSHER_REMOTE default -// "origin", PUSHER_TARGET default "main"). When PUSHER_CHECKOUT_PATH is unset it -// returns the fake pusher (commits succeed unless a change URI carries a failure -// marker, see pusher/fake), keeping the example runnable without a git checkout. -func newPusher(logger *zap.Logger, scope tally.Scope, resolver changeset.Resolver) (pusher.Pusher, error) { - checkout := os.Getenv("PUSHER_CHECKOUT_PATH") - if checkout == "" { - logger.Warn("PUSHER_CHECKOUT_PATH not set; using fake pusher (commits succeed unless URI-marked)") - return pushfake.New(resolver), nil - } - return gitpusher.NewPusher(gitpusher.Params{ - CheckoutPath: checkout, - Remote: getEnv("PUSHER_REMOTE", "origin"), - Target: getEnv("PUSHER_TARGET", "main"), - Resolver: resolver, - Logger: logger.Sugar(), - MetricsScope: scope.SubScope("pusher"), - }), nil -} - // newQueueRegistry builds the per-queue extension profiles for the example. -// Edge integrations (merge checker, change provider, pusher) and the build +// Edge integrations (change provider) and the build // runner form a shared baseline; each per-queue profile starts from that // baseline and overrides only the extensions that differ — here the scorer and // conflict analyzer. Queues without an explicit profile fall back to the @@ -801,10 +796,6 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset. if err != nil { return queueRegistry{}, fmt.Errorf("failed to create change provider: %w", err) } - psh, err := newPusher(logger, scope, resolver) - if err != nil { - return queueRegistry{}, fmt.Errorf("failed to create pusher: %w", err) - } // batchLines buckets a batch by total lines changed across all its changes — // larger batches are likelier to fail to land. @@ -826,7 +817,6 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset. // below does. base := queueExtensions{ changeProvider: cp, - pusher: psh, buildRunner: buildfake.New(resolver), scorer: scorerfake.New(resolver, heuristic.New( resolver, diff --git a/submitqueue/orchestrator/controller/dlq/BUILD.bazel b/submitqueue/orchestrator/controller/dlq/BUILD.bazel index db85dd8a..e3094740 100644 --- a/submitqueue/orchestrator/controller/dlq/BUILD.bazel +++ b/submitqueue/orchestrator/controller/dlq/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "dlq.go", "log.go", "mergeconflictsignal.go", + "mergesignal.go", "request.go", ], importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq", @@ -31,6 +32,7 @@ go_test( "dlq_test.go", "log_test.go", "mergeconflictsignal_test.go", + "mergesignal_test.go", "request_test.go", ], embed = [":dlq"], diff --git a/submitqueue/orchestrator/controller/dlq/mergesignal.go b/submitqueue/orchestrator/controller/dlq/mergesignal.go new file mode 100644 index 00000000..bb47fa4c --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/mergesignal.go @@ -0,0 +1,108 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dlq + +import ( + "context" + "fmt" + + "github.com/uber-go/tally" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/submitqueue/extension/storage" + "go.uber.org/zap" +) + +// mergeSignalController is the DLQ reconciler for the mergesignal topic. Its +// payload carries a runway MergeResult whose id is the batch id echoed back, so +// reconciliation fails that batch directly via failBatch (which also fans out +// to the member requests). +type mergeSignalController struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify mergeSignalController implements consumer.Controller at compile time. +var _ consumer.Controller = (*mergeSignalController)(nil) + +// NewDLQMergeSignalController builds a DLQ controller for the mergesignal topic. +func NewDLQMergeSignalController( + logger *zap.SugaredLogger, + scope tally.Scope, + store storage.Storage, + topicKey consumer.TopicKey, + consumerGroup string, +) consumer.Controller { + name := string(topicKey) + "_controller" + return &mergeSignalController{ + logger: logger.Named(name), + metricsScope: scope.SubScope(name), + store: store, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process reconciles a single DLQ delivery for the mergesignal topic. +func (c *mergeSignalController) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + result, err := runwaymq.MergeResultFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + return fmt.Errorf("failed to decode merge result from dlq payload: %w", err) + } + + dmeta := delivery.Metadata() + c.logger.Warnw("dlq message received", + "batch_id", result.Id, + "attempt", delivery.Attempt(), + "dlq_original_topic", dmeta["dlq.original_topic"], + "dlq_failure_count", dmeta["dlq.failure_count"], + "dlq_last_error", dmeta["dlq.last_error"], + ) + + if err := failBatch(ctx, c.store, c.logger, result.Id); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "reconcile_errors", 1) + return err + } + + metrics.NamedCounter(c.metricsScope, opName, "reconciled", 1) + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *mergeSignalController) Name() string { + return string(c.topicKey) +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *mergeSignalController) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *mergeSignalController) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/submitqueue/orchestrator/controller/dlq/mergesignal_test.go b/submitqueue/orchestrator/controller/dlq/mergesignal_test.go new file mode 100644 index 00000000..52ca5954 --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/mergesignal_test.go @@ -0,0 +1,85 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dlq + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/submitqueue/entity" + storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func TestDLQMergeSignalController_InterfaceAndAccessors(t *testing.T) { + ctrl := gomock.NewController(t) + store := storagemock.NewMockStorage(ctrl) + + c := NewDLQMergeSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(runwaymq.TopicKeyMergeSignal), "orchestrator-mergesignal-dlq") + + assert.Equal(t, "merger-signal_dlq", c.Name()) + assert.Equal(t, consumer.TopicKey("merger-signal_dlq"), c.TopicKey()) + assert.Equal(t, "orchestrator-mergesignal-dlq", c.ConsumerGroup()) +} + +// The payload id is the batch id echoed back, so reconciliation fails the batch +// and fans out to its member requests via failBatch. +func TestDLQMergeSignalController_Process_ReconcilesBatch(t *testing.T) { + ctrl := gomock.NewController(t) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), "q/batch/1").Return(entity.Batch{ + ID: "q/batch/1", Queue: "q", Contains: []string{"q/1"}, + State: entity.BatchStateMerging, Version: 2, + }, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), "q/batch/1", int32(2), int32(3), entity.BatchStateFailed).Return(nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{ + ID: "q/1", Version: 1, State: entity.RequestStateProcessing, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(1), int32(2), entity.RequestStateError).Return(nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + c := NewDLQMergeSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(runwaymq.TopicKeyMergeSignal), "orchestrator-mergesignal-dlq") + + payload, err := runwaymq.MergeResultToBytes(&runwaymq.MergeResult{Id: "q/batch/1", Success: false, Reason: "boom"}) + require.NoError(t, err) + + delivery := newMockDelivery(ctrl, payload) + require.NoError(t, c.Process(context.Background(), delivery)) +} + +func TestDLQMergeSignalController_Process_MalformedPayloadFails(t *testing.T) { + ctrl := gomock.NewController(t) + + store := storagemock.NewMockStorage(ctrl) + c := NewDLQMergeSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(runwaymq.TopicKeyMergeSignal), "orchestrator-mergesignal-dlq") + + delivery := newMockDelivery(ctrl, []byte("garbage")) + require.Error(t, c.Process(context.Background(), delivery)) +} diff --git a/submitqueue/orchestrator/controller/merge/BUILD.bazel b/submitqueue/orchestrator/controller/merge/BUILD.bazel index f6dd9146..b31f408c 100644 --- a/submitqueue/orchestrator/controller/merge/BUILD.bazel +++ b/submitqueue/orchestrator/controller/merge/BUILD.bazel @@ -6,12 +6,15 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge", visibility = ["//visibility:public"], deps = [ + "//api/base/change/protopb", + "//api/base/mergestrategy/protopb", + "//api/runway/messagequeue", + "//platform/base/mergestrategy", "//platform/base/messagequeue", "//platform/consumer", + "//platform/errs", "//platform/metrics", - "//submitqueue/core/topickey", "//submitqueue/entity", - "//submitqueue/extension/pusher", "//submitqueue/extension/storage", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", @@ -23,15 +26,16 @@ go_test( srcs = ["merge_test.go"], embed = [":merge"], deps = [ + "//api/base/mergestrategy/protopb", + "//api/runway/messagequeue", "//platform/base/change", + "//platform/base/mergestrategy", "//platform/base/messagequeue", "//platform/consumer", "//platform/errs", "//platform/extension/messagequeue/mock", "//submitqueue/core/topickey", "//submitqueue/entity", - "//submitqueue/extension/pusher", - "//submitqueue/extension/pusher/mock", "//submitqueue/extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/submitqueue/orchestrator/controller/merge/merge.go b/submitqueue/orchestrator/controller/merge/merge.go index b814fda0..86a9f8e5 100644 --- a/submitqueue/orchestrator/controller/merge/merge.go +++ b/submitqueue/orchestrator/controller/merge/merge.go @@ -12,85 +12,102 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package merge implements the trigger stage for the asynchronous merge. It +// consumes a batch ready to land, builds the full merge request from the +// batch's member requests (one step per request, in Contains order), and +// publishes it to runway's merger queue using the batch id as the client-owned +// correlation id. Runway performs the merge out of process and publishes the +// result to the merger-signal queue, which the mergesignal stage consumes and +// correlates back to the batch by that id. package merge import ( "context" - "errors" "fmt" "github.com/uber-go/tally" "go.uber.org/zap" + changepb "github.com/uber/submitqueue/api/base/change/protopb" + strategypb "github.com/uber/submitqueue/api/base/mergestrategy/protopb" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + "github.com/uber/submitqueue/platform/base/mergestrategy" entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" "github.com/uber/submitqueue/platform/consumer" - coremetrics "github.com/uber/submitqueue/platform/metrics" - "github.com/uber/submitqueue/submitqueue/core/topickey" + "github.com/uber/submitqueue/platform/errs" + "github.com/uber/submitqueue/platform/metrics" "github.com/uber/submitqueue/submitqueue/entity" - "github.com/uber/submitqueue/submitqueue/extension/pusher" "github.com/uber/submitqueue/submitqueue/extension/storage" ) -// Controller handles merge queue messages. It loads every request in a batch, -// hands the resulting list of Changes to the configured Pusher, and -// transitions the batch to a terminal state based on the Pusher's outcome. -// After updating state it forwards the batch to conclude (so requests pick -// up the outcome) and to speculate (so downstream batches can re-plan). +// Controller handles merge queue messages. Implements consumer.Controller. // -// Conflicts are user-caused: the batch goes to BatchStateFailed and the -// queue message is acked. Any other Pusher error is treated as transient -// infra: the batch is left in place and the message is nacked. +// It loads the batch and its member requests, assembles the full merge request +// (one step per member request, in Contains order, each carrying that request's +// change and land strategy), and publishes it to runway's merger queue. Runway +// performs the merge out of process and returns the result on the merger-signal +// queue; the mergesignal stage consumes it and transitions the batch. This +// controller therefore performs no state transition itself. type Controller struct { - logger *zap.SugaredLogger - metricsScope tally.Scope - store storage.Storage - registry consumer.TopicRegistry - pushers pusher.Factory - topicKey consumer.TopicKey - consumerGroup string + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + registry consumer.TopicRegistry + runwayTopicKey consumer.TopicKey + topicKey consumer.TopicKey + consumerGroup string } // Verify Controller implements consumer.Controller interface at compile time. var _ consumer.Controller = (*Controller)(nil) // NewController creates a new merge controller for the orchestrator. +// runwayTopicKey is the runway-owned topic this controller publishes merge +// requests to (TopicKeyMerge). func NewController( logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, registry consumer.TopicRegistry, - pushers pusher.Factory, + runwayTopicKey consumer.TopicKey, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { return &Controller{ - logger: logger.Named("merge_controller"), - metricsScope: scope.SubScope("merge_controller"), - store: store, - registry: registry, - pushers: pushers, - topicKey: topicKey, - consumerGroup: consumerGroup, + logger: logger.Named("merge_controller"), + metricsScope: scope.SubScope("merge_controller"), + store: store, + registry: registry, + runwayTopicKey: runwayTopicKey, + topicKey: topicKey, + consumerGroup: consumerGroup, } } -// Process performs the merge for a batch and forwards it to conclude/speculate. -// Returns nil to ack (success), or error to nack (retry). +// Process publishes the full merge request to runway. Returns nil to ack +// (success), or error to nack/reject. +// +// Error classification: deserialize and storage failures are non-retryable +// (reject to DLQ). The publish to runway is retryable — it is the hand-off that +// keeps the merge alive, so a transient enqueue blip should replay rather than +// strand the batch. func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { - op := coremetrics.Begin(c.metricsScope, "process") + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) defer func() { op.Complete(retErr) }() msg := delivery.Message() bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { - coremetrics.NamedCounter(c.metricsScope, "process", "deserialize_errors", 1) + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) return fmt.Errorf("failed to deserialize batch ID: %w", err) } batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) if err != nil { - coremetrics.NamedCounter(c.metricsScope, "process", "storage_errors", 1) + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } @@ -103,89 +120,92 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r "partition_key", msg.PartitionKey, ) - // Cancelling intent: the cancel controller marked this batch as not landing - // and handed it off to speculate. Silently ack — do not push (the inherent - // push-already-committed race is acknowledged elsewhere) and do not fan out - // (speculate owns the terminal write to Cancelled and the downstream - // dependent / conclude publishes). - if batch.State == entity.BatchStateCancelling { - coremetrics.NamedCounter(c.metricsScope, "process", "skipped_cancelling", 1) + // Short-circuit halted batches (terminal or cancelling): no merge should be + // kicked off for a batch that will not proceed. Unlike the old synchronous + // merge there is no terminal re-fan-out here — the mergesignal stage owns the + // state transition and fan-out once runway's result returns, so a redelivery + // at this stage simply acks. + if entity.IsBatchStateHalted(batch.State) { + metrics.NamedCounter(c.metricsScope, opName, "skipped_halted", 1) + c.logger.Infow("skipping merge for halted batch", + "batch_id", batch.ID, + "state", string(batch.State), + ) return nil } - // Idempotency: if the batch is already in a terminal state, a previous - // attempt has already merged (or failed) — just re-fan-out the events - // in case downstream stages missed them. - if batch.State.IsTerminal() { - coremetrics.NamedCounter(c.metricsScope, "process", "skipped_terminal", 1) - return c.fanout(ctx, batch.ID, batch.Queue) - } - - push, err := c.pushers.For(pusher.Config{QueueName: batch.Queue}) + // Build the full payload runway needs to perform the merge. The batch id is + // the client-owned correlation id, so a redelivery republishes the same id + // and runway dedupes on it; the result is matched straight back to the batch. + req, err := c.buildMergeRequest(ctx, batch) if err != nil { - coremetrics.NamedCounter(c.metricsScope, "process", "push_errors", 1) - return fmt.Errorf("failed to build pusher for batch %s: %w", batch.ID, err) + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to build merge request for batch %s: %w", batch.ID, err) } - // Push a single batch today; the pusher resolves its changes itself. The - // list parameter designs for a future merge-train. - pushRes, pushErr := push.Push(ctx, []entity.Batch{batch}) - - var newState entity.BatchState - switch { - case pushErr == nil: - newState = entity.BatchStateSucceeded - c.logger.Infow("merged batch", - "batch_id", batch.ID, - "outcomes", pushRes.Batches, - ) - case errors.Is(pushErr, pusher.ErrConflict): - coremetrics.NamedCounter(c.metricsScope, "process", "push_conflicts", 1) - newState = entity.BatchStateFailed - c.logger.Warnw("batch merge failed", - "batch_id", batch.ID, - "state", string(newState), - "error", pushErr, - ) - default: - coremetrics.NamedCounter(c.metricsScope, "process", "push_errors", 1) - return fmt.Errorf("push failed for batch %s: %w", batch.ID, pushErr) + if err := c.publish(ctx, c.runwayTopicKey, req, batch.Queue); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + // Retryable: the hand-off to runway is what keeps this merge alive. + return errs.NewRetryableError(fmt.Errorf("failed to publish to runway merger: %w", err)) } - newVersion := batch.Version + 1 - if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, newState); err != nil { - coremetrics.NamedCounter(c.metricsScope, "process", "state_update_errors", 1) - return fmt.Errorf("failed to transition batch %s to %s: %w", batch.ID, newState, err) - } - batch.Version = newVersion - batch.State = newState + c.logger.Infow("published merge to runway", + "batch_id", batch.ID, + "steps", len(req.Steps), + "topic_key", c.runwayTopicKey, + ) - return c.fanout(ctx, batch.ID, batch.Queue) + return nil // Success - message will be acked } -// fanout publishes the batch ID to conclude (so requests are updated) and -// to speculate (so dependents can re-evaluate now that this batch is done). -func (c *Controller) fanout(ctx context.Context, batchID, partitionKey string) error { - if err := c.publish(ctx, topickey.TopicKeyConclude, batchID, partitionKey); err != nil { - coremetrics.NamedCounter(c.metricsScope, "process", "publish_conclude_errors", 1) - return fmt.Errorf("failed to publish to conclude: %w", err) +// buildMergeRequest loads the batch's member requests and assembles the runway +// merge request: one MergeStep per request, in Contains order, attributed by +// request id and carrying that request's change and land strategy. +func (c *Controller) buildMergeRequest(ctx context.Context, batch entity.Batch) (*runwaymq.MergeRequest, error) { + steps := make([]*runwaymq.MergeStep, 0, len(batch.Contains)) + for _, requestID := range batch.Contains { + request, err := c.store.GetRequestStore().Get(ctx, requestID) + if err != nil { + return nil, fmt.Errorf("failed to get request %s: %w", requestID, err) + } + steps = append(steps, &runwaymq.MergeStep{ + StepId: request.ID, + Changes: []*changepb.Change{{Uris: request.Change.URIs}}, + Strategy: toProtoStrategy(request.LandStrategy), + }) } - if err := c.publish(ctx, topickey.TopicKeySpeculate, batchID, partitionKey); err != nil { - coremetrics.NamedCounter(c.metricsScope, "process", "publish_speculate_errors", 1) - return fmt.Errorf("failed to publish to speculate: %w", err) + return &runwaymq.MergeRequest{ + Id: batch.ID, + QueueName: batch.Queue, + Steps: steps, + }, nil +} + +// toProtoStrategy maps the shared mergestrategy.MergeStrategy entity to the +// proto Strategy enum carried on the wire. An unknown strategy maps to DEFAULT, +// letting runway apply the queue's configured default. +func toProtoStrategy(s mergestrategy.MergeStrategy) strategypb.Strategy { + switch s { + case mergestrategy.MergeStrategyRebase: + return strategypb.Strategy_REBASE + case mergestrategy.MergeStrategySquashRebase: + return strategypb.Strategy_SQUASH_REBASE + case mergestrategy.MergeStrategyMerge: + return strategypb.Strategy_MERGE + default: + return strategypb.Strategy_DEFAULT } - return nil } -// publish publishes a batch ID to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { - bid := entity.BatchID{ID: batchID} - payload, err := bid.ToBytes() +// publish serializes the runway merge request and publishes it to the given +// topic key, partitioned by queue. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, req *runwaymq.MergeRequest, partitionKey string) error { + payload, err := runwaymq.MergeRequestToBytes(req) if err != nil { - return fmt.Errorf("failed to serialize batch ID: %w", err) + return fmt.Errorf("failed to serialize merge request: %w", err) } - msg := entityqueue.NewMessage(batchID, payload, partitionKey, nil) + msg := entityqueue.NewMessage(req.Id, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/submitqueue/orchestrator/controller/merge/merge_test.go b/submitqueue/orchestrator/controller/merge/merge_test.go index 21de482f..bcbe0f4b 100644 --- a/submitqueue/orchestrator/controller/merge/merge_test.go +++ b/submitqueue/orchestrator/controller/merge/merge_test.go @@ -25,15 +25,16 @@ import ( "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" + strategypb "github.com/uber/submitqueue/api/base/mergestrategy/protopb" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" "github.com/uber/submitqueue/platform/base/change" + "github.com/uber/submitqueue/platform/base/mergestrategy" entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" "github.com/uber/submitqueue/platform/consumer" "github.com/uber/submitqueue/platform/errs" queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" - "github.com/uber/submitqueue/submitqueue/extension/pusher" - pushermock "github.com/uber/submitqueue/submitqueue/extension/pusher/mock" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" ) @@ -52,42 +53,28 @@ func newDelivery(t *testing.T, ctrl *gomock.Controller, batchID, partitionKey st return delivery } -// newRegistry returns a registry where conclude and speculate accept any publish. -func newRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error) consumer.TopicRegistry { - mockPub := queuemock.NewMockPublisher(ctrl) - mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, _ string, _ entityqueue.Message) error { return publishErr }, - ).AnyTimes() - mockQ := queuemock.NewMockQueue(ctrl) - mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() - - registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ - {Key: topickey.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - {Key: topickey.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, - }) - require.NoError(t, err) - return registry -} - -// newPusherFactory wraps a Pusher in a factory that returns it for any entityqueue. -func newPusherFactory(ctrl *gomock.Controller, p pusher.Pusher) pusher.Factory { - f := pushermock.NewMockFactory(ctrl) - f.EXPECT().For(gomock.Any()).Return(p, nil).AnyTimes() - return f -} - -func TestNewController(t *testing.T) { - ctrl := gomock.NewController(t) - store := storagemock.NewMockStorage(ctrl) - c := NewController( +func newController(t *testing.T, store *storagemock.MockStorage, registry consumer.TopicRegistry) *Controller { + return NewController( zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, - newRegistry(t, ctrl, nil), - newPusherFactory(ctrl, pushermock.NewMockPusher(ctrl)), + registry, + runwaymq.TopicKeyMerge, topickey.TopicKeyMerge, "orchestrator-merge", ) +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + store := storagemock.NewMockStorage(ctrl) + q := queuemock.NewMockQueue(ctrl) + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: runwaymq.TopicKeyMerge, Name: "merger", Queue: q}}, + ) + require.NoError(t, err) + + c := newController(t, store, registry) require.NotNil(t, c) assert.Equal(t, topickey.TopicKeyMerge, c.TopicKey()) @@ -96,282 +83,148 @@ func TestNewController(t *testing.T) { var _ consumer.Controller = c } -func TestController_Process_SuccessfulMerge(t *testing.T) { +func TestProcess_PublishesFullPayloadToRunway(t *testing.T) { ctrl := gomock.NewController(t) - const reqID = "test-queue/1" const batchID = "test-queue/batch/1" - + req1 := entity.Request{ + ID: "test-queue/1", + Queue: "test-queue", + Change: change.Change{URIs: []string{"github://uber/repo/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}, + LandStrategy: mergestrategy.MergeStrategySquashRebase, + } + req2 := entity.Request{ + ID: "test-queue/2", + Queue: "test-queue", + Change: change.Change{URIs: []string{"github://uber/repo/pull/2/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"}}, + LandStrategy: mergestrategy.MergeStrategyRebase, + } batch := entity.Batch{ ID: batchID, Queue: "test-queue", - Contains: []string{reqID}, + Contains: []string{req1.ID, req2.ID}, State: entity.BatchStateMerging, Version: 4, } - change := change.Change{URIs: []string{"github://o/r/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}} batchStore := storagemock.NewMockBatchStore(ctrl) batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) - batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(4), int32(5), entity.BatchStateSucceeded).Return(nil) + reqStore := storagemock.NewMockRequestStore(ctrl) + reqStore.EXPECT().Get(gomock.Any(), req1.ID).Return(req1, nil) + reqStore.EXPECT().Get(gomock.Any(), req2.ID).Return(req2, nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - - mockPusher := pushermock.NewMockPusher(ctrl) - mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, batches []entity.Batch) (entity.PushResult, error) { - require.Len(t, batches, 1) - assert.Equal(t, batch.ID, batches[0].ID) - return entity.PushResult{Batches: []entity.BatchOutcome{{ - BatchID: batch.ID, - Outcomes: []entity.ChangeOutcome{{ - Change: change, - Status: entity.OutcomeStatusCommitted, - CommitSHAs: []string{"deadbeef"}, - }}, - }}}, nil + store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() + + var gotTopic string + var gotPayload []byte + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg entityqueue.Message) error { + gotTopic = topic + gotPayload = msg.Payload + return nil }, ) - - c := NewController( - zaptest.NewLogger(t).Sugar(), - tally.NoopScope, - store, - newRegistry(t, ctrl, nil), - newPusherFactory(ctrl, mockPusher), - topickey.TopicKeyMerge, - "orchestrator-merge", + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: runwaymq.TopicKeyMerge, Name: "merger", Queue: q}}, ) - - err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) require.NoError(t, err) -} - -// TestController_Process_ForwardsBatchToPusher verifies the controller forwards -// the batch identity (with its full Contains, in order) to the pusher, which -// resolves the changes itself. Change resolution order is the pusher's concern, -// covered by the git pusher tests. -func TestController_Process_ForwardsBatchToPusher(t *testing.T) { - ctrl := gomock.NewController(t) - const batchID = "test-queue/batch/multi" - requestIDs := []string{"test-queue/1", "test-queue/2", "test-queue/3"} + c := newController(t, store, registry) + require.NoError(t, c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue))) - batch := entity.Batch{ - ID: batchID, - Queue: "test-queue", - Contains: requestIDs, - State: entity.BatchStateMerging, - Version: 1, - } - - batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) - batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(1), int32(2), entity.BatchStateSucceeded).Return(nil) - - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - - mockPusher := pushermock.NewMockPusher(ctrl) - mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, batches []entity.Batch) (entity.PushResult, error) { - require.Len(t, batches, 1) - assert.Equal(t, requestIDs, batches[0].Contains, "batch forwarded with Contains in order") - return entity.PushResult{Batches: []entity.BatchOutcome{{BatchID: batchID}}}, nil - }, - ) - - c := NewController( - zaptest.NewLogger(t).Sugar(), - tally.NoopScope, - store, - newRegistry(t, ctrl, nil), - newPusherFactory(ctrl, mockPusher), - topickey.TopicKeyMerge, - "orchestrator-merge", - ) - - err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) + // Full payload published to runway, keyed by the batch id (the correlation id). + assert.Equal(t, "merger", gotTopic) + got, err := runwaymq.MergeRequestFromBytes(gotPayload) require.NoError(t, err) + assert.Equal(t, batch.ID, got.Id) + assert.Equal(t, batch.Queue, got.QueueName) + require.Len(t, got.Steps, 2) + // One step per member request, in Contains order, attributed by request id. + assert.Equal(t, req1.ID, got.Steps[0].StepId) + require.Len(t, got.Steps[0].Changes, 1) + assert.Equal(t, req1.Change.URIs, got.Steps[0].Changes[0].Uris) + assert.Equal(t, strategypb.Strategy_SQUASH_REBASE, got.Steps[0].Strategy) + assert.Equal(t, req2.ID, got.Steps[1].StepId) + require.Len(t, got.Steps[1].Changes, 1) + assert.Equal(t, req2.Change.URIs, got.Steps[1].Changes[0].Uris) + assert.Equal(t, strategypb.Strategy_REBASE, got.Steps[1].Strategy) } -func TestController_Process_PushConflictMarksBatchFailed(t *testing.T) { - ctrl := gomock.NewController(t) - - const reqID = "test-queue/2" - const batchID = "test-queue/batch/2" - - batch := entity.Batch{ - ID: batchID, - Queue: "test-queue", - Contains: []string{reqID}, - State: entity.BatchStateMerging, - Version: 3, - } - - batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) - batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(3), int32(4), entity.BatchStateFailed).Return(nil) - - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - - mockPusher := pushermock.NewMockPusher(ctrl) - mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( - entity.PushResult{}, - fmt.Errorf("apply: %w", pusher.ErrConflict), - ) - - c := NewController( - zaptest.NewLogger(t).Sugar(), - tally.NoopScope, - store, - newRegistry(t, ctrl, nil), - newPusherFactory(ctrl, mockPusher), - topickey.TopicKeyMerge, - "orchestrator-merge", - ) - - err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) - require.NoError(t, err, "conflict ack-s the message; failure is recorded on the batch") -} - -func TestController_Process_PushInfraFailureReturnsError(t *testing.T) { - ctrl := gomock.NewController(t) - - const reqID = "test-queue/3" - const batchID = "test-queue/batch/3" - - batch := entity.Batch{ - ID: batchID, - Queue: "test-queue", - Contains: []string{reqID}, - State: entity.BatchStateMerging, - Version: 1, - } - - batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) - - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - - mockPusher := pushermock.NewMockPusher(ctrl) - mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( - entity.PushResult{}, - fmt.Errorf("ssh: connection refused"), - ) - - c := NewController( - zaptest.NewLogger(t).Sugar(), - tally.NoopScope, - store, - newRegistry(t, ctrl, nil), - newPusherFactory(ctrl, mockPusher), - topickey.TopicKeyMerge, - "orchestrator-merge", - ) - - err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) - require.Error(t, err) -} - -func TestController_Process_TerminalBatchSkipsPushButFansOut(t *testing.T) { +func TestProcess_HaltedBatchSkips(t *testing.T) { for _, state := range []entity.BatchState{ entity.BatchStateSucceeded, entity.BatchStateFailed, entity.BatchStateCancelled, + entity.BatchStateCancelling, } { t.Run(string(state), func(t *testing.T) { ctrl := gomock.NewController(t) - const batchID = "test-queue/batch/4" - - batch := entity.Batch{ - ID: batchID, - Queue: "test-queue", - State: state, - Version: 7, - } + const batchID = "test-queue/batch/halted" + batch := entity.Batch{ID: batchID, Queue: "test-queue", State: state, Version: 7} batchStore := storagemock.NewMockBatchStore(ctrl) batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + // No request-store reads and no publish for a halted batch: gomock + // fails if GetRequestStore or Publish is touched. store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - // Push must NOT be called for an already-terminal batch. - mockPusher := pushermock.NewMockPusher(ctrl) - - c := NewController( - zaptest.NewLogger(t).Sugar(), - tally.NoopScope, - store, - newRegistry(t, ctrl, nil), - newPusherFactory(ctrl, mockPusher), - topickey.TopicKeyMerge, - "orchestrator-merge", + pub := queuemock.NewMockPublisher(ctrl) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: runwaymq.TopicKeyMerge, Name: "merger", Queue: q}}, ) - - err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) require.NoError(t, err) + + c := newController(t, store, registry) + require.NoError(t, c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue))) }) } } -// BatchStateCancelling must be silently acked: no push, and crucially no -// fan-out (no publish to conclude or speculate). The cancel controller owns -// the terminal write and the downstream publishes; conclude would error on -// a non-terminal Cancelling batch. -func TestController_Process_CancellingShortCircuit(t *testing.T) { +func TestProcess_PublishFailureIsRetryable(t *testing.T) { ctrl := gomock.NewController(t) - const batchID = "test-queue/batch/4c" - - batch := entity.Batch{ - ID: batchID, - Queue: "test-queue", - State: entity.BatchStateCancelling, - Version: 7, - } + const batchID = "test-queue/batch/2" + req := entity.Request{ID: "test-queue/1", Queue: "test-queue", LandStrategy: mergestrategy.MergeStrategyRebase} + batch := entity.Batch{ID: batchID, Queue: "test-queue", Contains: []string{req.ID}, State: entity.BatchStateMerging, Version: 1} batchStore := storagemock.NewMockBatchStore(ctrl) batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + reqStore := storagemock.NewMockRequestStore(ctrl) + reqStore.EXPECT().Get(gomock.Any(), req.ID).Return(req, nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - - // Pusher and publisher with no EXPECTs — neither must be called. - mockPusher := pushermock.NewMockPusher(ctrl) - mockPub := queuemock.NewMockPublisher(ctrl) - mockQ := queuemock.NewMockQueue(ctrl) - mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() - - registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ - {Key: topickey.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - {Key: topickey.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, - }) - require.NoError(t, err) - - c := NewController( - zaptest.NewLogger(t).Sugar(), - tally.NoopScope, - store, - registry, - newPusherFactory(ctrl, mockPusher), - topickey.TopicKeyMerge, - "orchestrator-merge", + store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() + + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("enqueue failed")) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: runwaymq.TopicKeyMerge, Name: "merger", Queue: q}}, ) + require.NoError(t, err) + c := newController(t, store, registry) err = c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) - require.NoError(t, err) + require.Error(t, err) + assert.True(t, errs.IsRetryable(err)) } -func TestController_Process_BatchStoreGetFailureNotRetryable(t *testing.T) { +func TestProcess_BatchStoreGetFailureNotRetryable(t *testing.T) { ctrl := gomock.NewController(t) - const batchID = "test-queue/batch/5" + const batchID = "test-queue/batch/3" batchStore := storagemock.NewMockBatchStore(ctrl) batchStore.EXPECT().Get(gomock.Any(), batchID).Return(entity.Batch{}, fmt.Errorf("db connection lost")) @@ -379,62 +232,14 @@ func TestController_Process_BatchStoreGetFailureNotRetryable(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - c := NewController( - zaptest.NewLogger(t).Sugar(), - tally.NoopScope, - store, - newRegistry(t, ctrl, nil), - newPusherFactory(ctrl, pushermock.NewMockPusher(ctrl)), - topickey.TopicKeyMerge, - "orchestrator-merge", + q := queuemock.NewMockQueue(ctrl) + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: runwaymq.TopicKeyMerge, Name: "merger", Queue: q}}, ) + require.NoError(t, err) - err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, "test-queue")) + c := newController(t, store, registry) + err = c.Process(context.Background(), newDelivery(t, ctrl, batchID, "test-queue")) require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } - -func TestController_Process_PublishFailureSurfaces(t *testing.T) { - ctrl := gomock.NewController(t) - - const reqID = "test-queue/7" - const batchID = "test-queue/batch/7" - - batch := entity.Batch{ - ID: batchID, - Queue: "test-queue", - Contains: []string{reqID}, - State: entity.BatchStateMerging, - Version: 2, - } - - batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) - batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(2), int32(3), entity.BatchStateSucceeded).Return(nil) - - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - - mockPusher := pushermock.NewMockPusher(ctrl) - mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( - entity.PushResult{Batches: []entity.BatchOutcome{{ - BatchID: batchID, - Outcomes: []entity.ChangeOutcome{{ - Status: entity.OutcomeStatusCommitted, CommitSHAs: []string{"abc"}, - }}, - }}}, nil, - ) - - c := NewController( - zaptest.NewLogger(t).Sugar(), - tally.NoopScope, - store, - newRegistry(t, ctrl, fmt.Errorf("queue down")), - newPusherFactory(ctrl, mockPusher), - topickey.TopicKeyMerge, - "orchestrator-merge", - ) - - err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) - require.Error(t, err) -} diff --git a/submitqueue/orchestrator/controller/mergesignal/BUILD.bazel b/submitqueue/orchestrator/controller/mergesignal/BUILD.bazel new file mode 100644 index 00000000..259e5175 --- /dev/null +++ b/submitqueue/orchestrator/controller/mergesignal/BUILD.bazel @@ -0,0 +1,39 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "mergesignal", + srcs = ["mergesignal.go"], + importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergesignal", + visibility = ["//visibility:public"], + deps = [ + "//api/runway/messagequeue", + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/metrics", + "//submitqueue/core/topickey", + "//submitqueue/entity", + "//submitqueue/extension/storage", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "mergesignal_test", + srcs = ["mergesignal_test.go"], + embed = [":mergesignal"], + deps = [ + "//api/runway/messagequeue", + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/extension/messagequeue/mock", + "//submitqueue/core/topickey", + "//submitqueue/entity", + "//submitqueue/extension/storage/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/submitqueue/orchestrator/controller/mergesignal/mergesignal.go b/submitqueue/orchestrator/controller/mergesignal/mergesignal.go new file mode 100644 index 00000000..1bc6d058 --- /dev/null +++ b/submitqueue/orchestrator/controller/mergesignal/mergesignal.go @@ -0,0 +1,207 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mergesignal consumes merge results from runway's merger-signal queue, +// correlates them to the batch by the echoed id, and transitions the batch to a +// terminal state — Succeeded when runway merged the batch, Failed when it could +// not — then fans the batch out to conclude (so member requests pick up the +// outcome) and speculate (so dependents can re-plan). Like mergeconflictsignal +// it is purely result-driven — runway pushes the result, so there is no poll +// loop or self-reschedule. +package mergesignal + +import ( + "context" + "fmt" + + "github.com/uber-go/tally" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/submitqueue/core/topickey" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" + "go.uber.org/zap" +) + +// Controller handles mergesignal queue messages. Implements consumer.Controller. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + registry consumer.TopicRegistry + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new mergesignal controller for the orchestrator. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + store storage.Storage, + registry consumer.TopicRegistry, + topicKey consumer.TopicKey, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("mergesignal_controller"), + metricsScope: scope.SubScope("mergesignal_controller"), + store: store, + registry: registry, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process consumes a runway merge result and advances or fails the batch. +// Returns nil to ack, or error to nack/reject. +// +// A not-merged verdict is an expected outcome of the merge, not a failure: the +// batch is driven to terminal Failed inline and the message is acked. Only +// infrastructure faults — deserialize, storage, the state transition, and the +// fan-out publishes — return an error and reject to the DLQ, where the batch is +// reconciled to Failed. +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + // The runway result carries full data (it crosses the service boundary). Its + // id is the batch id echoed back, so correlate straight to the batch. + result, err := runwaymq.MergeResultFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + return fmt.Errorf("failed to deserialize merge result: %w", err) + } + + batch, err := c.store.GetBatchStore().Get(ctx, result.Id) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to get batch %s: %w", result.Id, err) + } + + c.logger.Infow("received merge signal", + "batch_id", batch.ID, + "merged", result.Success, + "state", string(batch.State), + "version", batch.Version, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + // Cancelling: the cancel path (via speculate) owns the terminal write and the + // downstream fan-out for a batch the user asked to cancel. Silently ack — do + // not transition (a racing terminal merge result must not override the + // cancel) and do not fan out. + if batch.State == entity.BatchStateCancelling { + metrics.NamedCounter(c.metricsScope, opName, "skipped_cancelling", 1) + return nil + } + + // Idempotency: a previous delivery already transitioned this batch to a + // terminal state. Re-fan-out in case that attempt missed the downstream + // publishes, then ack. + if batch.State.IsTerminal() { + metrics.NamedCounter(c.metricsScope, opName, "skipped_terminal", 1) + return c.fanout(ctx, batch.ID, batch.Queue) + } + + var newState entity.BatchState + if result.Success { + newState = entity.BatchStateSucceeded + c.logger.Infow("merged batch", + "batch_id", batch.ID, + "steps", result.Steps, + ) + } else { + metrics.NamedCounter(c.metricsScope, opName, "not_merged", 1) + newState = entity.BatchStateFailed + c.logger.Warnw("batch merge failed", + "batch_id", batch.ID, + "reason", result.Reason, + ) + } + + newVersion := batch.Version + 1 + if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, newState); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "state_update_errors", 1) + return fmt.Errorf("failed to transition batch %s to %s: %w", batch.ID, newState, err) + } + batch.Version = newVersion + batch.State = newState + + return c.fanout(ctx, batch.ID, batch.Queue) +} + +// fanout publishes the batch ID to conclude (so requests are updated) and to +// speculate (so dependents can re-evaluate now that this batch is done). +func (c *Controller) fanout(ctx context.Context, batchID, partitionKey string) error { + if err := c.publish(ctx, topickey.TopicKeyConclude, batchID, partitionKey); err != nil { + metrics.NamedCounter(c.metricsScope, "process", "publish_conclude_errors", 1) + return fmt.Errorf("failed to publish to conclude: %w", err) + } + if err := c.publish(ctx, topickey.TopicKeySpeculate, batchID, partitionKey); err != nil { + metrics.NamedCounter(c.metricsScope, "process", "publish_speculate_errors", 1) + return fmt.Errorf("failed to publish to speculate: %w", err) + } + return nil +} + +// publish publishes a batch ID to the given topic key, partitioned by queue. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { + payload, err := entity.BatchID{ID: batchID}.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize batch ID: %w", err) + } + + msg := entityqueue.NewMessage(batchID, payload, partitionKey, nil) + + q, ok := c.registry.Queue(key) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", key) + } + + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "mergesignal" +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/submitqueue/orchestrator/controller/mergesignal/mergesignal_test.go b/submitqueue/orchestrator/controller/mergesignal/mergesignal_test.go new file mode 100644 index 00000000..4505be15 --- /dev/null +++ b/submitqueue/orchestrator/controller/mergesignal/mergesignal_test.go @@ -0,0 +1,212 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mergesignal + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/submitqueue/core/topickey" + "github.com/uber/submitqueue/submitqueue/entity" + storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +const ( + testBatchID = "test-queue/batch/1" + testQueue = "test-queue" +) + +func resultPayload(t *testing.T, res runwaymq.MergeResult) []byte { + payload, err := runwaymq.MergeResultToBytes(&res) + require.NoError(t, err) + return payload +} + +func newDelivery(ctrl *gomock.Controller, msg entityqueue.Message) *queuemock.MockDelivery { + d := queuemock.NewMockDelivery(ctrl) + d.EXPECT().Message().Return(msg).AnyTimes() + d.EXPECT().Attempt().Return(1).AnyTimes() + return d +} + +// recordingRegistry returns a registry whose conclude and speculate topics +// share one publisher that records the topic names it is asked to publish to. +func recordingRegistry(t *testing.T, ctrl *gomock.Controller, got *[]string) consumer.TopicRegistry { + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, _ entityqueue.Message) error { + *got = append(*got, topic) + return nil + }, + ).AnyTimes() + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: topickey.TopicKeyConclude, Name: "conclude", Queue: q}, + {Key: topickey.TopicKeySpeculate, Name: "speculate", Queue: q}, + }) + require.NoError(t, err) + return registry +} + +func newController(t *testing.T, store *storagemock.MockStorage, registry consumer.TopicRegistry) *Controller { + return NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + registry, + runwaymq.TopicKeyMergeSignal, + "orchestrator-mergesignal", + ) +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + store := storagemock.NewMockStorage(ctrl) + var got []string + c := newController(t, store, recordingRegistry(t, ctrl, &got)) + + assert.Equal(t, consumer.TopicKey(runwaymq.TopicKeyMergeSignal), c.TopicKey()) + assert.Equal(t, "orchestrator-mergesignal", c.ConsumerGroup()) + assert.Equal(t, "mergesignal", c.Name()) + var _ consumer.Controller = c +} + +func TestProcess_MergedAdvancesBatch(t *testing.T) { + ctrl := gomock.NewController(t) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), testBatchID).Return( + entity.Batch{ID: testBatchID, Queue: testQueue, State: entity.BatchStateMerging, Version: 1}, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), testBatchID, int32(1), int32(2), entity.BatchStateSucceeded).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + var got []string + c := newController(t, store, recordingRegistry(t, ctrl, &got)) + + res := runwaymq.MergeResult{ + Id: testBatchID, + Success: true, + Steps: []*runwaymq.StepResult{{StepId: "test-queue/1", OutputIds: []string{"deadbeef"}}}, + } + msg := entityqueue.NewMessage(testBatchID, resultPayload(t, res), testQueue, nil) + require.NoError(t, c.Process(context.Background(), newDelivery(ctrl, msg))) + + // Fans the batch out to conclude (requests pick up the outcome) and + // speculate (dependents re-plan). + assert.ElementsMatch(t, []string{"conclude", "speculate"}, got) +} + +func TestProcess_NotMergedMarksBatchFailed(t *testing.T) { + ctrl := gomock.NewController(t) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), testBatchID).Return( + entity.Batch{ID: testBatchID, Queue: testQueue, State: entity.BatchStateMerging, Version: 3}, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), testBatchID, int32(3), int32(4), entity.BatchStateFailed).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + var got []string + c := newController(t, store, recordingRegistry(t, ctrl, &got)) + + res := runwaymq.MergeResult{Id: testBatchID, Success: false, Reason: "conflict in foo.go"} + msg := entityqueue.NewMessage(testBatchID, resultPayload(t, res), testQueue, nil) + // Not-merged is an expected terminal outcome, so Process acks (no error). + require.NoError(t, c.Process(context.Background(), newDelivery(ctrl, msg))) + + assert.ElementsMatch(t, []string{"conclude", "speculate"}, got) +} + +func TestProcess_CancellingShortCircuit(t *testing.T) { + ctrl := gomock.NewController(t) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), testBatchID).Return( + entity.Batch{ID: testBatchID, Queue: testQueue, State: entity.BatchStateCancelling, Version: 4}, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + // No UpdateState and no fan-out: gomock fails if either runs. + var got []string + c := newController(t, store, recordingRegistry(t, ctrl, &got)) + + res := runwaymq.MergeResult{Id: testBatchID, Success: true} + msg := entityqueue.NewMessage(testBatchID, resultPayload(t, res), testQueue, nil) + require.NoError(t, c.Process(context.Background(), newDelivery(ctrl, msg))) + assert.Empty(t, got) +} + +func TestProcess_TerminalReFansOut(t *testing.T) { + ctrl := gomock.NewController(t) + + // Already terminal (a prior delivery won): no state write, but re-fan-out in + // case the earlier attempt missed the downstream publishes. + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), testBatchID).Return( + entity.Batch{ID: testBatchID, Queue: testQueue, State: entity.BatchStateSucceeded, Version: 5}, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + var got []string + c := newController(t, store, recordingRegistry(t, ctrl, &got)) + + res := runwaymq.MergeResult{Id: testBatchID, Success: true} + msg := entityqueue.NewMessage(testBatchID, resultPayload(t, res), testQueue, nil) + require.NoError(t, c.Process(context.Background(), newDelivery(ctrl, msg))) + assert.ElementsMatch(t, []string{"conclude", "speculate"}, got) +} + +func TestProcess_DeserializeErrorRejects(t *testing.T) { + ctrl := gomock.NewController(t) + + store := storagemock.NewMockStorage(ctrl) + var got []string + c := newController(t, store, recordingRegistry(t, ctrl, &got)) + + msg := entityqueue.NewMessage(testBatchID, []byte("garbage"), testQueue, nil) + require.Error(t, c.Process(context.Background(), newDelivery(ctrl, msg))) +} + +func TestProcess_StorageErrorRejects(t *testing.T) { + ctrl := gomock.NewController(t) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), testBatchID).Return(entity.Batch{}, assert.AnError) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + var got []string + c := newController(t, store, recordingRegistry(t, ctrl, &got)) + + res := runwaymq.MergeResult{Id: testBatchID, Success: true} + msg := entityqueue.NewMessage(testBatchID, resultPayload(t, res), testQueue, nil) + require.Error(t, c.Process(context.Background(), newDelivery(ctrl, msg))) +}