Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions doc/rfc/submitqueue/extension-contract.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ Two facts this grounds: `conflict` already resolves nothing (the baseline), and
| `scorer.Scorer` | score | flat `Change`, per request | `entity.Batch` — resolve + reduce internally | one batch score (`float64`) — unchanged | request store + change provider |
| `changeprovider.ChangeProvider` | validate | `Change` | `entity.Request` | per-URI change info (`[]ChangeInfo`, `URI`-tagged) — unchanged | none — it *is* the resolver |
| `buildrunner.BuildRunner` | build | base/head `[]Change` | base `[]entity.Batch` + head `entity.Batch` | build id, then status/cancel (`BuildID`, `BuildStatus`) — unchanged | request store + change provider |
| `pusher.Pusher` | merge | `[]Change` | ordered `[]entity.Batch` | **per-batch** outcomes (`Result` grouped by `BatchID`) — **changed** | request store + change provider |
| `pusher.Pusher` *(removed)* | merge | | **moved out-of-process to runway** (`merger` / `merger-signal`); see the note below the table | — | — |
| `storage`, `changestore`, `queueconfig` | — | keys + entities | unchanged — resolution targets | entities | — |

**Outputs are unchanged except `pusher`.** This RFC moves the *input* toward identity; four of the five return contracts — conflicts, score, change info, build id/status — are exactly what they are today. `pusher` is the lone exception: because its input becomes a *list* of independently-landed batches, its result regroups per batch (`BatchID`-tagged, per-change commit detail kept underneath) so each batch's outcome stays correlatable — the "output mirrors the input unit" principle above. No other output shape changes.
**Outputs are unchanged.** This RFC moves the *input* toward identity; the four live return contracts — conflicts, score, change info, build id/status — are exactly what they are today. (The `pusher` row is not an in-process extension: merge runs out-of-process in runway, so its output is not part of this catalog — see the note below.) No other output shape changes.

The validate-time mergeability check runs **asynchronously and out-of-process** in runway rather than as an in-process extension: `validate` hands off to the `mergeconflict` controller, which publishes a full check request to the runway-owned `merge-conflict-checker` queue, and `mergeconflictsignal` consumes runway's result (see [workflow.md](workflow.md)). The in-process `mergechecker` package is unused on the validate path.
The validate-time mergeability **check** and the **merge** itself both run **asynchronously and out-of-process** in runway rather than as in-process extensions, over the one shared `MergeRequest`/`MergeResult` contract — a check is a dry run of a merge. `validate` hands off to the `mergeconflict` controller (→ runway `merge-conflict-checker`, result back via `mergeconflictsignal`); `merge` hands the batch to runway (→ `merger`, result back via `mergesignal`) rather than calling an in-process `pusher`. See [workflow.md](workflow.md). The in-process `mergechecker` and `pusher` packages are unused on the pipeline path.

Non-obvious points:

Expand Down
161 changes: 83 additions & 78 deletions doc/rfc/submitqueue/workflow.md

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions example/submitqueue/orchestrator/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ go_library(
"//submitqueue/extension/conflict/fake",
"//submitqueue/extension/conflict/fileoverlap",
"//submitqueue/extension/conflict/none",
"//submitqueue/extension/pusher",
"//submitqueue/extension/pusher/fake",
"//submitqueue/extension/pusher/git",
"//submitqueue/extension/scorer",
"//submitqueue/extension/scorer/composite",
"//submitqueue/extension/scorer/fake",
Expand All @@ -54,6 +51,7 @@ go_library(
"//submitqueue/orchestrator/controller/merge",
"//submitqueue/orchestrator/controller/mergeconflict",
"//submitqueue/orchestrator/controller/mergeconflictsignal",
"//submitqueue/orchestrator/controller/mergesignal",
"//submitqueue/orchestrator/controller/score",
"//submitqueue/orchestrator/controller/speculate",
"//submitqueue/orchestrator/controller/start",
Expand All @@ -68,7 +66,7 @@ go_library(
)

go_binary(
name = "orchestrator",
name = "server",
embed = [":orchestrator_lib"],
visibility = ["//visibility:public"],
)
74 changes: 32 additions & 42 deletions example/submitqueue/orchestrator/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ import (
conflictfake "github.com/uber/submitqueue/submitqueue/extension/conflict/fake"
"github.com/uber/submitqueue/submitqueue/extension/conflict/fileoverlap"
"github.com/uber/submitqueue/submitqueue/extension/conflict/none"
"github.com/uber/submitqueue/submitqueue/extension/pusher"
pushfake "github.com/uber/submitqueue/submitqueue/extension/pusher/fake"
gitpusher "github.com/uber/submitqueue/submitqueue/extension/pusher/git"
"github.com/uber/submitqueue/submitqueue/extension/scorer"
"github.com/uber/submitqueue/submitqueue/extension/scorer/composite"
scorerfake "github.com/uber/submitqueue/submitqueue/extension/scorer/fake"
Expand All @@ -73,6 +70,7 @@ import (
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergeconflict"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergeconflictsignal"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergesignal"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/score"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/speculate"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/start"
Expand Down Expand Up @@ -240,13 +238,12 @@ func run() error {

// Per-extension factories all resolve against the registry by queue name.
cpf := changeProviderFactory{queues}
pshf := pusherFactory{queues}
brf := buildRunnerFactory{queues}
scf := scorerFactory{queues}
cof := analyzerFactory{queues}

// Register controllers
primaryCount, err := registerPrimaryControllers(primaryConsumer, logger.Sugar(), scope, registry, cpf, pshf, brf, scf, cof, cnt, store)
primaryCount, err := registerPrimaryControllers(primaryConsumer, logger.Sugar(), scope, registry, cpf, brf, scf, cof, cnt, store)
if err != nil {
return err
}
Expand Down Expand Up @@ -384,6 +381,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
{topickey.TopicKeyBuild, "build", "orchestrator-build"},
{topickey.TopicKeyBuildSignal, "buildsignal", "orchestrator-buildsignal"},
{topickey.TopicKeyMerge, "merge", "orchestrator-merge"},
{runwaymq.TopicKeyMergeSignal, "merger-signal", "orchestrator-mergesignal"},
{topickey.TopicKeyConclude, "conclude", "orchestrator-conclude"},
}

Expand Down Expand Up @@ -443,6 +441,17 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
Queue: q,
})

// Publish-only: the orchestrator hands merge requests to runway via the
// runway-owned merger queue. Runway is the sole consumer, so the
// orchestrator registers no consuming subscription (and no DLQ) here; the
// inbound result arrives on the separate merger-signal queue, which is a
// consumed primary topic above.
configs = append(configs, consumer.TopicConfig{
Key: runwaymq.TopicKeyMerge,
Name: "merger",
Queue: q,
})

return consumer.NewTopicRegistry(configs)
}

Expand Down Expand Up @@ -475,11 +484,10 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
//
// queueExtensions is the full set of extension implementations for a single
// queue. Grouping them per queue (rather than per extension) lets the wiring
// read as "for this queue, here are its scorer, analyzer, pusher, …", and lets
// read as "for this queue, here are its scorer, analyzer, change provider, …", and lets
// a queue profile start from a baseline and override only what differs.
type queueExtensions struct {
changeProvider changeprovider.ChangeProvider
pusher pusher.Pusher
buildRunner buildrunner.BuildRunner
scorer scorer.Scorer
analyzer conflict.Analyzer
Expand Down Expand Up @@ -510,12 +518,6 @@ func (f changeProviderFactory) For(cfg changeprovider.Config) (changeprovider.Ch
return f.reg.get(cfg.QueueName).changeProvider, nil
}

type pusherFactory struct{ reg queueRegistry }

func (f pusherFactory) For(cfg pusher.Config) (pusher.Pusher, error) {
return f.reg.get(cfg.QueueName).pusher, nil
}

type buildRunnerFactory struct{ reg queueRegistry }

func (f buildRunnerFactory) For(cfg buildrunner.Config) (buildrunner.BuildRunner, error) {
Expand All @@ -534,7 +536,7 @@ func (f analyzerFactory) For(cfg conflict.Config) (conflict.Analyzer, error) {
return f.reg.get(cfg.QueueName).analyzer, nil
}

func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, cpf changeprovider.Factory, pshf pusher.Factory, brf buildrunner.Factory, scf scorer.Factory, cof conflict.Factory, cnt counter.Counter, store storage.Storage) (int, error) {
func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, cpf changeprovider.Factory, brf buildrunner.Factory, scf scorer.Factory, cof conflict.Factory, cnt counter.Counter, store storage.Storage) (int, error) {
var count int
requestController := start.NewController(
logger,
Expand Down Expand Up @@ -678,7 +680,7 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger,
scope,
store,
registry,
pshf,
runwaymq.TopicKeyMerge,
topickey.TopicKeyMerge,
"orchestrator-merge",
)
Expand All @@ -687,6 +689,19 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger,
}
count++

mergesignalController := mergesignal.NewController(
logger,
scope,
store,
registry,
runwaymq.TopicKeyMergeSignal,
"orchestrator-mergesignal",
)
if err := c.Register(mergesignalController); err != nil {
return count, fmt.Errorf("failed to register mergesignal controller: %w", err)
}
count++

concludeController := conclude.NewController(
logger,
scope,
Expand Down Expand Up @@ -724,6 +739,7 @@ func registerDLQControllers(c consumer.Consumer, logger *zap.SugaredLogger, scop
{"build_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyBuild), "orchestrator-build-dlq")},
{"buildsignal_dlq", dlq.NewDLQBuildSignalController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq")},
{"merge_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyMerge), "orchestrator-merge-dlq")},
{"mergesignal_dlq", dlq.NewDLQMergeSignalController(logger, dlqScope, store, dlq.TopicKey(runwaymq.TopicKeyMergeSignal), "orchestrator-mergesignal-dlq")},
{"conclude_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyConclude), "orchestrator-conclude-dlq")},
}
var count int
Expand Down Expand Up @@ -784,29 +800,8 @@ func newChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.Ch
}), nil
}

// newPusher creates a git-backed Pusher bound to the configured checkout path,
// remote, and target branch (PUSHER_CHECKOUT_PATH, PUSHER_REMOTE default
// "origin", PUSHER_TARGET default "main"). When PUSHER_CHECKOUT_PATH is unset it
// returns the fake pusher (commits succeed unless a change URI carries a failure
// marker, see pusher/fake), keeping the example runnable without a git checkout.
func newPusher(logger *zap.Logger, scope tally.Scope, resolver changeset.Resolver) (pusher.Pusher, error) {
checkout := os.Getenv("PUSHER_CHECKOUT_PATH")
if checkout == "" {
logger.Warn("PUSHER_CHECKOUT_PATH not set; using fake pusher (commits succeed unless URI-marked)")
return pushfake.New(resolver), nil
}
return gitpusher.NewPusher(gitpusher.Params{
CheckoutPath: checkout,
Remote: getEnv("PUSHER_REMOTE", "origin"),
Target: getEnv("PUSHER_TARGET", "main"),
Resolver: resolver,
Logger: logger.Sugar(),
MetricsScope: scope.SubScope("pusher"),
}), nil
}

// newQueueRegistry builds the per-queue extension profiles for the example.
// Edge integrations (merge checker, change provider, pusher) and the build
// Edge integrations (change provider) and the build
// runner form a shared baseline; each per-queue profile starts from that
// baseline and overrides only the extensions that differ — here the scorer and
// conflict analyzer. Queues without an explicit profile fall back to the
Expand All @@ -817,10 +812,6 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset.
if err != nil {
return queueRegistry{}, fmt.Errorf("failed to create change provider: %w", err)
}
psh, err := newPusher(logger, scope, resolver)
if err != nil {
return queueRegistry{}, fmt.Errorf("failed to create pusher: %w", err)
}

// batchLines buckets a batch by total lines changed across all its changes —
// larger batches are likelier to fail to land.
Expand All @@ -842,7 +833,6 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset.
// below does.
base := queueExtensions{
changeProvider: cp,
pusher: psh,
buildRunner: buildfake.New(resolver),
scorer: scorerfake.New(resolver, heuristic.New(
resolver,
Expand Down
2 changes: 2 additions & 0 deletions submitqueue/orchestrator/controller/dlq/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"dlq.go",
"log.go",
"mergeconflictsignal.go",
"mergesignal.go",
"request.go",
],
importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq",
Expand All @@ -31,6 +32,7 @@ go_test(
"dlq_test.go",
"log_test.go",
"mergeconflictsignal_test.go",
"mergesignal_test.go",
"request_test.go",
],
embed = [":dlq"],
Expand Down
108 changes: 108 additions & 0 deletions submitqueue/orchestrator/controller/dlq/mergesignal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dlq

import (
"context"
"fmt"

"github.com/uber-go/tally"
runwaymq "github.com/uber/submitqueue/api/runway/messagequeue"
"github.com/uber/submitqueue/platform/consumer"
"github.com/uber/submitqueue/platform/metrics"
"github.com/uber/submitqueue/submitqueue/extension/storage"
"go.uber.org/zap"
)

// mergeSignalController is the DLQ reconciler for the mergesignal topic. Its
// payload carries a runway MergeResult whose id is the batch id echoed back, so
// reconciliation fails that batch directly via failBatch (which also fans out
// to the member requests).
type mergeSignalController struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
store storage.Storage
topicKey consumer.TopicKey
consumerGroup string
}

// Verify mergeSignalController implements consumer.Controller at compile time.
var _ consumer.Controller = (*mergeSignalController)(nil)

// NewDLQMergeSignalController builds a DLQ controller for the mergesignal topic.
func NewDLQMergeSignalController(
logger *zap.SugaredLogger,
scope tally.Scope,
store storage.Storage,
topicKey consumer.TopicKey,
consumerGroup string,
) consumer.Controller {
name := string(topicKey) + "_controller"
return &mergeSignalController{
logger: logger.Named(name),
metricsScope: scope.SubScope(name),
store: store,
topicKey: topicKey,
consumerGroup: consumerGroup,
}
}

// Process reconciles a single DLQ delivery for the mergesignal topic.
func (c *mergeSignalController) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
const opName = "process"

op := metrics.Begin(c.metricsScope, opName)
defer func() { op.Complete(retErr) }()

msg := delivery.Message()

result, err := runwaymq.MergeResultFromBytes(msg.Payload)
if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1)
return fmt.Errorf("failed to decode merge result from dlq payload: %w", err)
}

dmeta := delivery.Metadata()
c.logger.Warnw("dlq message received",
"batch_id", result.ID,
"attempt", delivery.Attempt(),
"dlq_original_topic", dmeta["dlq.original_topic"],
"dlq_failure_count", dmeta["dlq.failure_count"],
"dlq_last_error", dmeta["dlq.last_error"],
)

if err := failBatch(ctx, c.store, c.logger, result.ID); err != nil {
metrics.NamedCounter(c.metricsScope, opName, "reconcile_errors", 1)
return err
}

metrics.NamedCounter(c.metricsScope, opName, "reconciled", 1)
return nil
}

// Name returns the controller name for logging and metrics.
func (c *mergeSignalController) Name() string {
return string(c.topicKey)
}

// TopicKey returns the topic key this controller subscribes to.
func (c *mergeSignalController) TopicKey() consumer.TopicKey {
return c.topicKey
}

// ConsumerGroup returns the consumer group for offset tracking.
func (c *mergeSignalController) ConsumerGroup() string {
return c.consumerGroup
}
Loading