package k2v_test

import (
	"context"
	"math/rand/v2"
	"net/http/httptrace"
	"strconv"
	"testing"
	"time"

	k2v "code.notaphish.fyi/milas/garage-k2v-go"
	"github.com/stretchr/testify/require"
	"go.uber.org/goleak"
)

// fixture bundles the state shared by the client tests in this file.
type fixture struct {
	t      testing.TB
	ctx    context.Context
	cli    *k2v.Client
	bucket k2v.Bucket
}

// newFixture builds a client from k2v.EndpointFromEnv and k2v.KeyFromEnv,
// targets the "k2v-test" bucket, and returns the fixture together with a
// context that is canceled when the test finishes.
//
// Cleanups run in LIFO order, so the goroutine-leak check is registered first
// to make it run last: after the client has been closed and the context has
// been canceled.
func newFixture(t testing.TB) (*fixture, context.Context) {
	t.Helper()

	t.Cleanup(func() { goleak.VerifyNone(t) })

	ctx := testContext(t)

	cli := k2v.NewClient(k2v.EndpointFromEnv(), k2v.KeyFromEnv())
	t.Cleanup(cli.Close)

	f := &fixture{
		t:      t,
		ctx:    ctx,
		cli:    cli,
		bucket: k2v.Bucket("k2v-test"),
	}
	return f, ctx
}

// testContext returns a cancelable context whose cancel function is
// registered as a cleanup on t.
func testContext(t testing.TB) context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)
	return ctx
}

// randomKey returns "key-" followed by a pseudo-random integer in
// [0, 1000000), used to keep test runs from colliding on the same item.
func randomKey() string {
	return "key-" + strconv.Itoa(rand.IntN(1000000))
}

// TestClient_InsertItem checks that inserting a fresh item without a
// causality token succeeds.
func TestClient_InsertItem(t *testing.T) {
	f, ctx := newFixture(t)

	err := f.cli.InsertItem(ctx, f.bucket, randomKey(), randomKey(), "", []byte("hello"))
	require.NoError(t, err)
}

// TestClient_ReadItemNotExist checks that both read variants report
// k2v.NoSuchItemErr, with no value and no causality token, for a key that was
// never written.
func TestClient_ReadItemNotExist(t *testing.T) {
	f, ctx := newFixture(t)

	pk := randomKey()
	sk := randomKey()

	t.Run("Single", func(t *testing.T) {
		item, ct, err := f.cli.ReadItemSingle(ctx, f.bucket, pk, sk)
		require.ErrorIs(t, err, k2v.NoSuchItemErr)
		require.Nil(t, item)
		require.Empty(t, ct)
	})

	t.Run("Multi", func(t *testing.T) {
		items, ct, err := f.cli.ReadItemMulti(ctx, f.bucket, pk, sk)
		require.ErrorIs(t, err, k2v.NoSuchItemErr)
		require.Empty(t, items)
		require.Empty(t, ct)
	})
}

// TestClient_ReadItemTombstone inserts an item, deletes it using the
// causality token from a read, and checks that both read variants then report
// k2v.TombstoneItemErr while still returning a causality token.
func TestClient_ReadItemTombstone(t *testing.T) {
	f, ctx := newFixture(t)

	pk := randomKey()
	sk := randomKey()

	t.Logf("Creating item: PK=%s, SK=%s", pk, sk)

	err := f.cli.InsertItem(ctx, f.bucket, pk, sk, "", []byte("hello"))
	require.NoError(t, err)

	_, ct, err := f.cli.ReadItemSingle(ctx, f.bucket, pk, sk)
	require.NoError(t, err)

	err = f.cli.DeleteItem(ctx, f.bucket, pk, sk, ct)
	require.NoError(t, err)

	t.Run("Single", func(t *testing.T) {
		item, ct, err := f.cli.ReadItemSingle(ctx, f.bucket, pk, sk)
		require.ErrorIs(t, err, k2v.TombstoneItemErr)
		require.Nil(t, item)
		require.NotEmpty(t, ct)
	})

	t.Run("Multi", func(t *testing.T) {
		items, ct, err := f.cli.ReadItemMulti(ctx, f.bucket, pk, sk)
		require.ErrorIs(t, err, k2v.TombstoneItemErr)
		require.Empty(t, items)
		require.NotEmpty(t, ct)
	})
}

// TestClient_ReadItemSingleRevision checks that an item written once is
// returned as a single value by both read variants.
func TestClient_ReadItemSingleRevision(t *testing.T) {
	f, ctx := newFixture(t)

	pk := randomKey()
	sk := randomKey()

	err := f.cli.InsertItem(ctx, f.bucket, pk, sk, "", []byte("hello"))
	require.NoError(t, err)

	t.Run("Single", func(t *testing.T) {
		item, ct, err := f.cli.ReadItemSingle(ctx, f.bucket, pk, sk)
		require.NoError(t, err)
		require.Equal(t, "hello", string(item))
		require.NotEmpty(t, ct)
	})

	t.Run("Multi", func(t *testing.T) {
		items, ct, err := f.cli.ReadItemMulti(ctx, f.bucket, pk, sk)
		require.NoError(t, err)
		require.Len(t, items, 1)
		require.Equal(t, "hello", string(items[0]))
		require.NotEmpty(t, ct)
	})
}

// TestClient_ReadItemMultipleRevisions writes the same key twice without a
// causality token and checks that the single-value read reports
// k2v.ConcurrentItemsErr while the multi-value read returns both values.
func TestClient_ReadItemMultipleRevisions(t *testing.T) {
	f, ctx := newFixture(t)

	pk := randomKey()
	sk := randomKey()

	err := f.cli.InsertItem(ctx, f.bucket, pk, sk, "", []byte("hello1"))
	require.NoError(t, err)

	// don't use a continuation token to intentionally create 2x concurrent revisions
	err = f.cli.InsertItem(ctx, f.bucket, pk, sk, "", []byte("hello2"))
	require.NoError(t, err)

	t.Run("Single", func(t *testing.T) {
		item, ct, err := f.cli.ReadItemSingle(ctx, f.bucket, pk, sk)
		require.ErrorIs(t, err, k2v.ConcurrentItemsErr)
		require.Nil(t, item)
		require.NotEmpty(t, ct)
	})

	t.Run("Multi", func(t *testing.T) {
		items, ct, err := f.cli.ReadItemMulti(ctx, f.bucket, pk, sk)
		require.NoError(t, err)
		require.Len(t, items, 2)
		require.Equal(t, "hello1", string(items[0]))
		require.Equal(t, "hello2", string(items[1]))
		require.NotEmpty(t, ct)
	})
}

// TestClient_PollItem starts a poll on an existing item, updates the item
// from a second goroutine once the poll request has been written, and checks
// that the poll returns the updated value.
func TestClient_PollItem(t *testing.T) {
	f, ctx := newFixture(t)

	pk := randomKey()
	sk := randomKey()

	err := f.cli.InsertItem(ctx, f.bucket, pk, sk, "", []byte("hello1"))
	require.NoError(t, err)

	_, ct, err := f.cli.ReadItemSingle(ctx, f.bucket, pk, sk)
	require.NoError(t, err)

	// Buffered so the trace hook never blocks the HTTP client: the first
	// signal is always stored, and any further ones (e.g. if the request is
	// written again) are dropped.
	pollReadyCh := make(chan struct{}, 1)

	// Carries the result of the concurrent update back to the test goroutine.
	// The updater must not call require/FailNow itself, and must not share
	// the test goroutine's err variable. Buffered so the updater can always
	// exit, even when the test has already failed and nobody is receiving.
	updateErrCh := make(chan error, 1)

	go func(ct k2v.CausalityToken) {
		select {
		case <-ctx.Done():
			updateErrCh <- ctx.Err()
			return
		case <-pollReadyCh:
			t.Logf("PollItem connected")
		}
		updateErrCh <- f.cli.InsertItem(ctx, f.bucket, pk, sk, ct, []byte("hello2"))
	}(ct)

	trace := &httptrace.ClientTrace{
		WroteRequest: func(_ httptrace.WroteRequestInfo) {
			select {
			case pollReadyCh <- struct{}{}:
			default:
			}
		},
	}

	item, newCT, pollErr := f.cli.PollItem(httptrace.WithClientTrace(ctx, trace), f.bucket, pk, sk, ct, 5*time.Second)
	// If the poll failed, the updater may still be waiting for the ready
	// signal; it exits once the fixture cancels ctx during cleanup.
	require.NoError(t, pollErr)

	// A successful poll means the request was written, so the updater has
	// been signalled. Join it here so it cannot outlive the test.
	require.NoError(t, <-updateErrCh)

	require.Equal(t, "hello2", string(item))
	require.NotEmpty(t, newCT)
}