From 08d8bbbc5c4d0337770567abe328ac8756196773 Mon Sep 17 00:00:00 2001 From: Albert Wu Date: Tue, 9 Jun 2026 20:52:55 -0700 Subject: [PATCH 1/2] refactor(storage): replace batch (queue,state) index with active_batch membership MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the secondary index over the batch table's mutable `state` column with an `active_batch` membership table that answers the only queue-scoped query the pipeline needs: "which batches in this queue are still active?" (the batch controller uses it to find conflict dependencies; the cancel controller uses it to find the batch holding a request). A row is intended to exist while its batch is non-terminal, so the table stays bounded by the live speculation window rather than growing with batch history. `queue` leads the PK so listing is a PK-prefix scan and the table is shardable by queue — an access pattern that ports cleanly to a key-value store (queue = partition key, batch_id = sort key), unlike a server-maintained secondary index over a mutable non-key column. Membership is best-effort, not an exact mirror of batch state, and is maintained without transactions: - Create writes the membership row before the batch row. This ordering is required for correctness: whenever a batch row is visible to a reader its membership row is already present, so a concurrent ListActive can never miss an active batch. The membership write uses INSERT ... ON DUPLICATE KEY UPDATE (a no-op on primary-key conflict), which keeps it idempotent across retries while still surfacing real errors that INSERT IGNORE would swallow. - If the batch insert then fails, Create deliberately leaves the membership row in place. A returned error does not prove the row was not written (an ambiguous failure can commit the batch row and still return an error), so deleting would risk permanently orphaning a live, non-terminal batch from ListActive. A dangling membership is the safe direction. 
- ListActive resolves each member by primary key: a terminal batch's membership is best-effort removed (race-free — a terminal batch is fully committed and its id is never reused); a missing batch is skipped but NOT removed (it may belong to an in-flight Create that has written its membership but not yet its batch row). Cleanup failures are swallowed so a read never fails on index maintenance, and terminal-state writers (merge, speculate, dlq) need not touch the index. Genuinely dangling rows (failed/crashed creates) and batches stuck in a non-terminal state are left for a future reconcile/prune job, documented in schema/README.md. Integration tests cover the self-heal and membership invariants: - TestActiveBatch_SelfHealsTerminalMembership - TestActiveBatch_SkipsDanglingMembershipWithoutDeleting - TestActiveBatch_CreateKeepsMembershipOnDuplicate - TestActiveBatch_CreateKeepsMembershipOnFailedInsert Co-Authored-By: Claude Opus 4.8 (1M context) --- submitqueue/extension/storage/batch_store.go | 11 +- .../storage/mock/batch_store_mock.go | 12 +- .../extension/storage/mysql/batch_store.go | 111 ++++++++++++------ .../extension/storage/mysql/schema/README.md | 20 ++-- .../storage/mysql/schema/active_batch.sql | 12 ++ .../extension/storage/mysql/schema/batch.sql | 5 +- .../orchestrator/controller/batch/batch.go | 20 ++-- .../controller/batch/batch_test.go | 23 ++-- .../orchestrator/controller/cancel/cancel.go | 16 +-- .../controller/cancel/cancel_test.go | 12 +- .../extension/storage/mysql/BUILD.bazel | 2 + .../extension/storage/mysql/storage_test.go | 102 ++++++++++++++++ .../submitqueue/extension/storage/suite.go | 96 +++++++++++++++ 13 files changed, 355 insertions(+), 87 deletions(-) create mode 100644 submitqueue/extension/storage/mysql/schema/active_batch.sql diff --git a/submitqueue/extension/storage/batch_store.go b/submitqueue/extension/storage/batch_store.go index 05e94fb8..59a5dceb 100644 --- a/submitqueue/extension/storage/batch_store.go +++ 
b/submitqueue/extension/storage/batch_store.go @@ -41,6 +41,13 @@ type BatchStore interface { // Version arithmetic is owned by the caller; the store performs a pure conditional write. UpdateScoreAndState(ctx context.Context, id string, oldVersion, newVersion int32, score float64, newState entity.BatchState) error - // GetByQueueAndStates retrieves all batches that belong to the given queue and are in the given states. - GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) ([]entity.Batch, error) + // ListActive returns all active (non-terminal) batches in the given queue. + // "Active" means the batch's persisted state is not terminal — see + // entity.BatchState.IsTerminal. Callers that need a narrower set filter the + // result by state in memory. + // + // The store tracks active membership internally (added on Create, self-healed + // on read) so this is a key-prefix read rather than a secondary-index query; + // see the implementation and extension/storage/mysql/schema/README.md. + ListActive(ctx context.Context, queue string) ([]entity.Batch, error) } diff --git a/submitqueue/extension/storage/mock/batch_store_mock.go b/submitqueue/extension/storage/mock/batch_store_mock.go index 48b6bdaf..c126add2 100644 --- a/submitqueue/extension/storage/mock/batch_store_mock.go +++ b/submitqueue/extension/storage/mock/batch_store_mock.go @@ -70,19 +70,19 @@ func (mr *MockBatchStoreMockRecorder) Get(ctx, id any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockBatchStore)(nil).Get), ctx, id) } -// GetByQueueAndStates mocks base method. -func (m *MockBatchStore) GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) ([]entity.Batch, error) { +// ListActive mocks base method. 
+func (m *MockBatchStore) ListActive(ctx context.Context, queue string) ([]entity.Batch, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetByQueueAndStates", ctx, queue, states) + ret := m.ctrl.Call(m, "ListActive", ctx, queue) ret0, _ := ret[0].([]entity.Batch) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetByQueueAndStates indicates an expected call of GetByQueueAndStates. -func (mr *MockBatchStoreMockRecorder) GetByQueueAndStates(ctx, queue, states any) *gomock.Call { +// ListActive indicates an expected call of ListActive. +func (mr *MockBatchStoreMockRecorder) ListActive(ctx, queue any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetByQueueAndStates", reflect.TypeOf((*MockBatchStore)(nil).GetByQueueAndStates), ctx, queue, states) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListActive", reflect.TypeOf((*MockBatchStore)(nil).ListActive), ctx, queue) } // UpdateScoreAndState mocks base method. diff --git a/submitqueue/extension/storage/mysql/batch_store.go b/submitqueue/extension/storage/mysql/batch_store.go index e6999aac..76c21e94 100644 --- a/submitqueue/extension/storage/mysql/batch_store.go +++ b/submitqueue/extension/storage/mysql/batch_store.go @@ -20,7 +20,6 @@ import ( "encoding/json" "errors" "fmt" - "strings" "github.com/go-sql-driver/mysql" "github.com/uber-go/tally" @@ -87,6 +86,19 @@ func (s *batchStore) Create(ctx context.Context, batch entity.Batch) (retErr err return fmt.Errorf("failed to marshal dependencies=%v id=%s for Create batch entity: %w", batch.Dependencies, batch.ID, err) } + // Write membership before the batch row (no transaction) so a batch row is + // never visible to ListActive without its membership. ON DUPLICATE KEY UPDATE is + // an idempotent no-op on PK conflict (retry-safe) while still surfacing real + // errors, unlike INSERT IGNORE which would swallow them and let Create proceed + // without a valid membership. 
A create that fails after this point leaves a + // dangling row, which ListActive skips and the reconcile job reclaims. + if _, err = s.db.ExecContext(ctx, + "INSERT INTO active_batch (queue, batch_id) VALUES (?, ?) ON DUPLICATE KEY UPDATE batch_id = batch_id", + batch.Queue, batch.ID, + ); err != nil { + return fmt.Errorf("failed to insert active_batch membership for batch entity id=%s queue=%s: %w", batch.ID, batch.Queue, err) + } + _, err = s.db.ExecContext(ctx, "INSERT INTO batch (id, queue, contains, dependencies, score, state, version) VALUES (?, ?, ?, ?, ?, ?, ?)", batch.ID, batch.Queue, containsJSON, dependenciesJSON, batch.Score, batch.State, batch.Version, @@ -96,6 +108,10 @@ func (s *batchStore) Create(ctx context.Context, batch entity.Batch) (retErr err if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 { return fmt.Errorf("batch entity id=%s: %w", batch.ID, storage.ErrAlreadyExists) } + // Leave the membership row in place: a returned error doesn't prove the + // batch row was not written (an ambiguous failure can commit it and still + // error), so deleting could permanently hide a live batch from ListActive. + // A dangling row is the safe direction. return fmt.Errorf("failed to insert batch entity id=%s: %w", batch.ID, err) } @@ -174,52 +190,81 @@ func (s *batchStore) UpdateScoreAndState(ctx context.Context, id string, oldVers return nil } -// GetByQueueAndStates retrieves all batches that belong to the given queue and are in the given states. -func (s *batchStore) GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) (ret []entity.Batch, retErr error) { - op := metrics.Begin(s.scope, "get_by_queue_and_states") +// ListActive returns all active (non-terminal) batches in the given queue. +// +// Membership is tracked in active_batch (queue leads the PK), so listing is a +// PK-prefix scan that ports cleanly to a key-value store. 
Each member is fetched +// by primary key: a terminal batch's membership is best-effort removed (race-free, +// its id is never reused), while a missing batch is skipped but NOT removed (it +// may belong to an in-flight Create that hasn't written its batch row yet). +func (s *batchStore) ListActive(ctx context.Context, queue string) (ret []entity.Batch, retErr error) { + op := metrics.Begin(s.scope, "list_active") defer func() { op.Complete(retErr) }() - if len(states) == 0 { - return nil, nil + // Read all membership rows and release the connection before resolving each + // batch, since Get issues its own query. + ids, err := s.activeBatchIDs(ctx, queue) + if err != nil { + return nil, err } - query := "SELECT id, queue, contains, dependencies, score, state, version FROM batch WHERE queue = ? AND state IN (?" + strings.Repeat(", ?", len(states)-1) + ")" - - args := make([]any, 1+len(states)) - args[0] = queue - for i, state := range states { - args[i+1] = state + var results []entity.Batch + for _, id := range ids { + batch, err := s.Get(ctx, id) + if err != nil { + if storage.IsNotFound(err) { + // Missing batch: either an in-flight Create or a dangling row. We + // can't tell them apart, so skip without deleting. + continue + } + return nil, fmt.Errorf("failed to get active batch id=%q queue=%q: %w", id, queue, err) + } + if batch.State.IsTerminal() { + // Stale membership: the batch has finished. Race-free to remove since + // its id is never reused. + s.removeActive(ctx, queue, id) + continue + } + results = append(results, batch) } - rows, err := s.db.QueryContext(ctx, query, args...) + return results, nil +} + +// activeBatchIDs reads the batch IDs recorded as active for the queue, owning the +// result set's lifecycle so the caller can resolve each batch after it's closed. 
+func (s *batchStore) activeBatchIDs(ctx context.Context, queue string) ([]string, error) { + rows, err := s.db.QueryContext(ctx, + "SELECT batch_id FROM active_batch WHERE queue = ?", + queue, + ) if err != nil { - return nil, fmt.Errorf("failed to query batches by queue=%q states=%v from the database: %w", queue, states, err) + return nil, fmt.Errorf("failed to query active batch membership for queue=%q: %w", queue, err) } defer rows.Close() - var results []entity.Batch + var ids []string for rows.Next() { - var batch entity.Batch - var containsJSON []byte - var dependenciesJSON []byte - - if err := rows.Scan(&batch.ID, &batch.Queue, &containsJSON, &dependenciesJSON, &batch.Score, &batch.State, &batch.Version); err != nil { - return nil, fmt.Errorf("failed to scan batch entity by queue=%q states=%v from the database: %w", queue, states, err) - } - - if err := json.Unmarshal(containsJSON, &batch.Contains); err != nil { - return nil, fmt.Errorf("failed to unmarshal contains for batch entity id=%s from the database: %w", batch.ID, err) - } - - if err := json.Unmarshal(dependenciesJSON, &batch.Dependencies); err != nil { - return nil, fmt.Errorf("failed to unmarshal dependencies for batch entity id=%s from the database: %w", batch.ID, err) + var id string + if err := rows.Scan(&id); err != nil { + return nil, fmt.Errorf("failed to scan active batch membership for queue=%q: %w", queue, err) } - - results = append(results, batch) + ids = append(ids, id) } if err := rows.Err(); err != nil { - return nil, fmt.Errorf("failed to iterate batches by queue=%q states=%v from the database: %w", queue, states, err) + return nil, fmt.Errorf("failed to iterate active batch membership for queue=%q: %w", queue, err) } + return ids, nil +} - return results, nil +// removeActive best-effort deletes a single active_batch membership row, used by +// ListActive to reclaim terminal batches' memberships. Failures are counted and +// ignored — the row is harmless and the next read retries. 
+func (s *batchStore) removeActive(ctx context.Context, queue, batchID string) { + if _, err := s.db.ExecContext(ctx, + "DELETE FROM active_batch WHERE queue = ? AND batch_id = ?", + queue, batchID, + ); err != nil { + metrics.NamedCounter(s.scope, "list_active", "self_heal_errors", 1) + } } diff --git a/submitqueue/extension/storage/mysql/schema/README.md b/submitqueue/extension/storage/mysql/schema/README.md index 9e8c36bf..e398d186 100644 --- a/submitqueue/extension/storage/mysql/schema/README.md +++ b/submitqueue/extension/storage/mysql/schema/README.md @@ -2,19 +2,23 @@ ## batch table -### Secondary index: `idx_queue_state (queue, state)` +The `batch` table is reachable only by its primary key (`id`). It carries no secondary index — every access pattern is expressed as a primary-key get or as a key-prefix scan over a companion membership table (see `active_batch` below). This keeps the access patterns portable to a key-value / document store, where a server-maintained secondary index over a mutable, non-key column (such as `state`) is not a primitive every backend offers cheaply. -The `batch` table has a composite secondary index on `(queue, state)`. This index supports the `GetByQueueAndStates` query, which retrieves batches filtered by queue and one or more states. Without this index, the query would require a full table scan. +## active_batch table -#### Trade-offs +`active_batch` is the membership index that answers "which batches in this queue are still active?" — the only queue-scoped query the pipeline needs (the batch controller uses it to find conflict dependencies; the cancel controller uses it to find the batch holding a request). A row is intended to exist per non-terminal batch, so the table stays bounded by the live speculation window rather than full batch history. The correspondence is best-effort, not exact: readers treat membership as a hint and resolve each batch by primary key — see *Maintenance and self-healing* below. 
-- **Write overhead**: Every `INSERT` and `UPDATE` to the `batch` table must also update the secondary index, adding latency to write operations. -- **Storage cost**: The index consumes additional disk space proportional to the number of rows in the table. -- **Lock contention**: Under high write concurrency, index maintenance can increase lock contention on the affected index pages. +`queue` leads the composite primary key `(queue, batch_id)`, so listing a queue's active batches is a primary-key-prefix scan and the table is shardable by queue. On a key-value store the same shape maps directly onto a partition key (`queue`) and sort key (`batch_id`) with no secondary index. -#### Future: Prune job +### Maintenance and self-healing -As the `batch` table grows, the secondary index will grow with it, increasing storage costs and degrading write performance. To mitigate this, a prune job should be introduced to periodically delete batches in terminal states (`succeeded`, `failed`, `cancelled`) that are older than a configurable retention period. This keeps the table and its indexes bounded in size, ensuring consistent query and write performance over time. +`BatchStore.Create` writes the membership row before the batch row, so a batch row is never visible to `ListActive` without its membership. If the batch insert then fails, `Create` leaves the membership row in place: a returned error doesn't prove the row wasn't written (an ambiguous failure can commit it and still error), so deleting could permanently hide a live batch. A dangling row is the safe direction. + +On read, `ListActive` resolves each member by primary key. A **terminal** batch's membership is best-effort removed (race-free — its id is never reused). A **missing** batch is skipped but not removed, since it may belong to an in-flight `Create` that hasn't written its batch row yet. 
Cleanup failures are swallowed, so reads never fail on index maintenance and terminal-state writers (merge, speculate, dlq) never touch the index. Because the two writes are independent (no transaction), the design tolerates partial failure via idempotent retries and read-time reconciliation. + +### Future: prune / reconcile job + +Read-time reconciliation only removes terminal memberships, so two kinds of stale row need a periodic sweep: dangling memberships whose batch never landed (a failed or crashed create), and memberships of batches that are stuck in a non-terminal state (e.g. an orphan stuck in `created` after a mid-process failure). A reconcile job should sweep both, keeping the table bounded independently of read traffic. ## change table diff --git a/submitqueue/extension/storage/mysql/schema/active_batch.sql b/submitqueue/extension/storage/mysql/schema/active_batch.sql new file mode 100644 index 00000000..cc6f80ca --- /dev/null +++ b/submitqueue/extension/storage/mysql/schema/active_batch.sql @@ -0,0 +1,12 @@ +-- active_batch is the membership index for "active (non-terminal) batches in a +-- queue", keeping the set bounded by the live speculation window rather than batch +-- history. queue leads the PK so listing is a PK-prefix scan (shardable by queue; +-- portable to a key-value store with queue = partition key, batch_id = sort key). +-- Membership is best-effort: it is added on Create (before the batch row) and +-- removed on read by ListActive once a batch is terminal. A reconcile job reclaims +-- rows left dangling by a failed or crashed create. See schema/README.md. 
+CREATE TABLE IF NOT EXISTS active_batch ( + queue VARCHAR(255) NOT NULL, + batch_id VARCHAR(255) NOT NULL, + PRIMARY KEY (queue, batch_id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/submitqueue/extension/storage/mysql/schema/batch.sql b/submitqueue/extension/storage/mysql/schema/batch.sql index 8e7deda0..079cdecd 100644 --- a/submitqueue/extension/storage/mysql/schema/batch.sql +++ b/submitqueue/extension/storage/mysql/schema/batch.sql @@ -4,8 +4,7 @@ CREATE TABLE IF NOT EXISTS batch ( contains JSON NOT NULL, dependencies JSON NOT NULL, score DOUBLE NOT NULL, - state VARCHAR(255) NOT NUll, + state VARCHAR(255) NOT NULL, version INT NOT NULL, - PRIMARY KEY (id), - INDEX idx_queue_state (queue, state) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/submitqueue/orchestrator/controller/batch/batch.go b/submitqueue/orchestrator/controller/batch/batch.go index fef116fd..f9b34cf1 100644 --- a/submitqueue/orchestrator/controller/batch/batch.go +++ b/submitqueue/orchestrator/controller/batch/batch.go @@ -134,18 +134,22 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r Version: 1, } - // Get active batches for this queue and ask the conflict analyzer which - // of them the new batch must serialize behind. The dependency set drives - // the speculation graph downstream. - activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{ - entity.BatchStateCreated, - entity.BatchStateSpeculating, - entity.BatchStateMerging, - }) + // Ask the conflict analyzer which active batches the new batch must serialize + // behind. ListActive returns all non-terminal batches; we narrow to + // Created/Speculating/Merging so the analyzer only sees batches that can still + // acquire new conflicts. 
+ allActive, err := c.store.GetBatchStore().ListActive(ctx, request.Queue) if err != nil { metrics.NamedCounter(c.metricsScope, opName, "batch_store_errors", 1) return fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err) } + activeBatches := make([]entity.Batch, 0, len(allActive)) + for _, b := range allActive { + switch b.State { + case entity.BatchStateCreated, entity.BatchStateSpeculating, entity.BatchStateMerging: + activeBatches = append(activeBatches, b) + } + } // Dedupe by batch ID since a single (analyzed, in-flight) pair may be // reported with multiple Conflict entries when different conflict types diff --git a/submitqueue/orchestrator/controller/batch/batch_test.go b/submitqueue/orchestrator/controller/batch/batch_test.go index 792c9773..beb229c8 100644 --- a/submitqueue/orchestrator/controller/batch/batch_test.go +++ b/submitqueue/orchestrator/controller/batch/batch_test.go @@ -81,7 +81,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M if mockStorage == nil { mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + mockBatchStore.EXPECT().ListActive(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockReqStore := storagemock.NewMockRequestStore(ctrl) @@ -212,14 +212,21 @@ func TestController_Process_WithDependencies(t *testing.T) { Version: 1, } - // Set up storage with active batches to become dependencies. + // Set up storage with active batches to become dependencies. ListActive + // returns every non-terminal batch; the controller filters to + // Created/Speculating/Merging in memory. 
The Scored and Cancelling batches + // below must be excluded — note no BatchDependent expectations are registered + // for them, so the default all.New() analyzer would fail the test on an + // unexpected mock call if the filter let them through. activeBatches := []entity.Batch{ {ID: "test-queue/batch/1", Queue: "test-queue", State: entity.BatchStateCreated, Version: 1}, {ID: "test-queue/batch/2", Queue: "test-queue", State: entity.BatchStateSpeculating, Version: 2}, + {ID: "test-queue/batch/3", Queue: "test-queue", State: entity.BatchStateScored, Version: 1}, + {ID: "test-queue/batch/4", Queue: "test-queue", State: entity.BatchStateCancelling, Version: 1}, } mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(activeBatches, nil) + mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(activeBatches, nil) mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) @@ -271,7 +278,7 @@ func TestController_Process_AnalyzerSelectsSubset(t *testing.T) { } mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(activeBatches, nil) + mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(activeBatches, nil) mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) @@ -317,7 +324,7 @@ func TestController_Process_AnalyzerFailure(t *testing.T) { request := testRequest() mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil) + mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil) mockReqStore := storagemock.NewMockRequestStore(ctrl) 
mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) @@ -413,7 +420,7 @@ func TestController_Process_CASLostToCancel(t *testing.T) { request := testRequest() mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil) + mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil) // Create must NOT be called — gomock fails if it is. mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) @@ -466,7 +473,7 @@ func TestController_Process_CASUnexpectedErrorPropagates(t *testing.T) { request := testRequest() mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil) + mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil) // Create must NOT be called — gomock fails if it is. mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) @@ -512,7 +519,7 @@ func TestController_Process_RecoveryAfterPriorCAS(t *testing.T) { request.Version = 2 // prior attempt bumped from 1 → 2 mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil) + mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil) mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) diff --git a/submitqueue/orchestrator/controller/cancel/cancel.go b/submitqueue/orchestrator/controller/cancel/cancel.go index 96a214ea..2ac3341f 100644 --- a/submitqueue/orchestrator/controller/cancel/cancel.go +++ b/submitqueue/orchestrator/controller/cancel/cancel.go @@ -184,21 +184,11 @@ func (c *Controller) markCancelling(ctx context.Context, request entity.Request) // findActiveBatch scans all active batches in the request's queue for 
one whose // Contains list includes the request. Returns (batch, true, nil) on a hit, // (zero, false, nil) when the request is not yet batched, and any storage -// error otherwise. -// -// BatchStateCancelling is included in the active-state list so an idempotent -// redelivery of the cancel message (the prior pass wrote the intent but the -// speculate hand-off publish failed) still resolves the batch and re-attempts -// the publish. +// error otherwise. ListActive includes Cancelling batches, so redelivery of a +// cancel whose speculate hand-off publish failed still resolves and retries. func (c *Controller) findActiveBatch(ctx context.Context, request entity.Request) (entity.Batch, bool, error) { // TODO: Scans all the batches in flight - make it more efficient? - active, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{ - entity.BatchStateCreated, - entity.BatchStateScored, - entity.BatchStateSpeculating, - entity.BatchStateMerging, - entity.BatchStateCancelling, - }) + active, err := c.store.GetBatchStore().ListActive(ctx, request.Queue) if err != nil { c.metricsScope.Counter("batch_store_errors").Inc(1) return entity.Batch{}, false, fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err) diff --git a/submitqueue/orchestrator/controller/cancel/cancel_test.go b/submitqueue/orchestrator/controller/cancel/cancel_test.go index a97079fb..833f97d0 100644 --- a/submitqueue/orchestrator/controller/cancel/cancel_test.go +++ b/submitqueue/orchestrator/controller/cancel/cancel_test.go @@ -144,7 +144,7 @@ func TestProcess_CancelsUnbatchedRequest(t *testing.T) { ) batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "q", gomock.Any()).Return(nil, nil) + batchStore.EXPECT().ListActive(gomock.Any(), "q").Return(nil, nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() @@ -175,7 +175,7 @@ func 
TestProcess_AlreadyCancelling_SkipsMarkCancelling(t *testing.T) { reqStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(3), int32(4), entity.RequestStateCancelled).Return(nil) batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "q", gomock.Any()).Return(nil, nil) + batchStore.EXPECT().ListActive(gomock.Any(), "q").Return(nil, nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() @@ -228,7 +228,7 @@ func TestProcess_UnbatchedVersionMismatch_Retryable(t *testing.T) { ) batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "q", gomock.Any()).Return(nil, nil) + batchStore.EXPECT().ListActive(gomock.Any(), "q").Return(nil, nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() @@ -276,7 +276,7 @@ func TestProcess_BatchPath_HandsOffToSpeculate(t *testing.T) { reqStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(2), int32(3), entity.RequestStateCancelling).Return(nil) batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "q", gomock.Any()).Return([]entity.Batch{batch}, nil) + batchStore.EXPECT().ListActive(gomock.Any(), "q").Return([]entity.Batch{batch}, nil) // Single batch CAS: intent only. No terminal CAS. batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(3), int32(4), entity.BatchStateCancelling).Return(nil) @@ -325,7 +325,7 @@ func TestProcess_BatchAlreadyCancelling_RepublishesToSpeculate(t *testing.T) { // No request UpdateState — already in Cancelling. batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "q", gomock.Any()).Return([]entity.Batch{batch}, nil) + batchStore.EXPECT().ListActive(gomock.Any(), "q").Return([]entity.Batch{batch}, nil) // No batch UpdateState — already in Cancelling. 
store := storagemock.NewMockStorage(ctrl) @@ -356,7 +356,7 @@ func TestProcess_BatchIntentVersionMismatch_Retryable(t *testing.T) { reqStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(2), int32(3), entity.RequestStateCancelling).Return(nil) batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "q", gomock.Any()).Return([]entity.Batch{batch}, nil) + batchStore.EXPECT().ListActive(gomock.Any(), "q").Return([]entity.Batch{batch}, nil) batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateCancelling). Return(storage.ErrVersionMismatch) diff --git a/test/integration/submitqueue/extension/storage/mysql/BUILD.bazel b/test/integration/submitqueue/extension/storage/mysql/BUILD.bazel index fd6270de..7733d1f9 100644 --- a/test/integration/submitqueue/extension/storage/mysql/BUILD.bazel +++ b/test/integration/submitqueue/extension/storage/mysql/BUILD.bazel @@ -12,6 +12,8 @@ go_test( "integration", ], deps = [ + "//submitqueue/entity", + "//submitqueue/extension/storage", "//submitqueue/extension/storage/mysql", "//test/integration/submitqueue/extension/storage", "//test/testutil", diff --git a/test/integration/submitqueue/extension/storage/mysql/storage_test.go b/test/integration/submitqueue/extension/storage/mysql/storage_test.go index 03e3ee4a..69ff0dcd 100644 --- a/test/integration/submitqueue/extension/storage/mysql/storage_test.go +++ b/test/integration/submitqueue/extension/storage/mysql/storage_test.go @@ -23,6 +23,8 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/uber-go/tally" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql" storagesuite "github.com/uber/submitqueue/test/integration/submitqueue/extension/storage" "github.com/uber/submitqueue/test/testutil" @@ -96,3 +98,103 @@ func (s 
*MySQLStorageIntegrationSuite) TearDownSuite() { s.log.Logf("Tearing down MySQL Storage integration test suite") // Cleanup handled automatically by testutil.ComposeStack } + +// countActiveBatchRows returns the number of active_batch membership rows for the +// given (queue, batch_id) — index state internal to the MySQL impl, not visible +// through the storage.Storage contract. +func (s *MySQLStorageIntegrationSuite) countActiveBatchRows(queue, batchID string) int { + t := s.T() + var n int + err := s.db.QueryRowContext(context.Background(), + "SELECT COUNT(*) FROM active_batch WHERE queue = ? AND batch_id = ?", + queue, batchID, + ).Scan(&n) + require.NoError(t, err) + return n +} + +// TestActiveBatch_SelfHealsTerminalMembership verifies that ListActive deletes the +// membership row of a batch that has reached a terminal state. +func (s *MySQLStorageIntegrationSuite) TestActiveBatch_SelfHealsTerminalMembership() { + t := s.T() + ctx := context.Background() + const queue = "bq-selfheal-terminal" + const id = queue + "/batch/1" + + store := s.GetStorage().GetBatchStore() + require.NoError(t, store.Create(ctx, entity.Batch{ID: id, Queue: queue, Contains: []string{id + "/req"}, Dependencies: []string{}, State: entity.BatchStateCreated, Version: 1})) + require.Equal(t, 1, s.countActiveBatchRows(queue, id), "Create should record active membership") + + require.NoError(t, store.UpdateState(ctx, id, 1, 2, entity.BatchStateSucceeded)) + + active, err := store.ListActive(ctx, queue) + require.NoError(t, err) + require.Empty(t, active, "terminal batch should not be listed as active") + require.Equal(t, 0, s.countActiveBatchRows(queue, id), "ListActive should self-heal the stale membership row") +} + +// TestActiveBatch_SkipsDanglingMembershipWithoutDeleting verifies that ListActive +// skips a membership row whose batch does not exist but does NOT delete it: it may +// belong to an in-flight Create that hasn't written its batch row yet. 
+func (s *MySQLStorageIntegrationSuite) TestActiveBatch_SkipsDanglingMembershipWithoutDeleting() { + t := s.T() + ctx := context.Background() + const queue = "bq-dangling-skip" + const id = queue + "/batch/ghost" + + _, err := s.db.ExecContext(ctx, "INSERT INTO active_batch (queue, batch_id) VALUES (?, ?)", queue, id) + require.NoError(t, err) + + active, err := s.GetStorage().GetBatchStore().ListActive(ctx, queue) + require.NoError(t, err) + require.Empty(t, active, "dangling membership should not surface a batch") + require.Equal(t, 1, s.countActiveBatchRows(queue, id), "ListActive must NOT delete a missing-batch membership (it may belong to an in-flight create)") +} + +// TestActiveBatch_CreateKeepsMembershipOnDuplicate verifies that a duplicate Create +// (ErrAlreadyExists) does NOT delete the membership row, which belongs to the live +// existing batch. +func (s *MySQLStorageIntegrationSuite) TestActiveBatch_CreateKeepsMembershipOnDuplicate() { + t := s.T() + ctx := context.Background() + const queue = "bq-create-dup" + const id = queue + "/batch/1" + store := s.GetStorage().GetBatchStore() + + b := entity.Batch{ID: id, Queue: queue, Contains: []string{id + "/req"}, Dependencies: []string{}, State: entity.BatchStateCreated, Version: 1} + require.NoError(t, store.Create(ctx, b)) + require.Equal(t, 1, s.countActiveBatchRows(queue, id)) + + require.ErrorIs(t, store.Create(ctx, b), storage.ErrAlreadyExists) + require.Equal(t, 1, s.countActiveBatchRows(queue, id), "duplicate Create must NOT delete the existing batch's membership") + + active, err := store.ListActive(ctx, queue) + require.NoError(t, err) + require.Len(t, active, 1, "the original batch must remain active") +} + +// TestActiveBatch_CreateKeepsMembershipOnFailedInsert verifies that a non-duplicate +// batch-insert failure leaves the membership row in place rather than deleting it +// (the batch row may have committed despite the error). 
The failure is induced by +// dropping the batch table (restored afterwards). +func (s *MySQLStorageIntegrationSuite) TestActiveBatch_CreateKeepsMembershipOnFailedInsert() { + t := s.T() + ctx := context.Background() + const queue = "bq-create-fail" + const id = queue + "/batch/1" + + var tbl, ddl string + require.NoError(t, s.db.QueryRowContext(ctx, "SHOW CREATE TABLE batch").Scan(&tbl, &ddl)) + _, err := s.db.ExecContext(ctx, "DROP TABLE batch") + require.NoError(t, err) + defer func() { + _, derr := s.db.ExecContext(context.Background(), ddl) + require.NoError(t, derr, "must restore the batch table for subsequent tests") + }() + + err = s.GetStorage().GetBatchStore().Create(ctx, entity.Batch{ID: id, Queue: queue, Contains: []string{id + "/req"}, Dependencies: []string{}, State: entity.BatchStateCreated, Version: 1}) + require.Error(t, err, "Create should fail when the batch table is missing") + require.NotErrorIs(t, err, storage.ErrAlreadyExists, "the failure must be a non-duplicate error") + + require.Equal(t, 1, s.countActiveBatchRows(queue, id), "Create must NOT delete the membership on a failed insert: the batch row may have committed despite the error") +} diff --git a/test/integration/submitqueue/extension/storage/suite.go b/test/integration/submitqueue/extension/storage/suite.go index 064800c9..d2bffc75 100644 --- a/test/integration/submitqueue/extension/storage/suite.go +++ b/test/integration/submitqueue/extension/storage/suite.go @@ -48,6 +48,12 @@ func (s *StorageContractSuite) SetStorage(store storage.Storage) { s.storage = store } +// GetStorage returns the storage instance under test, for implementation-specific +// suites that need to assert backend-internal behavior alongside the contract tests. 
+func (s *StorageContractSuite) GetStorage() storage.Storage { + return s.storage +} + // SetLogger sets the logger for tests func (s *StorageContractSuite) SetLogger(log *testutil.TestLogger) { s.log = log @@ -381,3 +387,93 @@ func (s *StorageContractSuite) TestStorage_ChangeCreate_EmptyDetails() { require.Len(t, got, 1) assert.Equal(t, entity.ChangeDetails{}, got[0].Details) } + +// newBatch builds a batch fixture for the active-listing tests. +func newBatch(id, queue string, state entity.BatchState) entity.Batch { + return entity.Batch{ + ID: id, + Queue: queue, + Contains: []string{id + "/req"}, + Dependencies: []string{}, + State: state, + Version: 1, + } +} + +// activeBatchIDs returns the sorted IDs of the queue's active batches, for stable comparison. +func (s *StorageContractSuite) activeBatchIDs(queue string) []string { + t := s.T() + batches, err := s.storage.GetBatchStore().ListActive(s.ctx, queue) + require.NoError(t, err) + ids := make([]string, 0, len(batches)) + for _, b := range batches { + ids = append(ids, b.ID) + } + sort.Strings(ids) + return ids +} + +// TestStorage_BatchListActive_ReturnsActive verifies a freshly created batch is +// listed as active, and that ListActive resolves the full entity. +func (s *StorageContractSuite) TestStorage_BatchListActive_ReturnsActive() { + t := s.T() + ctx := s.ctx + const queue = "bq-active" + + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/1", queue, entity.BatchStateCreated))) + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/2", queue, entity.BatchStateSpeculating))) + + batches, err := s.storage.GetBatchStore().ListActive(ctx, queue) + require.NoError(t, err) + require.Len(t, batches, 2) + assert.Equal(t, []string{queue + "/batch/1", queue + "/batch/2"}, s.activeBatchIDs(queue)) +} + +// TestStorage_BatchListActive_ExcludesTerminal verifies that a batch transitioned +// to a terminal state via UpdateState drops out of the active listing. 
+func (s *StorageContractSuite) TestStorage_BatchListActive_ExcludesTerminal() { + t := s.T() + ctx := s.ctx + const queue = "bq-terminal" + + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/1", queue, entity.BatchStateMerging))) + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/2", queue, entity.BatchStateCreated))) + + // batch/1 lands; it must no longer be active. + require.NoError(t, s.storage.GetBatchStore().UpdateState(ctx, queue+"/batch/1", 1, 2, entity.BatchStateSucceeded)) + + assert.Equal(t, []string{queue + "/batch/2"}, s.activeBatchIDs(queue)) +} + +// TestStorage_BatchListActive_ExcludesTerminalViaScoreAndState covers the other +// terminal write path (UpdateScoreAndState) used by the score/speculate pipeline. +func (s *StorageContractSuite) TestStorage_BatchListActive_ExcludesTerminalViaScoreAndState() { + t := s.T() + ctx := s.ctx + const queue = "bq-terminal-score" + + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/1", queue, entity.BatchStateCreated))) + require.NoError(t, s.storage.GetBatchStore().UpdateScoreAndState(ctx, queue+"/batch/1", 1, 2, 0.5, entity.BatchStateFailed)) + + assert.Empty(t, s.activeBatchIDs(queue)) +} + +// TestStorage_BatchListActive_QueueScoped verifies the listing is scoped to one queue. 
+func (s *StorageContractSuite) TestStorage_BatchListActive_QueueScoped() { + t := s.T() + ctx := s.ctx + const queueA = "bq-scoped-a" + const queueB = "bq-scoped-b" + + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queueA+"/batch/1", queueA, entity.BatchStateCreated))) + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queueB+"/batch/1", queueB, entity.BatchStateCreated))) + require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queueB+"/batch/2", queueB, entity.BatchStateCreated))) + + assert.Equal(t, []string{queueA + "/batch/1"}, s.activeBatchIDs(queueA)) + assert.Equal(t, []string{queueB + "/batch/1", queueB + "/batch/2"}, s.activeBatchIDs(queueB)) +} + +// TestStorage_BatchListActive_UnknownQueue returns an empty set for a queue with no batches. +func (s *StorageContractSuite) TestStorage_BatchListActive_UnknownQueue() { + assert.Empty(s.T(), s.activeBatchIDs("bq-does-not-exist")) +} From 4a6d90537d269c52cfdb83897adff7464785a8dd Mon Sep 17 00:00:00 2001 From: Albert Wu Date: Tue, 16 Jun 2026 22:52:35 +0200 Subject: [PATCH 2/2] refactor(storage): move batch state membership to app layer --- submitqueue/extension/storage/BUILD.bazel | 1 + .../storage/batch_state_membership_store.go | 41 +++++ submitqueue/extension/storage/batch_store.go | 10 -- .../extension/storage/mock/BUILD.bazel | 1 + .../mock/batch_state_membership_store_mock.go | 85 +++++++++++ .../storage/mock/batch_store_mock.go | 15 -- .../extension/storage/mock/storage_mock.go | 14 ++ .../extension/storage/mysql/BUILD.bazel | 1 + .../mysql/batch_state_membership_store.go | 87 +++++++++++ .../extension/storage/mysql/batch_store.go | 96 ------------ .../extension/storage/mysql/schema/README.md | 17 +-- .../storage/mysql/schema/active_batch.sql | 12 -- .../mysql/schema/batch_state_membership.sql | 9 ++ .../extension/storage/mysql/storage.go | 39 +++-- submitqueue/extension/storage/storage.go | 3 + .../orchestrator/controller/batch/BUILD.bazel | 1 + 
.../orchestrator/controller/batch/batch.go | 17 +-- .../controller/batch/batch_test.go | 72 +++++++-- .../controller/batchstate/BUILD.bazel | 12 ++ .../controller/batchstate/batchstate.go | 144 ++++++++++++++++++ .../controller/cancel/BUILD.bazel | 1 + .../orchestrator/controller/cancel/cancel.go | 9 +- .../controller/cancel/cancel_test.go | 55 ++++++- .../orchestrator/controller/dlq/BUILD.bazel | 1 + .../orchestrator/controller/dlq/batch_test.go | 1 + .../controller/dlq/buildsignal_test.go | 1 + .../orchestrator/controller/dlq/dlq.go | 3 +- .../orchestrator/controller/dlq/dlq_test.go | 9 ++ .../orchestrator/controller/merge/BUILD.bazel | 1 + .../orchestrator/controller/merge/merge.go | 3 +- .../controller/merge/merge_test.go | 11 ++ .../orchestrator/controller/score/BUILD.bazel | 1 + .../orchestrator/controller/score/score.go | 4 +- .../controller/score/score_test.go | 11 ++ .../controller/speculate/BUILD.bazel | 1 + .../controller/speculate/speculate.go | 9 +- .../controller/speculate/speculate_test.go | 21 +++ .../extension/storage/mysql/BUILD.bazel | 2 - .../extension/storage/mysql/storage_test.go | 102 ------------- .../submitqueue/extension/storage/suite.go | 113 +++++++------- 40 files changed, 675 insertions(+), 361 deletions(-) create mode 100644 submitqueue/extension/storage/batch_state_membership_store.go create mode 100644 submitqueue/extension/storage/mock/batch_state_membership_store_mock.go create mode 100644 submitqueue/extension/storage/mysql/batch_state_membership_store.go delete mode 100644 submitqueue/extension/storage/mysql/schema/active_batch.sql create mode 100644 submitqueue/extension/storage/mysql/schema/batch_state_membership.sql create mode 100644 submitqueue/orchestrator/controller/batchstate/BUILD.bazel create mode 100644 submitqueue/orchestrator/controller/batchstate/batchstate.go diff --git a/submitqueue/extension/storage/BUILD.bazel b/submitqueue/extension/storage/BUILD.bazel index 867eef0f..25ef54ca 100644 --- 
a/submitqueue/extension/storage/BUILD.bazel +++ b/submitqueue/extension/storage/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "storage", srcs = [ "batch_dependent_store.go", + "batch_state_membership_store.go", "batch_store.go", "build_store.go", "change_store.go", diff --git a/submitqueue/extension/storage/batch_state_membership_store.go b/submitqueue/extension/storage/batch_state_membership_store.go new file mode 100644 index 00000000..80bd2514 --- /dev/null +++ b/submitqueue/extension/storage/batch_state_membership_store.go @@ -0,0 +1,41 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +//go:generate mockgen -source=batch_state_membership_store.go -destination=mock/batch_state_membership_store_mock.go -package=mock + +import ( + "context" + + "github.com/uber/submitqueue/submitqueue/entity" +) + +// BatchStateMembershipStore records the app-maintained lookup from +// (queue, batch state) to batch IDs. The batch row remains authoritative: +// callers must resolve IDs through BatchStore and filter on the current +// persisted state. +type BatchStateMembershipStore interface { + // Add records that batchID belongs to (queue, state). Repeating the same + // Add is idempotent. + Add(ctx context.Context, queue string, state entity.BatchState, batchID string) error + + // Remove deletes a single membership row. Removing a missing row is + // idempotent and succeeds. 
+ Remove(ctx context.Context, queue string, state entity.BatchState, batchID string) error + + // ListIDs returns every batch ID recorded for (queue, state). An empty slice + // means no membership rows exist for that key. + ListIDs(ctx context.Context, queue string, state entity.BatchState) ([]string, error) +} diff --git a/submitqueue/extension/storage/batch_store.go b/submitqueue/extension/storage/batch_store.go index 59a5dceb..f3c0a6fb 100644 --- a/submitqueue/extension/storage/batch_store.go +++ b/submitqueue/extension/storage/batch_store.go @@ -40,14 +40,4 @@ type BatchStore interface { // if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch. // Version arithmetic is owned by the caller; the store performs a pure conditional write. UpdateScoreAndState(ctx context.Context, id string, oldVersion, newVersion int32, score float64, newState entity.BatchState) error - - // ListActive returns all active (non-terminal) batches in the given queue. - // "Active" means the batch's persisted state is not terminal — see - // entity.BatchState.IsTerminal. Callers that need a narrower set filter the - // result by state in memory. - // - // The store tracks active membership internally (added on Create, self-healed - // on read) so this is a key-prefix read rather than a secondary-index query; - // see the implementation and extension/storage/mysql/schema/README.md. 
- ListActive(ctx context.Context, queue string) ([]entity.Batch, error) } diff --git a/submitqueue/extension/storage/mock/BUILD.bazel b/submitqueue/extension/storage/mock/BUILD.bazel index 55c5d808..2c80434a 100644 --- a/submitqueue/extension/storage/mock/BUILD.bazel +++ b/submitqueue/extension/storage/mock/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "mock", srcs = [ "batch_dependent_store_mock.go", + "batch_state_membership_store_mock.go", "batch_store_mock.go", "build_store_mock.go", "change_store_mock.go", diff --git a/submitqueue/extension/storage/mock/batch_state_membership_store_mock.go b/submitqueue/extension/storage/mock/batch_state_membership_store_mock.go new file mode 100644 index 00000000..1501ea4e --- /dev/null +++ b/submitqueue/extension/storage/mock/batch_state_membership_store_mock.go @@ -0,0 +1,85 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: batch_state_membership_store.go +// +// Generated by this command: +// +// mockgen -source=batch_state_membership_store.go -destination=mock/batch_state_membership_store_mock.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + entity "github.com/uber/submitqueue/submitqueue/entity" + gomock "go.uber.org/mock/gomock" +) + +// MockBatchStateMembershipStore is a mock of BatchStateMembershipStore interface. +type MockBatchStateMembershipStore struct { + ctrl *gomock.Controller + recorder *MockBatchStateMembershipStoreMockRecorder + isgomock struct{} +} + +// MockBatchStateMembershipStoreMockRecorder is the mock recorder for MockBatchStateMembershipStore. +type MockBatchStateMembershipStoreMockRecorder struct { + mock *MockBatchStateMembershipStore +} + +// NewMockBatchStateMembershipStore creates a new mock instance. 
+func NewMockBatchStateMembershipStore(ctrl *gomock.Controller) *MockBatchStateMembershipStore { + mock := &MockBatchStateMembershipStore{ctrl: ctrl} + mock.recorder = &MockBatchStateMembershipStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBatchStateMembershipStore) EXPECT() *MockBatchStateMembershipStoreMockRecorder { + return m.recorder +} + +// Add mocks base method. +func (m *MockBatchStateMembershipStore) Add(ctx context.Context, queue string, state entity.BatchState, batchID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Add", ctx, queue, state, batchID) + ret0, _ := ret[0].(error) + return ret0 +} + +// Add indicates an expected call of Add. +func (mr *MockBatchStateMembershipStoreMockRecorder) Add(ctx, queue, state, batchID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockBatchStateMembershipStore)(nil).Add), ctx, queue, state, batchID) +} + +// ListIDs mocks base method. +func (m *MockBatchStateMembershipStore) ListIDs(ctx context.Context, queue string, state entity.BatchState) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListIDs", ctx, queue, state) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListIDs indicates an expected call of ListIDs. +func (mr *MockBatchStateMembershipStoreMockRecorder) ListIDs(ctx, queue, state any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListIDs", reflect.TypeOf((*MockBatchStateMembershipStore)(nil).ListIDs), ctx, queue, state) +} + +// Remove mocks base method. 
+func (m *MockBatchStateMembershipStore) Remove(ctx context.Context, queue string, state entity.BatchState, batchID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Remove", ctx, queue, state, batchID) + ret0, _ := ret[0].(error) + return ret0 +} + +// Remove indicates an expected call of Remove. +func (mr *MockBatchStateMembershipStoreMockRecorder) Remove(ctx, queue, state, batchID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Remove", reflect.TypeOf((*MockBatchStateMembershipStore)(nil).Remove), ctx, queue, state, batchID) +} diff --git a/submitqueue/extension/storage/mock/batch_store_mock.go b/submitqueue/extension/storage/mock/batch_store_mock.go index c126add2..429a6d2a 100644 --- a/submitqueue/extension/storage/mock/batch_store_mock.go +++ b/submitqueue/extension/storage/mock/batch_store_mock.go @@ -70,21 +70,6 @@ func (mr *MockBatchStoreMockRecorder) Get(ctx, id any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockBatchStore)(nil).Get), ctx, id) } -// ListActive mocks base method. -func (m *MockBatchStore) ListActive(ctx context.Context, queue string) ([]entity.Batch, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListActive", ctx, queue) - ret0, _ := ret[0].([]entity.Batch) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListActive indicates an expected call of ListActive. -func (mr *MockBatchStoreMockRecorder) ListActive(ctx, queue any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListActive", reflect.TypeOf((*MockBatchStore)(nil).ListActive), ctx, queue) -} - // UpdateScoreAndState mocks base method. 
func (m *MockBatchStore) UpdateScoreAndState(ctx context.Context, id string, oldVersion, newVersion int32, score float64, newState entity.BatchState) error { m.ctrl.T.Helper() diff --git a/submitqueue/extension/storage/mock/storage_mock.go b/submitqueue/extension/storage/mock/storage_mock.go index 4133bc2a..b19e355e 100644 --- a/submitqueue/extension/storage/mock/storage_mock.go +++ b/submitqueue/extension/storage/mock/storage_mock.go @@ -68,6 +68,20 @@ func (mr *MockStorageMockRecorder) GetBatchDependentStore() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBatchDependentStore", reflect.TypeOf((*MockStorage)(nil).GetBatchDependentStore)) } +// GetBatchStateMembershipStore mocks base method. +func (m *MockStorage) GetBatchStateMembershipStore() storage.BatchStateMembershipStore { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetBatchStateMembershipStore") + ret0, _ := ret[0].(storage.BatchStateMembershipStore) + return ret0 +} + +// GetBatchStateMembershipStore indicates an expected call of GetBatchStateMembershipStore. +func (mr *MockStorageMockRecorder) GetBatchStateMembershipStore() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBatchStateMembershipStore", reflect.TypeOf((*MockStorage)(nil).GetBatchStateMembershipStore)) +} + // GetBatchStore mocks base method. 
func (m *MockStorage) GetBatchStore() storage.BatchStore { m.ctrl.T.Helper() diff --git a/submitqueue/extension/storage/mysql/BUILD.bazel b/submitqueue/extension/storage/mysql/BUILD.bazel index 25fefdc5..297f1c95 100644 --- a/submitqueue/extension/storage/mysql/BUILD.bazel +++ b/submitqueue/extension/storage/mysql/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "mysql", srcs = [ "batch_dependent_store.go", + "batch_state_membership_store.go", "batch_store.go", "build_store.go", "change_store.go", diff --git a/submitqueue/extension/storage/mysql/batch_state_membership_store.go b/submitqueue/extension/storage/mysql/batch_state_membership_store.go new file mode 100644 index 00000000..15f8e78c --- /dev/null +++ b/submitqueue/extension/storage/mysql/batch_state_membership_store.go @@ -0,0 +1,87 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "context" + "database/sql" + "fmt" + + "github.com/uber-go/tally" + + "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" +) + +type batchStateMembershipStore struct { + db *sql.DB + scope tally.Scope +} + +// NewBatchStateMembershipStore creates a new MySQL-backed BatchStateMembershipStore. 
+func NewBatchStateMembershipStore(db *sql.DB, scope tally.Scope) storage.BatchStateMembershipStore { + return &batchStateMembershipStore{db: db, scope: scope} +} + +// Add records a batch's state membership. Duplicate membership is a retry-safe no-op. +func (s *batchStateMembershipStore) Add(ctx context.Context, queue string, state entity.BatchState, batchID string) (retErr error) { + op := metrics.Begin(s.scope, "add") + defer func() { op.Complete(retErr) }() + + const query = "INSERT INTO batch_state_membership (queue, state, batch_id) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE batch_id = batch_id" + if _, err := s.db.ExecContext(ctx, query, queue, state, batchID); err != nil { + return fmt.Errorf("failed to add batch state membership queue=%q state=%q batch_id=%q: %w", queue, state, batchID, err) + } + return nil +} + +// Remove deletes a batch's state membership. Missing rows are treated as already removed. +func (s *batchStateMembershipStore) Remove(ctx context.Context, queue string, state entity.BatchState, batchID string) (retErr error) { + op := metrics.Begin(s.scope, "remove") + defer func() { op.Complete(retErr) }() + + const query = "DELETE FROM batch_state_membership WHERE queue = ? AND state = ? AND batch_id = ?" + if _, err := s.db.ExecContext(ctx, query, queue, state, batchID); err != nil { + return fmt.Errorf("failed to remove batch state membership queue=%q state=%q batch_id=%q: %w", queue, state, batchID, err) + } + return nil +} + +// ListIDs returns batch IDs recorded for a queue and state. +func (s *batchStateMembershipStore) ListIDs(ctx context.Context, queue string, state entity.BatchState) (ret []string, retErr error) { + op := metrics.Begin(s.scope, "list_ids") + defer func() { op.Complete(retErr) }() + + const query = "SELECT batch_id FROM batch_state_membership WHERE queue = ? AND state = ?" 
+ rows, err := s.db.QueryContext(ctx, query, queue, state) + if err != nil { + return nil, fmt.Errorf("failed to list batch state memberships queue=%q state=%q: %w", queue, state, err) + } + defer rows.Close() + + var ids []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, fmt.Errorf("failed to scan batch state membership queue=%q state=%q: %w", queue, state, err) + } + ids = append(ids, id) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("failed to iterate batch state memberships queue=%q state=%q: %w", queue, state, err) + } + return ids, nil +} diff --git a/submitqueue/extension/storage/mysql/batch_store.go b/submitqueue/extension/storage/mysql/batch_store.go index 76c21e94..a7eef395 100644 --- a/submitqueue/extension/storage/mysql/batch_store.go +++ b/submitqueue/extension/storage/mysql/batch_store.go @@ -86,19 +86,6 @@ func (s *batchStore) Create(ctx context.Context, batch entity.Batch) (retErr err return fmt.Errorf("failed to marshal dependencies=%v id=%s for Create batch entity: %w", batch.Dependencies, batch.ID, err) } - // Write membership before the batch row (no transaction) so a batch row is - // never visible to ListActive without its membership. ON DUPLICATE KEY UPDATE is - // an idempotent no-op on PK conflict (retry-safe) while still surfacing real - // errors, unlike INSERT IGNORE which would swallow them and let Create proceed - // without a valid membership. A create that fails after this point leaves a - // dangling row, which ListActive skips and the reconcile job reclaims. - if _, err = s.db.ExecContext(ctx, - "INSERT INTO active_batch (queue, batch_id) VALUES (?, ?) 
ON DUPLICATE KEY UPDATE batch_id = batch_id", - batch.Queue, batch.ID, - ); err != nil { - return fmt.Errorf("failed to insert active_batch membership for batch entity id=%s queue=%s: %w", batch.ID, batch.Queue, err) - } - _, err = s.db.ExecContext(ctx, "INSERT INTO batch (id, queue, contains, dependencies, score, state, version) VALUES (?, ?, ?, ?, ?, ?, ?)", batch.ID, batch.Queue, containsJSON, dependenciesJSON, batch.Score, batch.State, batch.Version, @@ -108,10 +95,6 @@ func (s *batchStore) Create(ctx context.Context, batch entity.Batch) (retErr err if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 { return fmt.Errorf("batch entity id=%s: %w", batch.ID, storage.ErrAlreadyExists) } - // Leave the membership row in place: a returned error doesn't prove the - // batch row was not written (an ambiguous failure can commit it and still - // error), so deleting could permanently hide a live batch from ListActive. - // A dangling row is the safe direction. return fmt.Errorf("failed to insert batch entity id=%s: %w", batch.ID, err) } @@ -189,82 +172,3 @@ func (s *batchStore) UpdateScoreAndState(ctx context.Context, id string, oldVers return nil } - -// ListActive returns all active (non-terminal) batches in the given queue. -// -// Membership is tracked in active_batch (queue leads the PK), so listing is a -// PK-prefix scan that ports cleanly to a key-value store. Each member is fetched -// by primary key: a terminal batch's membership is best-effort removed (race-free, -// its id is never reused), while a missing batch is skipped but NOT removed (it -// may belong to an in-flight Create that hasn't written its batch row yet). -func (s *batchStore) ListActive(ctx context.Context, queue string) (ret []entity.Batch, retErr error) { - op := metrics.Begin(s.scope, "list_active") - defer func() { op.Complete(retErr) }() - - // Read all membership rows and release the connection before resolving each - // batch, since Get issues its own query. 
- ids, err := s.activeBatchIDs(ctx, queue) - if err != nil { - return nil, err - } - - var results []entity.Batch - for _, id := range ids { - batch, err := s.Get(ctx, id) - if err != nil { - if storage.IsNotFound(err) { - // Missing batch: either an in-flight Create or a dangling row. We - // can't tell them apart, so skip without deleting. - continue - } - return nil, fmt.Errorf("failed to get active batch id=%q queue=%q: %w", id, queue, err) - } - if batch.State.IsTerminal() { - // Stale membership: the batch has finished. Race-free to remove since - // its id is never reused. - s.removeActive(ctx, queue, id) - continue - } - results = append(results, batch) - } - - return results, nil -} - -// activeBatchIDs reads the batch IDs recorded as active for the queue, owning the -// result set's lifecycle so the caller can resolve each batch after it's closed. -func (s *batchStore) activeBatchIDs(ctx context.Context, queue string) ([]string, error) { - rows, err := s.db.QueryContext(ctx, - "SELECT batch_id FROM active_batch WHERE queue = ?", - queue, - ) - if err != nil { - return nil, fmt.Errorf("failed to query active batch membership for queue=%q: %w", queue, err) - } - defer rows.Close() - - var ids []string - for rows.Next() { - var id string - if err := rows.Scan(&id); err != nil { - return nil, fmt.Errorf("failed to scan active batch membership for queue=%q: %w", queue, err) - } - ids = append(ids, id) - } - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("failed to iterate active batch membership for queue=%q: %w", queue, err) - } - return ids, nil -} - -// removeActive best-effort deletes a single active_batch membership row, used by -// ListActive to reclaim terminal batches' memberships. Failures are counted and -// ignored — the row is harmless and the next read retries. -func (s *batchStore) removeActive(ctx context.Context, queue, batchID string) { - if _, err := s.db.ExecContext(ctx, - "DELETE FROM active_batch WHERE queue = ? 
AND batch_id = ?", - queue, batchID, - ); err != nil { - metrics.NamedCounter(s.scope, "list_active", "self_heal_errors", 1) - } -} diff --git a/submitqueue/extension/storage/mysql/schema/README.md b/submitqueue/extension/storage/mysql/schema/README.md index e398d186..fdcb5606 100644 --- a/submitqueue/extension/storage/mysql/schema/README.md +++ b/submitqueue/extension/storage/mysql/schema/README.md @@ -2,27 +2,26 @@ ## batch table -The `batch` table is reachable only by its primary key (`id`). It carries no secondary index — every access pattern is expressed as a primary-key get or as a key-prefix scan over a companion membership table (see `active_batch` below). This keeps the access patterns portable to a key-value / document store, where a server-maintained secondary index over a mutable, non-key column (such as `state`) is not a primitive every backend offers cheaply. +The `batch` table is reachable only by its primary key (`id`). It carries no secondary index over mutable columns. Queue/state access patterns are expressed through an app-maintained companion table (see `batch_state_membership` below), and callers resolve batch IDs back through the authoritative `batch` row before making state decisions. -## active_batch table +## batch_state_membership table -`active_batch` is the membership index that answers "which batches in this queue are still active?" — the only queue-scoped query the pipeline needs (the batch controller uses it to find conflict dependencies; the cancel controller uses it to find the batch holding a request). A row is intended to exist per non-terminal batch, so the table stays bounded by the live speculation window rather than full batch history. The correspondence is best-effort, not exact: readers treat membership as a hint and resolve each batch by primary key — see *Maintenance and self-healing* below. +`batch_state_membership` is the app-maintained lookup that answers "which batch IDs are recorded for this queue and state?" 
The table's primary key is `(queue, state, batch_id)`, so reads by queue/state are primary-key-prefix scans. The same shape ports to a key-value/document store that supports ranged scans over a partition/sort key, and it avoids a server-maintained secondary index over mutable `batch.state`. -`queue` leads the composite primary key `(queue, batch_id)`, so listing a queue's active batches is a primary-key-prefix scan and the table is shardable by queue. On a key-value store the same shape maps directly onto a partition key (`queue`) and sort key (`batch_id`) with no secondary index. +The table is not authoritative. The orchestrator resolves every listed `batch_id` with `BatchStore.Get` and filters by the current persisted `Batch.State`. This keeps storage generic: storage owns primitive membership records, while the orchestrator owns app concepts such as "active", "terminal", and "eligible for conflict analysis". -### Maintenance and self-healing +### Maintenance -`BatchStore.Create` writes the membership row before the batch row, so a batch row is never visible to `ListActive` without its membership. If the batch insert then fails, `Create` leaves the membership row in place: a returned error doesn't prove the row wasn't written (an ambiguous failure can commit it and still error), so deleting could permanently hide a live batch. A dangling row is the safe direction. +The orchestrator writes the target non-terminal membership row before creating a batch or before CASing a batch into a new non-terminal state. After a successful CAS, it best-effort removes the previous non-terminal membership row. Terminal transitions do not add a target membership row; after the CAS succeeds, the previous non-terminal membership row is best-effort removed. -On read, `ListActive` resolves each member by primary key. A **terminal** batch's membership is best-effort removed (race-free — its id is never reused). 
A **missing** batch is skipped but not removed, since it may belong to an in-flight `Create` that hasn't written its batch row yet. Cleanup failures are swallowed, so reads never fail on index maintenance and terminal-state writers (merge, speculate, dlq) never touch the index. Because the two writes are independent (no transaction), the design tolerates partial failure via idempotent retries and read-time reconciliation. +Because membership writes and batch writes are independent, stale rows are expected in failure windows. Readers skip missing batch rows and filter stale state rows against the authoritative batch row. A terminal stale row may be removed on read because batch IDs are never reused. ### Future: prune / reconcile job -Read-time reconciliation only removes terminal memberships, so two kinds of stale row need a periodic sweep: dangling memberships whose batch never landed (a failed or crashed create), and memberships of batches that are stuck in a non-terminal state (e.g. an orphan stuck in `created` after a mid-process failure). A reconcile job should sweep both, keeping the table bounded independently of read traffic. +A reconcile job can periodically sweep dangling rows whose batch never landed and stale rows whose authoritative batch state no longer matches the membership state. This keeps the table bounded independently of read traffic. ## change table ### Composite primary key: `(queue, uri, request_id)` The `change` table records per-URI claims by in-flight requests. `request_id` is part of the primary key so that concurrent claims on the same URI by different requests coexist as distinct rows — a same-request retry collides on the PK and is a no-op (`INSERT IGNORE`), while a different-request claim is a new row that `GetByURI` surfaces for overlap detection. `queue` leads the key so queue-scoped lookups are primary-key-prefix scans and the table is shardable by queue. 
- diff --git a/submitqueue/extension/storage/mysql/schema/active_batch.sql b/submitqueue/extension/storage/mysql/schema/active_batch.sql deleted file mode 100644 index cc6f80ca..00000000 --- a/submitqueue/extension/storage/mysql/schema/active_batch.sql +++ /dev/null @@ -1,12 +0,0 @@ --- active_batch is the membership index for "active (non-terminal) batches in a --- queue", keeping the set bounded by the live speculation window rather than batch --- history. queue leads the PK so listing is a PK-prefix scan (shardable by queue; --- portable to a key-value store with queue = partition key, batch_id = sort key). --- Membership is best-effort: it is added on Create (before the batch row) and --- removed on read by ListActive once a batch is terminal. A reconcile job reclaims --- rows left dangling by a failed or crashed create. See schema/README.md. -CREATE TABLE IF NOT EXISTS active_batch ( - queue VARCHAR(255) NOT NULL, - batch_id VARCHAR(255) NOT NULL, - PRIMARY KEY (queue, batch_id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/submitqueue/extension/storage/mysql/schema/batch_state_membership.sql b/submitqueue/extension/storage/mysql/schema/batch_state_membership.sql new file mode 100644 index 00000000..f6b35f14 --- /dev/null +++ b/submitqueue/extension/storage/mysql/schema/batch_state_membership.sql @@ -0,0 +1,9 @@ +-- batch_state_membership is the app-maintained lookup for +-- queue,state -> batch_ids. Batch rows are authoritative: callers resolve IDs +-- through the batch table and filter by the current persisted state. 
+CREATE TABLE IF NOT EXISTS batch_state_membership ( + queue VARCHAR(255) NOT NULL, + state VARCHAR(255) NOT NULL, + batch_id VARCHAR(255) NOT NULL, + PRIMARY KEY (queue, state, batch_id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/submitqueue/extension/storage/mysql/storage.go b/submitqueue/extension/storage/mysql/storage.go index 4ba0cf41..f38aa862 100644 --- a/submitqueue/extension/storage/mysql/storage.go +++ b/submitqueue/extension/storage/mysql/storage.go @@ -24,27 +24,29 @@ import ( ) type mysqlStorage struct { - db *sql.DB - requestStore storage.RequestStore - changeStore storage.ChangeStore - batchStore storage.BatchStore - batchDependentStore storage.BatchDependentStore - buildStore storage.BuildStore - speculationTreeStore storage.SpeculationTreeStore - requestLogStore storage.RequestLogStore + db *sql.DB + requestStore storage.RequestStore + changeStore storage.ChangeStore + batchStore storage.BatchStore + batchStateMembershipStore storage.BatchStateMembershipStore + batchDependentStore storage.BatchDependentStore + buildStore storage.BuildStore + speculationTreeStore storage.SpeculationTreeStore + requestLogStore storage.RequestLogStore } // NewStorage creates a new MySQL storage. 
func NewStorage(db *sql.DB, scope tally.Scope) (storage.Storage, error) { return &mysqlStorage{ - db: db, - requestStore: NewRequestStore(db, scope.SubScope("request_store")), - changeStore: NewChangeStore(db, scope.SubScope("change_store")), - batchStore: NewBatchStore(db, scope.SubScope("batch_store")), - batchDependentStore: NewBatchDependentStore(db, scope.SubScope("batch_dependent_store")), - buildStore: NewBuildStore(db, scope.SubScope("build_store")), - speculationTreeStore: NewSpeculationTreeStore(db, scope.SubScope("speculation_tree_store")), - requestLogStore: NewRequestLogStore(db, scope.SubScope("request_log_store")), + db: db, + requestStore: NewRequestStore(db, scope.SubScope("request_store")), + changeStore: NewChangeStore(db, scope.SubScope("change_store")), + batchStore: NewBatchStore(db, scope.SubScope("batch_store")), + batchStateMembershipStore: NewBatchStateMembershipStore(db, scope.SubScope("batch_state_membership_store")), + batchDependentStore: NewBatchDependentStore(db, scope.SubScope("batch_dependent_store")), + buildStore: NewBuildStore(db, scope.SubScope("build_store")), + speculationTreeStore: NewSpeculationTreeStore(db, scope.SubScope("speculation_tree_store")), + requestLogStore: NewRequestLogStore(db, scope.SubScope("request_log_store")), }, nil } @@ -63,6 +65,11 @@ func (f *mysqlStorage) GetBatchStore() storage.BatchStore { return f.batchStore } +// GetBatchStateMembershipStore returns the MySQL-backed BatchStateMembershipStore. +func (f *mysqlStorage) GetBatchStateMembershipStore() storage.BatchStateMembershipStore { + return f.batchStateMembershipStore +} + // GetBatchDependentStore returns the MySQL-backed BatchDependentStore. 
func (f *mysqlStorage) GetBatchDependentStore() storage.BatchDependentStore { return f.batchDependentStore diff --git a/submitqueue/extension/storage/storage.go b/submitqueue/extension/storage/storage.go index a02bef73..c6841a1a 100644 --- a/submitqueue/extension/storage/storage.go +++ b/submitqueue/extension/storage/storage.go @@ -53,6 +53,9 @@ type Storage interface { // GetBatchStore returns the BatchStore instance. GetBatchStore() BatchStore + // GetBatchStateMembershipStore returns the BatchStateMembershipStore instance. + GetBatchStateMembershipStore() BatchStateMembershipStore + // GetBatchDependentStore returns the BatchDependentStore instance. GetBatchDependentStore() BatchDependentStore diff --git a/submitqueue/orchestrator/controller/batch/BUILD.bazel b/submitqueue/orchestrator/controller/batch/BUILD.bazel index 72a89398..89c1e84f 100644 --- a/submitqueue/orchestrator/controller/batch/BUILD.bazel +++ b/submitqueue/orchestrator/controller/batch/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//submitqueue/entity", "//submitqueue/extension/conflict", "//submitqueue/extension/storage", + "//submitqueue/orchestrator/controller/batchstate", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], diff --git a/submitqueue/orchestrator/controller/batch/batch.go b/submitqueue/orchestrator/controller/batch/batch.go index f9b34cf1..178ef38c 100644 --- a/submitqueue/orchestrator/controller/batch/batch.go +++ b/submitqueue/orchestrator/controller/batch/batch.go @@ -28,6 +28,7 @@ import ( "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/conflict" "github.com/uber/submitqueue/submitqueue/extension/storage" + "github.com/uber/submitqueue/submitqueue/orchestrator/controller/batchstate" "go.uber.org/zap" ) @@ -135,21 +136,13 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r } // Ask the conflict analyzer which active batches the new batch must serialize - // behind. 
ListActive returns all non-terminal batches; we narrow to - // Created/Speculating/Merging so the analyzer only sees batches that can still - // acquire new conflicts. - allActive, err := c.store.GetBatchStore().ListActive(ctx, request.Queue) + // behind. Membership rows are hints; batchstate.List resolves authoritative + // batch rows and returns only the current states requested here. + activeBatches, err := batchstate.List(ctx, c.store, request.Queue, batchstate.ConflictStates...) if err != nil { metrics.NamedCounter(c.metricsScope, opName, "batch_store_errors", 1) return fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err) } - activeBatches := make([]entity.Batch, 0, len(allActive)) - for _, b := range allActive { - switch b.State { - case entity.BatchStateCreated, entity.BatchStateSpeculating, entity.BatchStateMerging: - activeBatches = append(activeBatches, b) - } - } // Dedupe by batch ID since a single (analyzed, in-flight) pair may be // reported with multiple Conflict entries when different conflict types @@ -283,7 +276,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r // Persist batch to storage. // This is the final operation that concludes the batch creation process. If it fails, BatchDependents will be pointing to a batch id that does not exist. // We do not reuse batch ids, a retry of this operation will create a new batch with a new ID. The downstream logic that operates on BatchDependent should be able to handle stale entries. 
- if err := c.store.GetBatchStore().Create(ctx, batch); err != nil { + if err := batchstate.Create(ctx, c.store, batch); err != nil { metrics.NamedCounter(c.metricsScope, opName, "batch_store_errors", 1) return fmt.Errorf("failed to create batch in batch store: %w", err) } diff --git a/submitqueue/orchestrator/controller/batch/batch_test.go b/submitqueue/orchestrator/controller/batch/batch_test.go index beb229c8..c8700acf 100644 --- a/submitqueue/orchestrator/controller/batch/batch_test.go +++ b/submitqueue/orchestrator/controller/batch/batch_test.go @@ -72,6 +72,32 @@ func testRequest() entity.Request { } } +func expectConflictMembership( + membershipStore *storagemock.MockBatchStateMembershipStore, + batchStore *storagemock.MockBatchStore, + queue string, + batches []entity.Batch, +) { + states := []entity.BatchState{ + entity.BatchStateCreated, + entity.BatchStateSpeculating, + entity.BatchStateMerging, + } + byState := make(map[entity.BatchState][]string, len(states)) + byID := make(map[string]entity.Batch, len(batches)) + for _, b := range batches { + byState[b.State] = append(byState[b.State], b.ID) + byID[b.ID] = b + } + for _, state := range states { + ids := byState[state] + membershipStore.EXPECT().ListIDs(gomock.Any(), queue, state).Return(ids, nil) + for _, id := range ids { + batchStore.EXPECT().Get(gomock.Any(), id).Return(byID[id], nil) + } + } +} + // newTestController creates a controller with test dependencies. // If mockStorage is nil, a default MockStorage with an empty batch store is created. // If analyzer is nil, the "all" conflict analyzer is used (every active batch becomes a dependency). 
@@ -81,8 +107,12 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M if mockStorage == nil { mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().ListActive(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + mockMembershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + for _, state := range []entity.BatchState{entity.BatchStateCreated, entity.BatchStateSpeculating, entity.BatchStateMerging} { + mockMembershipStore.EXPECT().ListIDs(gomock.Any(), gomock.Any(), state).Return(nil, nil).AnyTimes() + } + mockMembershipStore.EXPECT().Add(gomock.Any(), gomock.Any(), entity.BatchStateCreated, gomock.Any()).Return(nil).AnyTimes() mockReqStore := storagemock.NewMockRequestStore(ctrl) req := testRequest() @@ -94,6 +124,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M mockStorage = storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetBatchStateMembershipStore().Return(mockMembershipStore).AnyTimes() mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() } @@ -212,12 +243,12 @@ func TestController_Process_WithDependencies(t *testing.T) { Version: 1, } - // Set up storage with active batches to become dependencies. ListActive - // returns every non-terminal batch; the controller filters to - // Created/Speculating/Merging in memory. The Scored and Cancelling batches - // below must be excluded — note no BatchDependent expectations are registered - // for them, so the default all.New() analyzer would fail the test on an - // unexpected mock call if the filter let them through. + // Set up storage with active batches to become dependencies. 
The controller + // queries only Created/Speculating/Merging membership states. The Scored and + // Cancelling batches below must be excluded — note no BatchDependent + // expectations are registered for them, so the default all.New() analyzer + // would fail the test on an unexpected mock call if the filter let them + // through. activeBatches := []entity.Batch{ {ID: "test-queue/batch/1", Queue: "test-queue", State: entity.BatchStateCreated, Version: 1}, {ID: "test-queue/batch/2", Queue: "test-queue", State: entity.BatchStateSpeculating, Version: 2}, @@ -226,8 +257,10 @@ func TestController_Process_WithDependencies(t *testing.T) { } mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(activeBatches, nil) mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + mockMembershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + expectConflictMembership(mockMembershipStore, mockBatchStore, "test-queue", activeBatches) + mockMembershipStore.EXPECT().Add(gomock.Any(), "test-queue", entity.BatchStateCreated, gomock.Any()).Return(nil) mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) // batch/1 has no existing dependents. 
@@ -252,6 +285,7 @@ func TestController_Process_WithDependencies(t *testing.T) { mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetBatchStateMembershipStore().Return(mockMembershipStore).AnyTimes() mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() @@ -278,8 +312,10 @@ func TestController_Process_AnalyzerSelectsSubset(t *testing.T) { } mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(activeBatches, nil) mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + mockMembershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + expectConflictMembership(mockMembershipStore, mockBatchStore, "test-queue", activeBatches) + mockMembershipStore.EXPECT().Add(gomock.Any(), "test-queue", entity.BatchStateCreated, gomock.Any()).Return(nil) mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) // Only batch/2 is selected by the analyzer, so only it gets a reverse-index update. 
@@ -296,6 +332,7 @@ func TestController_Process_AnalyzerSelectsSubset(t *testing.T) { mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetBatchStateMembershipStore().Return(mockMembershipStore).AnyTimes() mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() @@ -324,13 +361,15 @@ func TestController_Process_AnalyzerFailure(t *testing.T) { request := testRequest() mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil) + mockMembershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + expectConflictMembership(mockMembershipStore, mockBatchStore, "test-queue", nil) mockReqStore := storagemock.NewMockRequestStore(ctrl) mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetBatchStateMembershipStore().Return(mockMembershipStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() analyzer := conflictmock.NewMockAnalyzer(ctrl) @@ -420,7 +459,8 @@ func TestController_Process_CASLostToCancel(t *testing.T) { request := testRequest() mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil) + mockMembershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + expectConflictMembership(mockMembershipStore, mockBatchStore, "test-queue", nil) // Create must NOT be called — gomock fails if it is. 
mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) @@ -436,6 +476,7 @@ func TestController_Process_CASLostToCancel(t *testing.T) { mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetBatchStateMembershipStore().Return(mockMembershipStore).AnyTimes() mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() @@ -473,7 +514,8 @@ func TestController_Process_CASUnexpectedErrorPropagates(t *testing.T) { request := testRequest() mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil) + mockMembershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + expectConflictMembership(mockMembershipStore, mockBatchStore, "test-queue", nil) // Create must NOT be called — gomock fails if it is. mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) @@ -488,6 +530,7 @@ func TestController_Process_CASUnexpectedErrorPropagates(t *testing.T) { mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetBatchStateMembershipStore().Return(mockMembershipStore).AnyTimes() mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() @@ -519,8 +562,10 @@ func TestController_Process_RecoveryAfterPriorCAS(t *testing.T) { request.Version = 2 // prior attempt bumped from 1 → 2 mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil) mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + mockMembershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + 
expectConflictMembership(mockMembershipStore, mockBatchStore, "test-queue", nil) + mockMembershipStore.EXPECT().Add(gomock.Any(), "test-queue", entity.BatchStateCreated, gomock.Any()).Return(nil) mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) @@ -533,6 +578,7 @@ func TestController_Process_RecoveryAfterPriorCAS(t *testing.T) { mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetBatchStateMembershipStore().Return(mockMembershipStore).AnyTimes() mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() diff --git a/submitqueue/orchestrator/controller/batchstate/BUILD.bazel b/submitqueue/orchestrator/controller/batchstate/BUILD.bazel new file mode 100644 index 00000000..ff0aca84 --- /dev/null +++ b/submitqueue/orchestrator/controller/batchstate/BUILD.bazel @@ -0,0 +1,12 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "batchstate", + srcs = ["batchstate.go"], + importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/batchstate", + visibility = ["//visibility:public"], + deps = [ + "//submitqueue/entity", + "//submitqueue/extension/storage", + ], +) diff --git a/submitqueue/orchestrator/controller/batchstate/batchstate.go b/submitqueue/orchestrator/controller/batchstate/batchstate.go new file mode 100644 index 00000000..b18e11b9 --- /dev/null +++ b/submitqueue/orchestrator/controller/batchstate/batchstate.go @@ -0,0 +1,144 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package batchstate keeps batch-state membership maintenance in the +// orchestrator layer. Storage owns primitive records; this package owns the app +// semantics for creating batches, transitioning states, and resolving +// queue/state membership into authoritative Batch entities. +package batchstate + +import ( + "context" + "fmt" + + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" +) + +// NonTerminalStates are the batch states that should remain discoverable via +// membership lookups. +var NonTerminalStates = []entity.BatchState{ + entity.BatchStateCreated, + entity.BatchStateScored, + entity.BatchStateSpeculating, + entity.BatchStateMerging, + entity.BatchStateCancelling, +} + +// ConflictStates are the active states the batch controller considers when +// building conflict dependencies for a new batch. +var ConflictStates = []entity.BatchState{ + entity.BatchStateCreated, + entity.BatchStateSpeculating, + entity.BatchStateMerging, +} + +// Create records the initial non-terminal membership before creating the batch +// row, so a successfully persisted batch is not hidden from queue/state reads. 
+func Create(ctx context.Context, store storage.Storage, batch entity.Batch) error { + if !batch.State.IsTerminal() { + if err := store.GetBatchStateMembershipStore().Add(ctx, batch.Queue, batch.State, batch.ID); err != nil { + return fmt.Errorf("failed to add initial batch state membership for batch %s: %w", batch.ID, err) + } + } + if err := store.GetBatchStore().Create(ctx, batch); err != nil { + return err + } + return nil +} + +// UpdateState transitions a batch and maintains queue/state membership in the +// safe direction: add the target non-terminal state before the CAS, then +// best-effort remove the previous non-terminal state after the CAS succeeds. +func UpdateState(ctx context.Context, store storage.Storage, batch entity.Batch, newVersion int32, newState entity.BatchState) error { + if !newState.IsTerminal() { + if err := store.GetBatchStateMembershipStore().Add(ctx, batch.Queue, newState, batch.ID); err != nil { + return fmt.Errorf("failed to add batch state membership for batch %s state %s: %w", batch.ID, newState, err) + } + } + if err := store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, newState); err != nil { + return err + } + removePrevious(ctx, store, batch, newState) + return nil +} + +// UpdateScoreAndState is the score-writing variant of UpdateState. 
+func UpdateScoreAndState(ctx context.Context, store storage.Storage, batch entity.Batch, newVersion int32, score float64, newState entity.BatchState) error { + if !newState.IsTerminal() { + if err := store.GetBatchStateMembershipStore().Add(ctx, batch.Queue, newState, batch.ID); err != nil { + return fmt.Errorf("failed to add batch state membership for batch %s state %s: %w", batch.ID, newState, err) + } + } + if err := store.GetBatchStore().UpdateScoreAndState(ctx, batch.ID, batch.Version, newVersion, score, newState); err != nil { + return err + } + removePrevious(ctx, store, batch, newState) + return nil +} + +// List returns batches in queue whose current authoritative state is one of +// states. Membership rows are only hints: missing batch rows are skipped, stale +// rows are filtered, and terminal stale rows are best-effort removed. +func List(ctx context.Context, store storage.Storage, queue string, states ...entity.BatchState) ([]entity.Batch, error) { + if len(states) == 0 { + return nil, nil + } + + wanted := make(map[entity.BatchState]struct{}, len(states)) + for _, state := range states { + wanted[state] = struct{}{} + } + + seen := make(map[string]struct{}) + results := make([]entity.Batch, 0) + for _, state := range states { + ids, err := store.GetBatchStateMembershipStore().ListIDs(ctx, queue, state) + if err != nil { + return nil, fmt.Errorf("failed to list batch IDs for queue=%s state=%s: %w", queue, state, err) + } + for _, id := range ids { + if _, ok := seen[id]; ok { + continue + } + batch, err := store.GetBatchStore().Get(ctx, id) + if err != nil { + if storage.IsNotFound(err) { + continue + } + return nil, fmt.Errorf("failed to get batch id=%s from queue=%s state=%s membership: %w", id, queue, state, err) + } + if batch.Queue != queue { + continue + } + if batch.State.IsTerminal() { + _ = store.GetBatchStateMembershipStore().Remove(ctx, queue, state, id) + continue + } + if _, ok := wanted[batch.State]; !ok { + continue + } + seen[id] = 
struct{}{} + results = append(results, batch) + } + } + return results, nil +} + +func removePrevious(ctx context.Context, store storage.Storage, batch entity.Batch, newState entity.BatchState) { + if batch.State.IsTerminal() || batch.State == newState { + return + } + _ = store.GetBatchStateMembershipStore().Remove(ctx, batch.Queue, batch.State, batch.ID) +} diff --git a/submitqueue/orchestrator/controller/cancel/BUILD.bazel b/submitqueue/orchestrator/controller/cancel/BUILD.bazel index 466799f2..e2c29f2d 100644 --- a/submitqueue/orchestrator/controller/cancel/BUILD.bazel +++ b/submitqueue/orchestrator/controller/cancel/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/storage", + "//submitqueue/orchestrator/controller/batchstate", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], diff --git a/submitqueue/orchestrator/controller/cancel/cancel.go b/submitqueue/orchestrator/controller/cancel/cancel.go index 2ac3341f..37f65490 100644 --- a/submitqueue/orchestrator/controller/cancel/cancel.go +++ b/submitqueue/orchestrator/controller/cancel/cancel.go @@ -65,6 +65,7 @@ import ( "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" + "github.com/uber/submitqueue/submitqueue/orchestrator/controller/batchstate" "go.uber.org/zap" ) @@ -184,11 +185,11 @@ func (c *Controller) markCancelling(ctx context.Context, request entity.Request) // findActiveBatch scans all active batches in the request's queue for one whose // Contains list includes the request. Returns (batch, true, nil) on a hit, // (zero, false, nil) when the request is not yet batched, and any storage -// error otherwise. ListActive includes Cancelling batches, so redelivery of a -// cancel whose speculate hand-off publish failed still resolves and retries. +// error otherwise. 
NonTerminalStates includes Cancelling batches, so redelivery +// of a cancel whose speculate hand-off publish failed still resolves and retries. func (c *Controller) findActiveBatch(ctx context.Context, request entity.Request) (entity.Batch, bool, error) { // TODO: Scans all the batches in flight - make it more efficient? - active, err := c.store.GetBatchStore().ListActive(ctx, request.Queue) + active, err := batchstate.List(ctx, c.store, request.Queue, batchstate.NonTerminalStates...) if err != nil { c.metricsScope.Counter("batch_store_errors").Inc(1) return entity.Batch{}, false, fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err) @@ -261,7 +262,7 @@ func (c *Controller) cancelBatch(ctx context.Context, batch entity.Batch) error if batch.State != entity.BatchStateCancelling { newVersion := batch.Version + 1 - if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateCancelling); err != nil { + if err := batchstate.UpdateState(ctx, c.store, batch, newVersion, entity.BatchStateCancelling); err != nil { c.metricsScope.Counter("batch_update_errors").Inc(1) // storage.ErrVersionMismatch here means the batch advanced concurrently // (e.g. speculate / merge progressed). 
Returned as-is for the base diff --git a/submitqueue/orchestrator/controller/cancel/cancel_test.go b/submitqueue/orchestrator/controller/cancel/cancel_test.go index 833f97d0..d69d744d 100644 --- a/submitqueue/orchestrator/controller/cancel/cancel_test.go +++ b/submitqueue/orchestrator/controller/cancel/cancel_test.go @@ -71,6 +71,34 @@ func newDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte, partitio return d } +func expectNonTerminalMembership( + membershipStore *storagemock.MockBatchStateMembershipStore, + batchStore *storagemock.MockBatchStore, + queue string, + batches []entity.Batch, +) { + states := []entity.BatchState{ + entity.BatchStateCreated, + entity.BatchStateScored, + entity.BatchStateSpeculating, + entity.BatchStateMerging, + entity.BatchStateCancelling, + } + byState := make(map[entity.BatchState][]string, len(states)) + byID := make(map[string]entity.Batch, len(batches)) + for _, b := range batches { + byState[b.State] = append(byState[b.State], b.ID) + byID[b.ID] = b + } + for _, state := range states { + ids := byState[state] + membershipStore.EXPECT().ListIDs(gomock.Any(), queue, state).Return(ids, nil) + for _, id := range ids { + batchStore.EXPECT().Get(gomock.Any(), id).Return(byID[id], nil) + } + } +} + func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) registry, pub := newRegistry(t, ctrl) @@ -144,11 +172,13 @@ func TestProcess_CancelsUnbatchedRequest(t *testing.T) { ) batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().ListActive(gomock.Any(), "q").Return(nil, nil) + membershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + expectNonTerminalMembership(membershipStore, batchStore, "q", nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBatchStateMembershipStore().Return(membershipStore).AnyTimes() controller := newController(t, store, 
registry) err := controller.Process(context.Background(), newDelivery(t, ctrl, cancelPayload(t, "q/1", "user changed mind"), "q/1")) @@ -175,11 +205,13 @@ func TestProcess_AlreadyCancelling_SkipsMarkCancelling(t *testing.T) { reqStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(3), int32(4), entity.RequestStateCancelled).Return(nil) batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().ListActive(gomock.Any(), "q").Return(nil, nil) + membershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + expectNonTerminalMembership(membershipStore, batchStore, "q", nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBatchStateMembershipStore().Return(membershipStore).AnyTimes() controller := newController(t, store, registry) err := controller.Process(context.Background(), newDelivery(t, ctrl, cancelPayload(t, "q/1", ""), "q/1")) @@ -228,11 +260,13 @@ func TestProcess_UnbatchedVersionMismatch_Retryable(t *testing.T) { ) batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().ListActive(gomock.Any(), "q").Return(nil, nil) + membershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + expectNonTerminalMembership(membershipStore, batchStore, "q", nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBatchStateMembershipStore().Return(membershipStore).AnyTimes() controller := newController(t, store, registry) err := controller.Process(context.Background(), newDelivery(t, ctrl, cancelPayload(t, "q/1", ""), "q/1")) @@ -276,13 +310,17 @@ func TestProcess_BatchPath_HandsOffToSpeculate(t *testing.T) { reqStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(2), int32(3), entity.RequestStateCancelling).Return(nil) batchStore := 
storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().ListActive(gomock.Any(), "q").Return([]entity.Batch{batch}, nil) + membershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + expectNonTerminalMembership(membershipStore, batchStore, "q", []entity.Batch{batch}) + membershipStore.EXPECT().Add(gomock.Any(), "q", entity.BatchStateCancelling, batch.ID).Return(nil) // Single batch CAS: intent only. No terminal CAS. batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(3), int32(4), entity.BatchStateCancelling).Return(nil) + membershipStore.EXPECT().Remove(gomock.Any(), "q", entity.BatchStateSpeculating, batch.ID).Return(nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBatchStateMembershipStore().Return(membershipStore).AnyTimes() // BatchDependentStore and BuildStore must NOT be touched — speculate owns those now. controller := newController(t, store, registry) @@ -325,12 +363,14 @@ func TestProcess_BatchAlreadyCancelling_RepublishesToSpeculate(t *testing.T) { // No request UpdateState — already in Cancelling. batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().ListActive(gomock.Any(), "q").Return([]entity.Batch{batch}, nil) + membershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + expectNonTerminalMembership(membershipStore, batchStore, "q", []entity.Batch{batch}) // No batch UpdateState — already in Cancelling. 
store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBatchStateMembershipStore().Return(membershipStore).AnyTimes() controller := newController(t, store, registry) err := controller.Process(context.Background(), newDelivery(t, ctrl, cancelPayload(t, "q/1", ""), "q/1")) @@ -356,13 +396,16 @@ func TestProcess_BatchIntentVersionMismatch_Retryable(t *testing.T) { reqStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(2), int32(3), entity.RequestStateCancelling).Return(nil) batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().ListActive(gomock.Any(), "q").Return([]entity.Batch{batch}, nil) + membershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + expectNonTerminalMembership(membershipStore, batchStore, "q", []entity.Batch{batch}) + membershipStore.EXPECT().Add(gomock.Any(), "q", entity.BatchStateCancelling, batch.ID).Return(nil) batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateCancelling). 
Return(storage.ErrVersionMismatch) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBatchStateMembershipStore().Return(membershipStore).AnyTimes() controller := newController(t, store, registry) err := controller.Process(context.Background(), newDelivery(t, ctrl, cancelPayload(t, "q/1", ""), "q/1")) diff --git a/submitqueue/orchestrator/controller/dlq/BUILD.bazel b/submitqueue/orchestrator/controller/dlq/BUILD.bazel index f38da3af..7857b03d 100644 --- a/submitqueue/orchestrator/controller/dlq/BUILD.bazel +++ b/submitqueue/orchestrator/controller/dlq/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//core/metrics", "//submitqueue/entity", "//submitqueue/extension/storage", + "//submitqueue/orchestrator/controller/batchstate", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], diff --git a/submitqueue/orchestrator/controller/dlq/batch_test.go b/submitqueue/orchestrator/controller/dlq/batch_test.go index db55ed6f..1f41bbea 100644 --- a/submitqueue/orchestrator/controller/dlq/batch_test.go +++ b/submitqueue/orchestrator/controller/dlq/batch_test.go @@ -60,6 +60,7 @@ func TestDLQBatchController_Process_FailsAndFansOut(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectBatchFailedTransition(ctrl, store, "q/batch/9", "q", entity.BatchStateMerging) store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() diff --git a/submitqueue/orchestrator/controller/dlq/buildsignal_test.go b/submitqueue/orchestrator/controller/dlq/buildsignal_test.go index 329f35b5..1f85cdb9 100644 --- a/submitqueue/orchestrator/controller/dlq/buildsignal_test.go +++ b/submitqueue/orchestrator/controller/dlq/buildsignal_test.go @@ -67,6 +67,7 @@ func TestDLQBuildSignalController_Process_FansOutToBatch(t *testing.T) { 
store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectBatchFailedTransition(ctrl, store, "q/batch/2", "q", entity.BatchStateSpeculating) store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() diff --git a/submitqueue/orchestrator/controller/dlq/dlq.go b/submitqueue/orchestrator/controller/dlq/dlq.go index 77570ca4..3795f333 100644 --- a/submitqueue/orchestrator/controller/dlq/dlq.go +++ b/submitqueue/orchestrator/controller/dlq/dlq.go @@ -41,6 +41,7 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" + "github.com/uber/submitqueue/submitqueue/orchestrator/controller/batchstate" "go.uber.org/zap" ) @@ -158,7 +159,7 @@ func failBatch(ctx context.Context, store storage.Storage, logger *zap.SugaredLo ) } else { newVersion := batch.Version + 1 - if err := store.GetBatchStore().UpdateState(ctx, batchID, batch.Version, newVersion, entity.BatchStateFailed); err != nil { + if err := batchstate.UpdateState(ctx, store, batch, newVersion, entity.BatchStateFailed); err != nil { return fmt.Errorf("failed to update batch %s state to failed: %w", batchID, err) } logger.Infow("dlq reconcile: batch marked failed", diff --git a/submitqueue/orchestrator/controller/dlq/dlq_test.go b/submitqueue/orchestrator/controller/dlq/dlq_test.go index 4ace4fe4..de039c74 100644 --- a/submitqueue/orchestrator/controller/dlq/dlq_test.go +++ b/submitqueue/orchestrator/controller/dlq/dlq_test.go @@ -183,6 +183,12 @@ func TestFailRequest_GenericGetErrorIsNonRetryable(t *testing.T) { // failBatch +func expectBatchFailedTransition(ctrl *gomock.Controller, store *storagemock.MockStorage, batchID, queue string, oldState entity.BatchState) { + membershipStore := 
storagemock.NewMockBatchStateMembershipStore(ctrl) + membershipStore.EXPECT().Remove(gomock.Any(), queue, oldState, batchID).Return(nil).AnyTimes() + store.EXPECT().GetBatchStateMembershipStore().Return(membershipStore).AnyTimes() +} + func TestFailBatch_TransitionsAndFansOut(t *testing.T) { ctrl := gomock.NewController(t) @@ -208,6 +214,7 @@ func TestFailBatch_TransitionsAndFansOut(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectBatchFailedTransition(ctrl, store, "q/batch/1", "q", entity.BatchStateMerging) store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() @@ -239,6 +246,7 @@ func TestFailBatch_AlreadyTerminalFansOutOnly(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectBatchFailedTransition(ctrl, store, "q/batch/1", "q", entity.BatchStateCancelling) store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() @@ -273,6 +281,7 @@ func TestFailBatch_CancellingTransitionsToFailed(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectBatchFailedTransition(ctrl, store, "q/batch/1", "q", entity.BatchStateCancelling) store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() diff --git a/submitqueue/orchestrator/controller/merge/BUILD.bazel b/submitqueue/orchestrator/controller/merge/BUILD.bazel index cd74e742..6ae4d65e 100644 --- a/submitqueue/orchestrator/controller/merge/BUILD.bazel +++ b/submitqueue/orchestrator/controller/merge/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//submitqueue/entity", "//submitqueue/extension/pusher", "//submitqueue/extension/storage", + "//submitqueue/orchestrator/controller/batchstate", 
"@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], diff --git a/submitqueue/orchestrator/controller/merge/merge.go b/submitqueue/orchestrator/controller/merge/merge.go index 72607ec6..9726fd4e 100644 --- a/submitqueue/orchestrator/controller/merge/merge.go +++ b/submitqueue/orchestrator/controller/merge/merge.go @@ -29,6 +29,7 @@ import ( "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/pusher" "github.com/uber/submitqueue/submitqueue/extension/storage" + "github.com/uber/submitqueue/submitqueue/orchestrator/controller/batchstate" ) // Controller handles merge queue messages. It loads every request in a batch, @@ -153,7 +154,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r } newVersion := batch.Version + 1 - if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, newState); err != nil { + if err := batchstate.UpdateState(ctx, c.store, batch, newVersion, newState); err != nil { coremetrics.NamedCounter(c.metricsScope, "process", "state_update_errors", 1) return fmt.Errorf("failed to transition batch %s to %s: %w", batch.ID, newState, err) } diff --git a/submitqueue/orchestrator/controller/merge/merge_test.go b/submitqueue/orchestrator/controller/merge/merge_test.go index ffdcddd8..fb9eea5a 100644 --- a/submitqueue/orchestrator/controller/merge/merge_test.go +++ b/submitqueue/orchestrator/controller/merge/merge_test.go @@ -76,6 +76,12 @@ func newPusherFactory(ctrl *gomock.Controller, p pusher.Pusher) pusher.Factory { return f } +func expectTerminalBatchTransition(ctrl *gomock.Controller, store *storagemock.MockStorage, batch entity.Batch) { + membershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + membershipStore.EXPECT().Remove(gomock.Any(), batch.Queue, batch.State, batch.ID).Return(nil).AnyTimes() + store.EXPECT().GetBatchStateMembershipStore().Return(membershipStore).AnyTimes() +} + func TestNewController(t 
*testing.T) { ctrl := gomock.NewController(t) store := storagemock.NewMockStorage(ctrl) @@ -117,6 +123,7 @@ func TestController_Process_SuccessfulMerge(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectTerminalBatchTransition(ctrl, store, batch) mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( @@ -172,6 +179,7 @@ func TestController_Process_ForwardsBatchToPusher(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectTerminalBatchTransition(ctrl, store, batch) mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( @@ -216,6 +224,7 @@ func TestController_Process_PushConflictMarksBatchFailed(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectTerminalBatchTransition(ctrl, store, batch) mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( @@ -256,6 +265,7 @@ func TestController_Process_PushInfraFailureReturnsError(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectTerminalBatchTransition(ctrl, store, batch) mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( @@ -414,6 +424,7 @@ func TestController_Process_PublishFailureSurfaces(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectTerminalBatchTransition(ctrl, store, batch) mockPusher := pushermock.NewMockPusher(ctrl) mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( diff --git a/submitqueue/orchestrator/controller/score/BUILD.bazel b/submitqueue/orchestrator/controller/score/BUILD.bazel index 96976e6c..9d2ebb92 100644 
--- a/submitqueue/orchestrator/controller/score/BUILD.bazel +++ b/submitqueue/orchestrator/controller/score/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//submitqueue/entity", "//submitqueue/extension/scorer", "//submitqueue/extension/storage", + "//submitqueue/orchestrator/controller/batchstate", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], diff --git a/submitqueue/orchestrator/controller/score/score.go b/submitqueue/orchestrator/controller/score/score.go index 272713bc..bb0c20aa 100644 --- a/submitqueue/orchestrator/controller/score/score.go +++ b/submitqueue/orchestrator/controller/score/score.go @@ -27,6 +27,7 @@ import ( "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/scorer" "github.com/uber/submitqueue/submitqueue/extension/storage" + "github.com/uber/submitqueue/submitqueue/orchestrator/controller/batchstate" "go.uber.org/zap" ) @@ -140,11 +141,12 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r // Atomically update score and state to "scored" in the database newVersion := batch.Version + 1 - if err := c.store.GetBatchStore().UpdateScoreAndState(ctx, batch.ID, batch.Version, newVersion, batchScore, entity.BatchStateScored); err != nil { + if err := batchstate.UpdateScoreAndState(ctx, c.store, batch, newVersion, batchScore, entity.BatchStateScored); err != nil { metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to update score for batch %s: %w", batch.ID, err) } batch.Version = newVersion + batch.State = entity.BatchStateScored c.logger.Infow("scored batch", "batch_id", batch.ID, diff --git a/submitqueue/orchestrator/controller/score/score_test.go b/submitqueue/orchestrator/controller/score/score_test.go index f1b91ddd..ac54a441 100644 --- a/submitqueue/orchestrator/controller/score/score_test.go +++ b/submitqueue/orchestrator/controller/score/score_test.go @@ -87,6 +87,13 @@ func mockChangeStore(ctrl 
*gomock.Controller, requests ...entity.Request) *stora return cs } +func expectScoreMembership(ctrl *gomock.Controller, store *storagemock.MockStorage, batch entity.Batch) { + membershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + membershipStore.EXPECT().Add(gomock.Any(), batch.Queue, entity.BatchStateScored, batch.ID).Return(nil).AnyTimes() + membershipStore.EXPECT().Remove(gomock.Any(), batch.Queue, batch.State, batch.ID).Return(nil).AnyTimes() + store.EXPECT().GetBatchStateMembershipStore().Return(membershipStore).AnyTimes() +} + // newMockStorage creates a MockStorage with a MockBatchStore, MockRequestStore, and MockChangeStore. func newMockStorage(ctrl *gomock.Controller, batch entity.Batch, request entity.Request) *storagemock.MockStorage { mockBatchStore := storagemock.NewMockBatchStore(ctrl) @@ -98,6 +105,7 @@ func newMockStorage(ctrl *gomock.Controller, batch entity.Batch, request entity. store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + expectScoreMembership(ctrl, store, batch) store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() store.EXPECT().GetChangeStore().Return(mockChangeStore(ctrl, request)).AnyTimes() return store @@ -190,6 +198,7 @@ func TestController_Process_BatchLevelScore(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + expectScoreMembership(ctrl, store, batch) // The controller passes the batch identity to the scorer and persists its score. 
mockScorer := scorermock.NewMockScorer(ctrl) @@ -246,6 +255,7 @@ func TestController_Process_ScorerFailure(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + expectScoreMembership(ctrl, store, batch) store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() store.EXPECT().GetChangeStore().Return(mockChangeStore(ctrl, request)).AnyTimes() @@ -278,6 +288,7 @@ func TestController_Process_UpdateScoreFailure(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + expectScoreMembership(ctrl, store, batch) store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() store.EXPECT().GetChangeStore().Return(mockChangeStore(ctrl, request)).AnyTimes() diff --git a/submitqueue/orchestrator/controller/speculate/BUILD.bazel b/submitqueue/orchestrator/controller/speculate/BUILD.bazel index fa1e9ae7..d7626f19 100644 --- a/submitqueue/orchestrator/controller/speculate/BUILD.bazel +++ b/submitqueue/orchestrator/controller/speculate/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//submitqueue/core/topickey", "//submitqueue/entity", "//submitqueue/extension/storage", + "//submitqueue/orchestrator/controller/batchstate", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], diff --git a/submitqueue/orchestrator/controller/speculate/speculate.go b/submitqueue/orchestrator/controller/speculate/speculate.go index 2631c42e..888b520f 100644 --- a/submitqueue/orchestrator/controller/speculate/speculate.go +++ b/submitqueue/orchestrator/controller/speculate/speculate.go @@ -26,6 +26,7 @@ import ( "github.com/uber/submitqueue/submitqueue/core/topickey" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" + "github.com/uber/submitqueue/submitqueue/orchestrator/controller/batchstate" "go.uber.org/zap" ) @@ -162,7 +163,7 @@ func (c *Controller) startSpeculation(ctx 
context.Context, batch entity.Batch) e // Optimistic CAS: if the version has already advanced (concurrent speculate), // the next event will see the new state and behave correctly. newVersion := batch.Version + 1 - if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateSpeculating); err != nil { + if err := batchstate.UpdateState(ctx, c.store, batch, newVersion, entity.BatchStateSpeculating); err != nil { metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to update batch %s state to speculating: %w", batch.ID, err) } @@ -223,7 +224,7 @@ func (c *Controller) tryFinalize(ctx context.Context, batch entity.Batch) error } newVersion := batch.Version + 1 - if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateMerging); err != nil { + if err := batchstate.UpdateState(ctx, c.store, batch, newVersion, entity.BatchStateMerging); err != nil { metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to update batch %s state to merging: %w", batch.ID, err) } @@ -245,7 +246,7 @@ func (c *Controller) failOnDependency(ctx context.Context, batch entity.Batch, d ) newVersion := batch.Version + 1 - if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateFailed); err != nil { + if err := batchstate.UpdateState(ctx, c.store, batch, newVersion, entity.BatchStateFailed); err != nil { metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to update batch %s state to failed: %w", batch.ID, err) } @@ -307,7 +308,7 @@ func (c *Controller) cancelBatch(ctx context.Context, batch entity.Batch) error } newVersion := batch.Version + 1 - if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateCancelled); err != nil { + if err := batchstate.UpdateState(ctx, c.store, batch, newVersion, 
entity.BatchStateCancelled); err != nil { metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to update batch %s state to cancelled: %w", batch.ID, err) } diff --git a/submitqueue/orchestrator/controller/speculate/speculate_test.go b/submitqueue/orchestrator/controller/speculate/speculate_test.go index 6b0a6b5f..d8042701 100644 --- a/submitqueue/orchestrator/controller/speculate/speculate_test.go +++ b/submitqueue/orchestrator/controller/speculate/speculate_test.go @@ -90,6 +90,17 @@ func runProcess(t *testing.T, ctrl *gomock.Controller, controller *Controller, b return controller.Process(context.Background(), delivery) } +func expectBatchStateTransition(ctrl *gomock.Controller, store *storagemock.MockStorage, batch entity.Batch, newState entity.BatchState) { + membershipStore := storagemock.NewMockBatchStateMembershipStore(ctrl) + if !newState.IsTerminal() { + membershipStore.EXPECT().Add(gomock.Any(), batch.Queue, newState, batch.ID).Return(nil).AnyTimes() + } + if !batch.State.IsTerminal() && batch.State != newState { + membershipStore.EXPECT().Remove(gomock.Any(), batch.Queue, batch.State, batch.ID).Return(nil).AnyTimes() + } + store.EXPECT().GetBatchStateMembershipStore().Return(membershipStore).AnyTimes() +} + func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) store := storagemock.NewMockStorage(ctrl) @@ -123,6 +134,7 @@ func TestController_Process_StartSpeculation(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectBatchStateTransition(ctrl, store, batch, entity.BatchStateSpeculating) controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -141,6 +153,7 @@ func TestController_Process_FinalizeNoDeps(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectBatchStateTransition(ctrl, store, batch, 
entity.BatchStateMerging) controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -161,6 +174,7 @@ func TestController_Process_FinalizeAllDepsSucceeded(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectBatchStateTransition(ctrl, store, batch, entity.BatchStateMerging) controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -199,6 +213,7 @@ func TestController_Process_FailedDepFailsBatch(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectBatchStateTransition(ctrl, store, batch, entity.BatchStateFailed) controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -221,6 +236,7 @@ func TestController_Process_CancelledDepSkipped(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + expectBatchStateTransition(ctrl, store, batch, entity.BatchStateMerging) controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -375,6 +391,7 @@ func TestController_Process_CancellingTerminalFlow(t *testing.T) { store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() store.EXPECT().GetBatchDependentStore().Return(depStore).AnyTimes() + expectBatchStateTransition(ctrl, store, batch, entity.BatchStateCancelled) type pubRec struct { topic string @@ -438,6 +455,7 @@ func TestController_Process_CancellingBuildAlreadyTerminal(t *testing.T) { store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() store.EXPECT().GetBatchDependentStore().Return(depStore).AnyTimes() + expectBatchStateTransition(ctrl, store, batch, 
entity.BatchStateCancelled) controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -467,6 +485,7 @@ func TestController_Process_CancellingNoBuildYet(t *testing.T) { store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() store.EXPECT().GetBatchDependentStore().Return(depStore).AnyTimes() + expectBatchStateTransition(ctrl, store, batch, entity.BatchStateCancelled) controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -494,6 +513,7 @@ func TestController_Process_CancellingNoDependents(t *testing.T) { store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() store.EXPECT().GetBatchDependentStore().Return(depStore).AnyTimes() + expectBatchStateTransition(ctrl, store, batch, entity.BatchStateCancelled) mockPub := queuemock.NewMockPublisher(ctrl) mockPub.EXPECT().Publish(gomock.Any(), "conclude", gomock.Any()).Return(nil).Times(1) @@ -534,6 +554,7 @@ func TestController_Process_CancellingTerminalCASVersionMismatch(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() + expectBatchStateTransition(ctrl, store, batch, entity.BatchStateCancelled) // BatchDependentStore must NOT be touched — terminal CAS failed before fan-out. // No publish expected (terminal CAS failed before fan-out). 
diff --git a/test/integration/submitqueue/extension/storage/mysql/BUILD.bazel b/test/integration/submitqueue/extension/storage/mysql/BUILD.bazel index 7733d1f9..fd6270de 100644 --- a/test/integration/submitqueue/extension/storage/mysql/BUILD.bazel +++ b/test/integration/submitqueue/extension/storage/mysql/BUILD.bazel @@ -12,8 +12,6 @@ go_test( "integration", ], deps = [ - "//submitqueue/entity", - "//submitqueue/extension/storage", "//submitqueue/extension/storage/mysql", "//test/integration/submitqueue/extension/storage", "//test/testutil", diff --git a/test/integration/submitqueue/extension/storage/mysql/storage_test.go b/test/integration/submitqueue/extension/storage/mysql/storage_test.go index 69ff0dcd..03e3ee4a 100644 --- a/test/integration/submitqueue/extension/storage/mysql/storage_test.go +++ b/test/integration/submitqueue/extension/storage/mysql/storage_test.go @@ -23,8 +23,6 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/uber-go/tally" - "github.com/uber/submitqueue/submitqueue/entity" - "github.com/uber/submitqueue/submitqueue/extension/storage" mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql" storagesuite "github.com/uber/submitqueue/test/integration/submitqueue/extension/storage" "github.com/uber/submitqueue/test/testutil" @@ -98,103 +96,3 @@ func (s *MySQLStorageIntegrationSuite) TearDownSuite() { s.log.Logf("Tearing down MySQL Storage integration test suite") // Cleanup handled automatically by testutil.ComposeStack } - -// countActiveBatchRows returns the number of active_batch membership rows for the -// given (queue, batch_id) — index state internal to the MySQL impl, not visible -// through the storage.Storage contract. -func (s *MySQLStorageIntegrationSuite) countActiveBatchRows(queue, batchID string) int { - t := s.T() - var n int - err := s.db.QueryRowContext(context.Background(), - "SELECT COUNT(*) FROM active_batch WHERE queue = ? 
AND batch_id = ?", - queue, batchID, - ).Scan(&n) - require.NoError(t, err) - return n -} - -// TestActiveBatch_SelfHealsTerminalMembership verifies that ListActive deletes the -// membership row of a batch that has reached a terminal state. -func (s *MySQLStorageIntegrationSuite) TestActiveBatch_SelfHealsTerminalMembership() { - t := s.T() - ctx := context.Background() - const queue = "bq-selfheal-terminal" - const id = queue + "/batch/1" - - store := s.GetStorage().GetBatchStore() - require.NoError(t, store.Create(ctx, entity.Batch{ID: id, Queue: queue, Contains: []string{id + "/req"}, Dependencies: []string{}, State: entity.BatchStateCreated, Version: 1})) - require.Equal(t, 1, s.countActiveBatchRows(queue, id), "Create should record active membership") - - require.NoError(t, store.UpdateState(ctx, id, 1, 2, entity.BatchStateSucceeded)) - - active, err := store.ListActive(ctx, queue) - require.NoError(t, err) - require.Empty(t, active, "terminal batch should not be listed as active") - require.Equal(t, 0, s.countActiveBatchRows(queue, id), "ListActive should self-heal the stale membership row") -} - -// TestActiveBatch_SkipsDanglingMembershipWithoutDeleting verifies that ListActive -// skips a membership row whose batch does not exist but does NOT delete it: it may -// belong to an in-flight Create that hasn't written its batch row yet. 
-func (s *MySQLStorageIntegrationSuite) TestActiveBatch_SkipsDanglingMembershipWithoutDeleting() { - t := s.T() - ctx := context.Background() - const queue = "bq-dangling-skip" - const id = queue + "/batch/ghost" - - _, err := s.db.ExecContext(ctx, "INSERT INTO active_batch (queue, batch_id) VALUES (?, ?)", queue, id) - require.NoError(t, err) - - active, err := s.GetStorage().GetBatchStore().ListActive(ctx, queue) - require.NoError(t, err) - require.Empty(t, active, "dangling membership should not surface a batch") - require.Equal(t, 1, s.countActiveBatchRows(queue, id), "ListActive must NOT delete a missing-batch membership (it may belong to an in-flight create)") -} - -// TestActiveBatch_CreateKeepsMembershipOnDuplicate verifies that a duplicate Create -// (ErrAlreadyExists) does NOT delete the membership row, which belongs to the live -// existing batch. -func (s *MySQLStorageIntegrationSuite) TestActiveBatch_CreateKeepsMembershipOnDuplicate() { - t := s.T() - ctx := context.Background() - const queue = "bq-create-dup" - const id = queue + "/batch/1" - store := s.GetStorage().GetBatchStore() - - b := entity.Batch{ID: id, Queue: queue, Contains: []string{id + "/req"}, Dependencies: []string{}, State: entity.BatchStateCreated, Version: 1} - require.NoError(t, store.Create(ctx, b)) - require.Equal(t, 1, s.countActiveBatchRows(queue, id)) - - require.ErrorIs(t, store.Create(ctx, b), storage.ErrAlreadyExists) - require.Equal(t, 1, s.countActiveBatchRows(queue, id), "duplicate Create must NOT delete the existing batch's membership") - - active, err := store.ListActive(ctx, queue) - require.NoError(t, err) - require.Len(t, active, 1, "the original batch must remain active") -} - -// TestActiveBatch_CreateKeepsMembershipOnFailedInsert verifies that a non-duplicate -// batch-insert failure leaves the membership row in place rather than deleting it -// (the batch row may have committed despite the error). 
The failure is induced by -// dropping the batch table (restored afterwards). -func (s *MySQLStorageIntegrationSuite) TestActiveBatch_CreateKeepsMembershipOnFailedInsert() { - t := s.T() - ctx := context.Background() - const queue = "bq-create-fail" - const id = queue + "/batch/1" - - var tbl, ddl string - require.NoError(t, s.db.QueryRowContext(ctx, "SHOW CREATE TABLE batch").Scan(&tbl, &ddl)) - _, err := s.db.ExecContext(ctx, "DROP TABLE batch") - require.NoError(t, err) - defer func() { - _, derr := s.db.ExecContext(context.Background(), ddl) - require.NoError(t, derr, "must restore the batch table for subsequent tests") - }() - - err = s.GetStorage().GetBatchStore().Create(ctx, entity.Batch{ID: id, Queue: queue, Contains: []string{id + "/req"}, Dependencies: []string{}, State: entity.BatchStateCreated, Version: 1}) - require.Error(t, err, "Create should fail when the batch table is missing") - require.NotErrorIs(t, err, storage.ErrAlreadyExists, "the failure must be a non-duplicate error") - - require.Equal(t, 1, s.countActiveBatchRows(queue, id), "Create must NOT delete the membership on a failed insert: the batch row may have committed despite the error") -} diff --git a/test/integration/submitqueue/extension/storage/suite.go b/test/integration/submitqueue/extension/storage/suite.go index d2bffc75..94bd7b93 100644 --- a/test/integration/submitqueue/extension/storage/suite.go +++ b/test/integration/submitqueue/extension/storage/suite.go @@ -388,92 +388,91 @@ func (s *StorageContractSuite) TestStorage_ChangeCreate_EmptyDetails() { assert.Equal(t, entity.ChangeDetails{}, got[0].Details) } -// newBatch builds a batch fixture for the active-listing tests. 
-func newBatch(id, queue string, state entity.BatchState) entity.Batch { - return entity.Batch{ - ID: id, - Queue: queue, - Contains: []string{id + "/req"}, - Dependencies: []string{}, - State: state, - Version: 1, - } -} - -// activeBatchIDs returns the sorted IDs of the queue's active batches, for stable comparison. -func (s *StorageContractSuite) activeBatchIDs(queue string) []string { +// membershipIDs returns sorted batch IDs for stable comparison. +func (s *StorageContractSuite) membershipIDs(queue string, state entity.BatchState) []string { t := s.T() - batches, err := s.storage.GetBatchStore().ListActive(s.ctx, queue) + ids, err := s.storage.GetBatchStateMembershipStore().ListIDs(s.ctx, queue, state) require.NoError(t, err) - ids := make([]string, 0, len(batches)) - for _, b := range batches { - ids = append(ids, b.ID) - } sort.Strings(ids) return ids } -// TestStorage_BatchListActive_ReturnsActive verifies a freshly created batch is -// listed as active, and that ListActive resolves the full entity. -func (s *StorageContractSuite) TestStorage_BatchListActive_ReturnsActive() { +// TestStorage_BatchStateMembership_AddAndList verifies membership rows are +// listed by their queue/state key. 
+func (s *StorageContractSuite) TestStorage_BatchStateMembership_AddAndList() { t := s.T() ctx := s.ctx - const queue = "bq-active" + const queue = "bsm-list" - require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/1", queue, entity.BatchStateCreated))) - require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/2", queue, entity.BatchStateSpeculating))) + store := s.storage.GetBatchStateMembershipStore() + require.NoError(t, store.Add(ctx, queue, entity.BatchStateCreated, queue+"/batch/1")) + require.NoError(t, store.Add(ctx, queue, entity.BatchStateCreated, queue+"/batch/2")) - batches, err := s.storage.GetBatchStore().ListActive(ctx, queue) - require.NoError(t, err) - require.Len(t, batches, 2) - assert.Equal(t, []string{queue + "/batch/1", queue + "/batch/2"}, s.activeBatchIDs(queue)) + assert.Equal(t, []string{queue + "/batch/1", queue + "/batch/2"}, s.membershipIDs(queue, entity.BatchStateCreated)) } -// TestStorage_BatchListActive_ExcludesTerminal verifies that a batch transitioned -// to a terminal state via UpdateState drops out of the active listing. -func (s *StorageContractSuite) TestStorage_BatchListActive_ExcludesTerminal() { +// TestStorage_BatchStateMembership_AddIdempotent verifies repeated Add calls do +// not duplicate rows. 
+func (s *StorageContractSuite) TestStorage_BatchStateMembership_AddIdempotent() { t := s.T() ctx := s.ctx - const queue = "bq-terminal" + const queue = "bsm-idempotent" - require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/1", queue, entity.BatchStateMerging))) - require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/2", queue, entity.BatchStateCreated))) + store := s.storage.GetBatchStateMembershipStore() + require.NoError(t, store.Add(ctx, queue, entity.BatchStateCreated, queue+"/batch/1")) + require.NoError(t, store.Add(ctx, queue, entity.BatchStateCreated, queue+"/batch/1")) + + assert.Equal(t, []string{queue + "/batch/1"}, s.membershipIDs(queue, entity.BatchStateCreated)) +} + +// TestStorage_BatchStateMembership_Remove verifies Remove is idempotent and +// deletes only the specified row. +func (s *StorageContractSuite) TestStorage_BatchStateMembership_Remove() { + t := s.T() + ctx := s.ctx + const queue = "bsm-remove" - // batch/1 lands; it must no longer be active. - require.NoError(t, s.storage.GetBatchStore().UpdateState(ctx, queue+"/batch/1", 1, 2, entity.BatchStateSucceeded)) + store := s.storage.GetBatchStateMembershipStore() + require.NoError(t, store.Add(ctx, queue, entity.BatchStateCreated, queue+"/batch/1")) + require.NoError(t, store.Add(ctx, queue, entity.BatchStateCreated, queue+"/batch/2")) + require.NoError(t, store.Remove(ctx, queue, entity.BatchStateCreated, queue+"/batch/1")) + require.NoError(t, store.Remove(ctx, queue, entity.BatchStateCreated, queue+"/batch/1")) - assert.Equal(t, []string{queue + "/batch/2"}, s.activeBatchIDs(queue)) + assert.Equal(t, []string{queue + "/batch/2"}, s.membershipIDs(queue, entity.BatchStateCreated)) } -// TestStorage_BatchListActive_ExcludesTerminalViaScoreAndState covers the other -// terminal write path (UpdateScoreAndState) used by the score/speculate pipeline. 
-func (s *StorageContractSuite) TestStorage_BatchListActive_ExcludesTerminalViaScoreAndState() { +// TestStorage_BatchStateMembership_QueueScoped verifies ListIDs never returns +// rows from another queue. +func (s *StorageContractSuite) TestStorage_BatchStateMembership_QueueScoped() { t := s.T() ctx := s.ctx - const queue = "bq-terminal-score" + const queueA = "bsm-scoped-a" + const queueB = "bsm-scoped-b" - require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queue+"/batch/1", queue, entity.BatchStateCreated))) - require.NoError(t, s.storage.GetBatchStore().UpdateScoreAndState(ctx, queue+"/batch/1", 1, 2, 0.5, entity.BatchStateFailed)) + store := s.storage.GetBatchStateMembershipStore() + require.NoError(t, store.Add(ctx, queueA, entity.BatchStateCreated, queueA+"/batch/1")) + require.NoError(t, store.Add(ctx, queueB, entity.BatchStateCreated, queueB+"/batch/1")) + require.NoError(t, store.Add(ctx, queueB, entity.BatchStateCreated, queueB+"/batch/2")) - assert.Empty(t, s.activeBatchIDs(queue)) + assert.Equal(t, []string{queueA + "/batch/1"}, s.membershipIDs(queueA, entity.BatchStateCreated)) + assert.Equal(t, []string{queueB + "/batch/1", queueB + "/batch/2"}, s.membershipIDs(queueB, entity.BatchStateCreated)) } -// TestStorage_BatchListActive_QueueScoped verifies the listing is scoped to one queue. -func (s *StorageContractSuite) TestStorage_BatchListActive_QueueScoped() { +// TestStorage_BatchStateMembership_StateScoped verifies ListIDs is scoped by state. 
+func (s *StorageContractSuite) TestStorage_BatchStateMembership_StateScoped() { t := s.T() ctx := s.ctx - const queueA = "bq-scoped-a" - const queueB = "bq-scoped-b" + const queue = "bsm-state-scoped" - require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queueA+"/batch/1", queueA, entity.BatchStateCreated))) - require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queueB+"/batch/1", queueB, entity.BatchStateCreated))) - require.NoError(t, s.storage.GetBatchStore().Create(ctx, newBatch(queueB+"/batch/2", queueB, entity.BatchStateCreated))) + store := s.storage.GetBatchStateMembershipStore() + require.NoError(t, store.Add(ctx, queue, entity.BatchStateCreated, queue+"/batch/1")) + require.NoError(t, store.Add(ctx, queue, entity.BatchStateSpeculating, queue+"/batch/2")) - assert.Equal(t, []string{queueA + "/batch/1"}, s.activeBatchIDs(queueA)) - assert.Equal(t, []string{queueB + "/batch/1", queueB + "/batch/2"}, s.activeBatchIDs(queueB)) + assert.Equal(t, []string{queue + "/batch/1"}, s.membershipIDs(queue, entity.BatchStateCreated)) + assert.Equal(t, []string{queue + "/batch/2"}, s.membershipIDs(queue, entity.BatchStateSpeculating)) } -// TestStorage_BatchListActive_UnknownQueue returns an empty set for a queue with no batches. -func (s *StorageContractSuite) TestStorage_BatchListActive_UnknownQueue() { - assert.Empty(s.T(), s.activeBatchIDs("bq-does-not-exist")) +// TestStorage_BatchStateMembership_UnknownKey returns an empty set for a key with no rows. +func (s *StorageContractSuite) TestStorage_BatchStateMembership_UnknownKey() { + assert.Empty(s.T(), s.membershipIDs("bsm-does-not-exist", entity.BatchStateCreated)) }