Skip to content

feat(stovepipe): add validate controller forwarding commits to batch#255

Open
mnoah1 wants to merge 5 commits into
mnoah1/stovepipe-start-wiringfrom
mnoah1/stovepipe-validate-controller
Open

feat(stovepipe): add validate controller forwarding commits to batch#255
mnoah1 wants to merge 5 commits into
mnoah1/stovepipe-start-wiringfrom
mnoah1/stovepipe-validate-controller

Conversation

@mnoah1

@mnoah1 mnoah1 commented Jun 16, 2026

Copy link
Copy Markdown
Contributor

Set up empty validate controller. This consumes events from the existing start controller, than forwards them to batch if valid. Here, we can implement additional validation logic on a given commit before passing it forward to a batch.

Also wires this into the existing controller

Why?

Building out overall workflow outlined in https://github.com/uber/submitqueue/blob/main/doc/rfc/stovepipe/workflow.md

What?

Validate step in the outline above

Test Plan

  • make local-stovepipe-start
  • Insert a message and confirm flow through the steps:
rchestrator-service-1  | 2026-06-16T22:15:23.665Z	DEBUG	consumer/consumer.go:351	processing delivery	{"controller": "start", "topic_key": "start", "message_id": "manual-1781648123538", "partition_key": "github.com/uber/cadence", "attempt": 1}
orchestrator-service-1  | 2026-06-16T22:15:23.665Z	INFO	start_controller	start/start.go:91	received change event	{"uri": "git://github.com/uber/cadence/refs/heads/main/abc123def456", "attempt": 1, "partition_key": "github.com/uber/cadence"}
orchestrator-service-1  | 2026-06-16T22:15:23.665Z	DEBUG	queue_mysql.message_store	mysql/message_store.go:72	inserting messages	{"topic": "validate", "count": 1, "visible_after": 0}
orchestrator-service-1  | 2026-06-16T22:15:23.669Z	DEBUG	queue_mysql.message_store	mysql/message_store.go:125	inserted messages	{"topic": "validate", "count": 1}
orchestrator-service-1  | 2026-06-16T22:15:23.669Z	DEBUG	queue_mysql.publisher	mysql/publisher.go:65	published message	{"topic": "validate", "message_id": "git://github.com/uber/cadence/refs/heads/main/abc123def456"}
orchestrator-service-1  | 2026-06-16T22:15:23.669Z	INFO	start_controller	start/start.go:106	published commit to validate	{"uri": "git://github.com/uber/cadence/refs/heads/main/abc123def456", "topic_key": "validate"}
orchestrator-service-1  | 2026-06-16T22:15:23.674Z	DEBUG	consumer/consumer.go:479	message processed successfully	{"controller": "start", "topic_key": "start", "message_id": "manual-1781648123538", "partition_key": "github.com/uber/cadence", "attempt": 1, "elapsed_ms": 4}
orchestrator-service-1  | 2026-06-16T22:15:23.757Z	DEBUG	queue_mysql.message_store	mysql/message_store.go:217	fetched messages	{"topic": "start", "partition_key": "github.com/uber/cadence", "count": 1}
orchestrator-service-1  | 2026-06-16T22:15:23.767Z	DEBUG	consumer/consumer.go:351	processing delivery	{"controller": "validate", "topic_key": "validate", "message_id": "git://github.com/uber/cadence/refs/heads/main/abc123def456", "partition_key": "github.com/uber/cadence", "attempt": 1}
orchestrator-service-1  | 2026-06-16T22:15:23.767Z	INFO	validate_controller	validate/validate.go:89	received change event	{"uri": "git://github.com/uber/cadence/refs/heads/main/abc123def456", "attempt": 1, "partition_key": "github.com/uber/cadence"}
orchestrator-service-1  | 2026-06-16T22:15:23.767Z	DEBUG	queue_mysql.message_store	mysql/message_store.go:72	inserting messages	{"topic": "batch", "count": 1, "visible_after": 0}
orchestrator-service-1  | 2026-06-16T22:15:23.770Z	DEBUG	queue_mysql.message_store	mysql/message_store.go:125	inserted messages	{"topic": "batch", "count": 1}
orchestrator-service-1  | 2026-06-16T22:15:23.770Z	DEBUG	queue_mysql.publisher	mysql/publisher.go:65	published message	{"topic": "batch", "message_id": "git://github.com/uber/cadence/refs/heads/main/abc123def456"}
orchestrator-service-1  | 2026-06-16T22:15:23.770Z	INFO	validate_controller	validate/validate.go:104	published commit to batch	{"uri": "git://github.com/uber/cadence/refs/heads/main/abc123def456", "topic_key": "batch"}
orchestrator-service-1  | 2026-06-16T22:15:23.774Z	DEBUG	consumer/consumer.go:479	message processed successfully	{"controller": "validate", "topic_key": "validate", "message_id": "git://github.com/uber/cadence/refs/heads/main/abc123def456", "partition_key": "github.com/uber/cadence", "attempt": 1, "elapsed_ms": 3}

Issue

@mnoah1 mnoah1 marked this pull request as ready for review June 16, 2026 22:20
@mnoah1 mnoah1 requested review from a team, behinddwalls and sbalabanov as code owners June 16, 2026 22:20
@mnoah1 mnoah1 force-pushed the mnoah1/stovepipe-validate-controller branch from 4ca3b54 to 5d2e6de Compare June 17, 2026 03:03
@mnoah1 mnoah1 force-pushed the mnoah1/stovepipe-start-wiring branch 2 times, most recently from 07811a3 to fbe881a Compare June 18, 2026 13:28
@behinddwalls behinddwalls force-pushed the mnoah1/stovepipe-start-wiring branch from fbe881a to 57745ce Compare June 18, 2026 15:45
@behinddwalls

Copy link
Copy Markdown
Collaborator

⚠️ Automatic stack rebase failed

This PR could not be automatically rebased after its base PR was merged. The rebase hit conflicts that need manual resolution.

To fix manually:

git fetch origin
git checkout mnoah1/stovepipe-validate-controller
git rebase --onto origin/mnoah1/stovepipe-start-wiring fbe881aa56233c566385b3e9a10ceb177555b5f6 mnoah1/stovepipe-validate-controller
# resolve conflicts, then:
git push --force-with-lease

Then update this PR's base branch:

gh pr edit 255 --base mnoah1/stovepipe-start-wiring

@mnoah1 mnoah1 force-pushed the mnoah1/stovepipe-start-wiring branch from 57745ce to aaf851f Compare June 18, 2026 16:23
mnoah1 and others added 2 commits June 18, 2026 16:33
Wires the start controller into the orchestrator sample app.

## Why?
Building out overall structure of
https://github.com/uber/submitqueue/blob/main/doc/rfc/stovepipe/workflow.md

## What?
- Update main.go to include the newly added start controller from branch
below this
- This will create a start controller and begin watching for events on
the topic. Also adding the validate topic, as start will insert these

## Test Plan
- `make local-stovepipe-start`
- `make local-stovepipe-logs`
- Insert a sample message:
```
docker exec -i stovepipe-mysql-queue-1 mysql -uroot -proot submitqueue -e "INSERT INTO queue_messages (topic, partition_key, id, payload, metadata, created_at, published_at, visible_after, failed_at, failure_count, last_error, original_topic) VALUES ('start', 'github.com/uber/cadence', CONCAT('manual-', CAST(UNIX_TIMESTAMP(NOW(3)) * 1000 AS UNSIGNED)), '{\"uri\":\"git://github.com/uber/cadence/refs/heads/main/abc123def456\",\"partition_key\":\"github.com/uber/cadence\"}', NULL, CAST(UNIX_TIMESTAMP(NOW(3)) * 1000 AS UNSIGNED), CAST(UNIX_TIMESTAMP(NOW(3)) * 1000 AS UNSIGNED), 0, 0, 0, '', '');"
mysql: [Warning] Using a password on the command line interface can be insecure.
```
- Confirm via logs that the message is receives and enqueues the
validate message:
```
orchestrator-service-1  | 2026-06-18T13:21:59.727Z	DEBUG	queue_mysql.message_store	mysql/message_store.go:217	fetched messages	{"topic": "start", "partition_key": "stovepipe-monorepo", "count": 1}
orchestrator-service-1  | 2026-06-18T13:21:59.732Z	DEBUG	consumer/consumer.go:351	processing delivery	{"controller": "start", "topic_key": "start", "message_id": "stovepipe-monorepo/1", "partition_key": "stovepipe-monorepo", "attempt": 1}
orchestrator-service-1  | 2026-06-18T13:21:59.732Z	INFO	start_controller	start/start.go:97	received ingest request	{"spid": "stovepipe-monorepo/1", "queue": "stovepipe-monorepo", "change_uris": ["git://git.example.com/uber/monorepo/refs%2Fheads%2Fmain/abcdef0123456789abcdef0123456789abcdef01"], "change_count": 1, "attempt": 1, "partition_key": "stovepipe-monorepo"}
orchestrator-service-1  | 2026-06-18T13:21:59.732Z	DEBUG	queue_mysql.message_store	mysql/message_store.go:72	inserting messages	{"topic": "validate", "count": 1, "visible_after": 0}
orchestrator-service-1  | 2026-06-18T13:21:59.740Z	DEBUG	queue_mysql.message_store	mysql/message_store.go:125	inserted messages	{"topic": "validate", "count": 1}
orchestrator-service-1  | 2026-06-18T13:21:59.740Z	DEBUG	queue_mysql.publisher	mysql/publisher.go:65	published message	{"topic": "validate", "message_id": "stovepipe-monorepo/1"}
orchestrator-service-1  | 2026-06-18T13:21:59.740Z	INFO	start_controller	start/start.go:115	published ingest request to validate	{"spid": "stovepipe-monorepo/1", "topic_key": "validate"}
```
- main.go doesn't have tests of its own but we can get some e2e tests in
place for stovepipe separately
## Summary
Adds gateway List API rfc

Main idea here is the storage for list api is colocated / materialized
at the same time request logs are upserted and the list api results
should match up with what the status endpoint returns (both Status and
List data is derived from immutable request logs stream and data is
stored in gateway layer, not orchestrator).

## Test Plan


## Issues


## Stack
1. @ #225
1. #226
@behinddwalls

Copy link
Copy Markdown
Collaborator

⚠️ Automatic stack rebase failed

This PR could not be automatically rebased after its base PR was merged. The rebase hit conflicts that need manual resolution.

To fix manually:

git fetch origin
git checkout mnoah1/stovepipe-validate-controller
git rebase --onto origin/main aaf851f16882bd1876b6f143e9fefb46d56c0949 mnoah1/stovepipe-validate-controller
# resolve conflicts, then:
git push --force-with-lease

Then update this PR's base branch:

gh pr edit 255 --base main

## Summary
### Why?

The message queue topic-binding proto option was named `topics`, which
reads as a concrete wire topic name. It is not — it carries a stable
**logical topic key**. Each implementer maps the key to whatever topic
name its broker/queue requires (subject to that backend's naming
constraints); on our Go side the keys are `consumer.TopicKey` values,
mapped to concrete names through `TopicRegistry`. The name `topics`
invited the wrong mental model.

### What?

Rename the `google.protobuf.MessageOptions` extension `topics` →
`topic_keys` (field number `50001` unchanged, so the wire/extension
layout is identical) and reframe its doc comment to say it carries a
logical key, not a wire name. Regenerated `messagequeue.pb.go`
(`E_Topics` → `E_TopicKeys`).

This is the base of a stack; the runway contract and the contract RFC
that consume the option are updated in the branches stacked on top.

## Test Plan
✅ `make proto` — descriptor field renamed (`name=topics` →
`name=topic_keys`), field number `50001` unchanged
✅ `./tool/bazel build //...`

## Issues


## Stack
1. @ #264
1. #259
1. #260
1. #245
1. #247
## Summary
### Why?

Queue payloads are Go structs serialized with `encoding/json`, so the
wire shape is defined only by Go source. There is no language-neutral
contract a non-Go client can compile against, no explicit
topic-to-payload binding, and no distinction between a domain's private
wiring and a published cross-domain contract.

### What?

Doc-only — the design of record for message queue contracts
(`doc/rfc/messagequeue-contract.md`):

- **Contract language: Protobuf**, serialized as **protobuf JSON**
(`protojson`) so payloads stay self-describing JSON on the wire. The
`.proto` is the authority and the Go binding is generated from it, so it
cannot drift (no hand-authored struct, no drift test to keep them in
sync).
- **Topic binding:** a custom `topics` proto option (defined in
`api/base/messagequeue`) carries the wire topic names on the message
itself, read back by reflection — not on the publish/consume hot path,
which still resolves topics from a `consumer.TopicKey` via the registry.
- **Location by audience:** external contracts (something outside the
owning domain depends on them) live in `api/{domain}/messagequeue/`;
internal ones in `{domain}/core/messagequeue/`, with the split enforced
by Bazel `visibility`.
- Documents the accepted protojson conventions (snake_case field names,
UPPER_SNAKE enums, int64-as-string) and why JSON Schema, binary proto,
and Avro were rejected.

## Test Plan
- ✅ `make lint` (doc-only)

## Issues


## Stack
1. #264
1. @ #259
1. #260
1. #245
1. #247
@mnoah1 mnoah1 force-pushed the mnoah1/stovepipe-validate-controller branch from 5d2e6de to 862f0b1 Compare June 18, 2026 18:49
Mirror the start controller's shape for the validate pipeline stage: the
validate controller consumes the validate topic, guards the carried
partition key, and forwards the change event to the batch stage. Adds the
batch topic key, wires the controller into the example orchestrator, and
registers the batch topic in the registry.

Currently a forwarding stub; placeholder comments describe the eventual
commit-metadata resolution per the workflow RFC.
@mnoah1 mnoah1 force-pushed the mnoah1/stovepipe-validate-controller branch from 862f0b1 to d152470 Compare June 18, 2026 18:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants