Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 48 additions & 17 deletions pkg/compactor/compactor_paritioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -1588,10 +1589,27 @@ func TestPartitionCompactor_DeleteLocalSyncFiles(t *testing.T) {
cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second
cfg.ShardingRing.KVStore.Mock = kvstore

cfg.CompactionInterval = 10 * time.Minute // We will only call compaction manually.

// Pin deterministic ring tokens so that each compactor owns exactly half of
// the test users (compactor-1: user-1,3,5,7,9; compactor-2: user-2,4,6,8,10).
// With random tokens there is a ~1-in-1000 chance per run that the second
// compactor owns zero users, which made the previous wait condition
// permanently unsatisfiable (#7565, #7608).
cfg.ShardingRing.TokensFilePath = filepath.Join(t.TempDir(), "tokens")
require.NoError(t, ring.TokenFile{PreviousState: ring.ACTIVE, Tokens: pinnedTokens(t, userIDs, i)}.StoreToFile(cfg.ShardingRing.TokensFilePath))

// Each compactor will get its own temp dir for storing local files.
c, _, tsdbPlanner, _, _ := prepareForPartitioning(t, cfg, inmem, nil, nil)
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
// With the long compaction interval the compactor is usually still
// waiting for its initial jittered compaction run when the test ends.
// Stopping it at that point makes running() return the context
// cancellation, which is reported as a service failure: tolerate it
// (and only it).
if err := services.StopAndAwaitTerminated(context.Background(), c); err != nil {
require.ErrorIs(t, err, context.Canceled)
}
})

compactors = append(compactors, c)
Expand All @@ -1610,38 +1628,51 @@ func TestPartitionCompactor_DeleteLocalSyncFiles(t *testing.T) {
// Start first compactor
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c1))

// Wait until a run has been completed on first compactor. This happens as soon as compactor starts.
cortex_testutil.Poll(t, 20*time.Second, true, func() any {
return prom_testutil.ToFloat64(c1.CompactionRunsCompleted) >= 1
})
// Run a compaction cycle on the first compactor: it is alone in the ring, so
// it owns (and syncs) all the users.
c1.compactUsers(context.Background())
require.Equal(t, numUsers, len(c1.listTenantsWithMetaSyncDirectories()))

require.NoError(t, os.Mkdir(c1.metaSyncDirForUser("new-user"), 0600))

// Verify that first compactor has synced all the users, plus there is one extra we have just created.
require.Equal(t, numUsers+1, len(c1.listTenantsWithMetaSyncDirectories()))

// Now start second compactor, and wait until it runs compaction.
// Now start second compactor.
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c2))
// Wait for at least two completed cycles so we sample after a steady-state
// ownership cycle, not mid-cycle following a zero-owned first cycle. The
// first cycle's CompactionRunsCompleted can increment with zero owned users
// due to transient ring-view skew at startup; sampling then would race with
// the second cycle's fetcher.NewBaseFetcher creating meta-sync directories
// and return a partial count.
cortex_testutil.Poll(t, 30*time.Second, true, func() any {
return prom_testutil.ToFloat64(c2.CompactionRunsCompleted) >= 2 &&
len(c2.listTenantsWithMetaSyncDirectories()) > 0

// Before driving ownership-dependent compaction cycles, wait until BOTH
// compactors' ring views see two healthy ACTIVE instances (RingOp is the
// operation ownUser itself queries). c2's own view is already barriered by
// starting() — it waits until c2 is ACTIVE in its own view, and every KV
// snapshot containing c2 also contains the earlier-registered c1 — but c1's
// ring watcher ingests c2's registration asynchronously, and the final
// c1.compactUsers() cleanup below depends on c1's view.
cortex_testutil.Poll(t, 10*time.Second, true, func() any {
for _, c := range compactors {
rs, err := c.ring.GetAllHealthy(RingOp)
if err != nil || len(rs.Instances) != 2 {
return false
}
}
return true
})

// Run a compaction cycle on the second compactor: with pinned tokens it owns
// exactly half of the users and creates a meta-sync directory for each of them.
c2.compactUsers(context.Background())

// Let's check how many users second compactor has.
c2Users := len(c2.listTenantsWithMetaSyncDirectories())
require.Equal(t, numUsers/2, c2Users)

// Force new compaction cycle on first compactor. It will run the cleanup of un-owned users at the end of compaction cycle.
c1.compactUsers(context.Background())
c1Users := len(c1.listTenantsWithMetaSyncDirectories())

// Now compactor 1 should have cleaned old sync files.
require.NotEqual(t, numUsers, c1Users)
// Now compactor 1 should have cleaned the sync files of the users it no longer
// owns (including "new-user"), keeping exactly its own half.
require.Equal(t, numUsers-numUsers/2, c1Users)
require.Equal(t, numUsers, c1Users+c2Users)
}

Expand Down
95 changes: 78 additions & 17 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"encoding/json"
"flag"
"fmt"
"hash/fnv"
"io"
"math"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -1779,6 +1781,35 @@ func mockParquetMarker() string {
return string(content)
}

// pinnedTokens builds the deterministic set of ring tokens for the compactor
// with the given 1-based instance number, so that ring ownership of the test
// users no longer depends on a random token draw (with random tokens there is
// a ~1-in-1000 chance that one compactor owns none of the test users, the
// root cause of both #7565 and #7608).
//
// The result always holds exactly 512 tokens: NumTokens is hardcoded to 512 in
// RingConfig.ToLifecyclerConfig, and the lifecycler silently tops up a shorter
// token file with random tokens, which would defeat the pinning.
//
// Two kinds of tokens are generated:
//   - guard tokens: fnv32a(user)+1 for every 2nd entry of userIDs, starting at
//     index instance-1. Such a token is always the first token strictly
//     greater than that user's hash, so the instance owns that user.
//   - filler tokens: small values sitting far below every user hash, used only
//     to reach the required token count.
//
// Do NOT swap the guard tokens for evenly-spaced ones: fnv32a("user-N") hashes
// step by the FNV prime 16777619 and resonate with regular spacings, which can
// degenerate to a 0/10 ownership split.
func pinnedTokens(t *testing.T, userIDs []string, instance int) ring.Tokens {
	t.Helper()

	// Number of tokens the token file must contain (see the doc comment).
	const wantTokens = 512

	pinned := make(ring.Tokens, 0, wantTokens)

	// Guard tokens: one per user assigned to this instance.
	for idx := instance - 1; idx < len(userIDs); idx += 2 {
		hasher := fnv.New32a()
		// hash.Hash documents that Write never returns an error.
		_, _ = hasher.Write([]byte(userIDs[idx]))
		userHash := hasher.Sum32()

		// The +1 below must not wrap around to 0.
		require.NotEqual(t, uint32(math.MaxUint32), userHash)
		pinned = append(pinned, userHash+1)
	}

	// Filler tokens: instance, instance+2, instance+4, ... so the two
	// instances never collide (with five guards each, instance 1 gets the odd
	// values 1..1013 and instance 2 the even values 2..1014).
	for filler := uint32(instance); len(pinned) < wantTokens; filler += 2 {
		pinned = append(pinned, filler)
	}

	require.Len(t, pinned, wantTokens)
	return pinned
}

func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
numUsers := 10

Expand Down Expand Up @@ -1812,10 +1843,27 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second
cfg.ShardingRing.KVStore.Mock = kvstore

cfg.CompactionInterval = 10 * time.Minute // We will only call compaction manually.

// Pin deterministic ring tokens so that each compactor owns exactly half of
// the test users (compactor-1: user-1,3,5,7,9; compactor-2: user-2,4,6,8,10).
// With random tokens there is a ~1-in-1000 chance per run that the second
// compactor owns zero users, which made the previous wait condition
// permanently unsatisfiable (#7565, #7608).
cfg.ShardingRing.TokensFilePath = filepath.Join(t.TempDir(), "tokens")
require.NoError(t, ring.TokenFile{PreviousState: ring.ACTIVE, Tokens: pinnedTokens(t, userIDs, i)}.StoreToFile(cfg.ShardingRing.TokensFilePath))

// Each compactor will get its own temp dir for storing local files.
c, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem, nil)
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
// With the long compaction interval the compactor is usually still
// waiting for its initial jittered compaction run when the test ends.
// Stopping it at that point makes running() return the context
// cancellation, which is reported as a service failure: tolerate it
// (and only it).
if err := services.StopAndAwaitTerminated(context.Background(), c); err != nil {
require.ErrorIs(t, err, context.Canceled)
}
})

compactors = append(compactors, c)
Expand All @@ -1834,38 +1882,51 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
// Start first compactor
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c1))

// Wait until a run has been completed on first compactor. This happens as soon as compactor starts.
cortex_testutil.Poll(t, 10*time.Second, 1.0, func() any {
return prom_testutil.ToFloat64(c1.CompactionRunsCompleted)
})
// Run a compaction cycle on the first compactor: it is alone in the ring, so
// it owns (and syncs) all the users.
c1.compactUsers(context.Background())
require.Equal(t, numUsers, len(c1.listTenantsWithMetaSyncDirectories()))

require.NoError(t, os.Mkdir(c1.metaSyncDirForUser("new-user"), 0600))

// Verify that first compactor has synced all the users, plus there is one extra we have just created.
require.Equal(t, numUsers+1, len(c1.listTenantsWithMetaSyncDirectories()))

// Now start second compactor, and wait until it runs compaction.
// Now start second compactor.
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c2))
// Wait for at least two completed cycles so we sample after a steady-state
// ownership cycle, not mid-cycle following a zero-owned first cycle. The
// first cycle's CompactionRunsCompleted can increment with zero owned users
// due to transient ring-view skew at startup; sampling then would race with
// the second cycle's fetcher.NewBaseFetcher creating meta-sync directories
// and return a partial count.
cortex_testutil.Poll(t, 30*time.Second, true, func() any {
return prom_testutil.ToFloat64(c2.CompactionRunsCompleted) >= 2 &&
len(c2.listTenantsWithMetaSyncDirectories()) > 0

// Before driving ownership-dependent compaction cycles, wait until BOTH
// compactors' ring views see two healthy ACTIVE instances (RingOp is the
// operation ownUser itself queries). c2's own view is already barriered by
// starting() — it waits until c2 is ACTIVE in its own view, and every KV
// snapshot containing c2 also contains the earlier-registered c1 — but c1's
// ring watcher ingests c2's registration asynchronously, and the final
// c1.compactUsers() cleanup below depends on c1's view.
cortex_testutil.Poll(t, 10*time.Second, true, func() any {
for _, c := range compactors {
rs, err := c.ring.GetAllHealthy(RingOp)
if err != nil || len(rs.Instances) != 2 {
return false
}
}
return true
})

// Run a compaction cycle on the second compactor: with pinned tokens it owns
// exactly half of the users and creates a meta-sync directory for each of them.
c2.compactUsers(context.Background())

// Let's check how many users second compactor has.
c2Users := len(c2.listTenantsWithMetaSyncDirectories())
require.Equal(t, numUsers/2, c2Users)

// Force new compaction cycle on first compactor. It will run the cleanup of un-owned users at the end of compaction cycle.
c1.compactUsers(context.Background())
c1Users := len(c1.listTenantsWithMetaSyncDirectories())

// Now compactor 1 should have cleaned old sync files.
require.NotEqual(t, numUsers, c1Users)
// Now compactor 1 should have cleaned the sync files of the users it no longer
// owns (including "new-user"), keeping exactly its own half.
require.Equal(t, numUsers-numUsers/2, c1Users)
require.Equal(t, numUsers, c1Users+c2Users)
}

Expand Down
Loading