garage-k2v-go/poll_batch_test.go

118 lines
3.3 KiB
Go
Raw Normal View History

2025-03-10 22:29:53 -04:00
package k2v_test
import (
k2v "code.notaphish.fyi/milas/garage-k2v-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net/http/httptrace"
"strconv"
"testing"
"time"
)
func TestClient_PollRange(t *testing.T) {
t.Parallel()
f, ctx := newFixture(t)
pk := randomPk()
sk := randomSk()
for i := range 5 {
err := f.cli.InsertItem(ctx, f.bucket, pk, sk+"-"+strconv.Itoa(i), "", []byte("hello1"))
require.NoError(t, err)
}
// first read should complete immediately
q := k2v.PollRangeQuery{
Start: sk,
}
result, err := f.cli.PollRange(ctx, f.bucket, pk, q, 5*time.Second)
require.NoError(t, err)
require.NotEmpty(t, result.SeenMarker)
require.Len(t, result.Items, 5)
for i := range result.Items {
require.Len(t, result.Items[i].Values, 1)
require.Equal(t, "hello1", string(result.Items[i].Values[0]))
}
updateErrCh := make(chan error, 1)
pollReadyCh := make(chan struct{})
go func(sk string, ct k2v.CausalityToken) {
defer close(updateErrCh)
select {
case <-ctx.Done():
t.Errorf("Context canceled: %v", ctx.Err())
return
case <-pollReadyCh:
t.Logf("PollRange connected")
}
updateErrCh <- f.cli.InsertItem(ctx, f.bucket, pk, sk, ct, []byte("hello2"))
}(result.Items[3].SortKey, k2v.CausalityToken(result.Items[3].CausalityToken))
trace := &httptrace.ClientTrace{
WroteRequest: func(_ httptrace.WroteRequestInfo) {
pollReadyCh <- struct{}{}
},
}
q.SeenMarker = result.SeenMarker
result, err = f.cli.PollRange(httptrace.WithClientTrace(ctx, trace), f.bucket, pk, q, 5*time.Second)
if assert.NoError(t, err) {
require.NotEmpty(t, result.SeenMarker)
require.Len(t, result.Items, 1)
require.Len(t, result.Items[0].Values, 1)
require.Equal(t, sk+"-3", result.Items[0].SortKey)
require.Equal(t, "hello2", string(result.Items[0].Values[0]))
}
require.NoError(t, <-updateErrCh)
require.NoError(t, err)
require.NoError(t, f.cli.InsertItem(ctx, f.bucket, pk, result.Items[0].SortKey, k2v.CausalityToken(result.Items[0].CausalityToken), []byte("hello3")))
q.SeenMarker = result.SeenMarker
result, err = f.cli.PollRange(ctx, f.bucket, pk, q, 5*time.Second)
if assert.NoError(t, err) {
require.NotEmpty(t, result.SeenMarker)
require.Len(t, result.Items, 1)
require.Len(t, result.Items[0].Values, 1)
require.Equal(t, sk+"-3", result.Items[0].SortKey)
require.Equal(t, "hello3", string(result.Items[0].Values[0]))
}
}
func TestClient_PollRange_Timeout(t *testing.T) {
if testing.Short() {
t.Skip("Skipping in short mode: 1 sec minimum to trigger timeout")
return
}
t.Parallel()
f, ctx := newFixture(t)
pk := randomPk()
sk := randomSk()
for i := range 5 {
err := f.cli.InsertItem(ctx, f.bucket, pk, sk+"-"+strconv.Itoa(i), "", []byte("hello1"))
require.NoError(t, err)
}
// first read should complete immediately
q := k2v.PollRangeQuery{
Start: sk,
}
result, err := f.cli.PollRange(ctx, f.bucket, pk, q, 5*time.Second)
require.NoError(t, err)
require.NotEmpty(t, result.SeenMarker)
require.Len(t, result.Items, 5)
for i := range result.Items {
require.Len(t, result.Items[i].Values, 1)
require.Equal(t, "hello1", string(result.Items[i].Values[0]))
}
q.SeenMarker = result.SeenMarker
result, err = f.cli.PollRange(ctx, f.bucket, pk, q, 1*time.Second)
require.ErrorIs(t, err, k2v.NotModifiedTimeoutErr)
}