package k2v_test import ( k2v "code.notaphish.fyi/milas/garage-k2v-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "net/http/httptrace" "strconv" "testing" "time" ) func TestClient_PollRange(t *testing.T) { t.Parallel() f, ctx := newFixture(t) pk := randomPk() sk := randomSk() for i := range 5 { err := f.cli.InsertItem(ctx, f.bucket, pk, sk+"-"+strconv.Itoa(i), "", []byte("hello1")) require.NoError(t, err) } // first read should complete immediately q := k2v.PollRangeQuery{ Start: sk, } result, err := f.cli.PollRange(ctx, f.bucket, pk, q, 5*time.Second) require.NoError(t, err) require.NotEmpty(t, result.SeenMarker) require.Len(t, result.Items, 5) for i := range result.Items { require.Len(t, result.Items[i].Values, 1) require.Equal(t, "hello1", string(result.Items[i].Values[0])) } updateErrCh := make(chan error, 1) pollReadyCh := make(chan struct{}) go func(sk string, ct k2v.CausalityToken) { defer close(updateErrCh) select { case <-ctx.Done(): t.Errorf("Context canceled: %v", ctx.Err()) return case <-pollReadyCh: t.Logf("PollRange connected") } updateErrCh <- f.cli.InsertItem(ctx, f.bucket, pk, sk, ct, []byte("hello2")) }(result.Items[3].SortKey, k2v.CausalityToken(result.Items[3].CausalityToken)) trace := &httptrace.ClientTrace{ WroteRequest: func(_ httptrace.WroteRequestInfo) { pollReadyCh <- struct{}{} }, } q.SeenMarker = result.SeenMarker result, err = f.cli.PollRange(httptrace.WithClientTrace(ctx, trace), f.bucket, pk, q, 5*time.Second) if assert.NoError(t, err) { require.NotEmpty(t, result.SeenMarker) require.Len(t, result.Items, 1) require.Len(t, result.Items[0].Values, 1) require.Equal(t, sk+"-3", result.Items[0].SortKey) require.Equal(t, "hello2", string(result.Items[0].Values[0])) } require.NoError(t, <-updateErrCh) require.NoError(t, err) require.NoError(t, f.cli.InsertItem(ctx, f.bucket, pk, result.Items[0].SortKey, k2v.CausalityToken(result.Items[0].CausalityToken), []byte("hello3"))) q.SeenMarker = result.SeenMarker result, err = f.cli.PollRange(ctx, f.bucket, pk, q, 5*time.Second) if assert.NoError(t, err) { require.NotEmpty(t, result.SeenMarker) require.Len(t, result.Items, 1) require.Len(t, result.Items[0].Values, 1) require.Equal(t, sk+"-3", result.Items[0].SortKey) require.Equal(t, "hello3", string(result.Items[0].Values[0])) } } func TestClient_PollRange_Timeout(t *testing.T) { if testing.Short() { t.Skip("Skipping in short mode: 1 sec minimum to trigger timeout") return } t.Parallel() f, ctx := newFixture(t) pk := randomPk() sk := randomSk() for i := range 5 { err := f.cli.InsertItem(ctx, f.bucket, pk, sk+"-"+strconv.Itoa(i), "", []byte("hello1")) require.NoError(t, err) } // first read should complete immediately q := k2v.PollRangeQuery{ Start: sk, } result, err := f.cli.PollRange(ctx, f.bucket, pk, q, 5*time.Second) require.NoError(t, err) require.NotEmpty(t, result.SeenMarker) require.Len(t, result.Items, 5) for i := range result.Items { require.Len(t, result.Items[i].Values, 1) require.Equal(t, "hello1", string(result.Items[i].Values[0])) } q.SeenMarker = result.SeenMarker result, err = f.cli.PollRange(ctx, f.bucket, pk, q, 1*time.Second) require.ErrorIs(t, err, k2v.NotModifiedTimeoutErr) }