2025-03-09 21:47:10 -04:00
|
|
|
package k2v_test
|
|
|
|
|
|
|
|
import (
|
|
|
|
k2v "code.notaphish.fyi/milas/garage-k2v-go"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"net/http/httptrace"
|
|
|
|
"testing"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
func TestClient_PollItem(t *testing.T) {
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
f, ctx := newFixture(t)
|
|
|
|
|
2025-03-09 22:28:16 -04:00
|
|
|
pk := randomPk()
|
|
|
|
sk := randomSk()
|
2025-03-09 21:47:10 -04:00
|
|
|
|
|
|
|
err := f.cli.InsertItem(ctx, f.bucket, pk, sk, "", []byte("hello1"))
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
_, ct, err := f.cli.ReadItemSingle(ctx, f.bucket, pk, sk)
|
|
|
|
|
|
|
|
updateErrCh := make(chan error, 1)
|
|
|
|
pollReadyCh := make(chan struct{})
|
|
|
|
go func(ct k2v.CausalityToken) {
|
|
|
|
defer close(updateErrCh)
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
t.Errorf("Context canceled: %v", ctx.Err())
|
|
|
|
return
|
|
|
|
case <-pollReadyCh:
|
|
|
|
t.Logf("PollItem connected")
|
|
|
|
}
|
|
|
|
updateErrCh <- f.cli.InsertItem(ctx, f.bucket, pk, sk, ct, []byte("hello2"))
|
|
|
|
}(ct)
|
|
|
|
|
|
|
|
trace := &httptrace.ClientTrace{
|
|
|
|
WroteRequest: func(_ httptrace.WroteRequestInfo) {
|
|
|
|
pollReadyCh <- struct{}{}
|
|
|
|
},
|
|
|
|
}
|
|
|
|
item, ct, err := f.cli.PollItem(httptrace.WithClientTrace(ctx, trace), f.bucket, pk, sk, ct, 5*time.Second)
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, "hello2", string(item))
|
|
|
|
require.NotEmpty(t, ct)
|
|
|
|
require.NoError(t, <-updateErrCh)
|
|
|
|
}
|
2025-03-10 22:54:48 -04:00
|
|
|
|
|
|
|
func TestClient_PollItem_Timeout(t *testing.T) {
|
|
|
|
if testing.Short() {
|
|
|
|
t.Skip("Skipping in short mode: 1 sec minimum to trigger timeout")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
f, ctx := newFixture(t)
|
|
|
|
|
|
|
|
pk := randomPk()
|
|
|
|
sk := randomSk()
|
|
|
|
|
|
|
|
err := f.cli.InsertItem(ctx, f.bucket, pk, sk, "", []byte("hello1"))
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
_, ct, err := f.cli.ReadItemSingle(ctx, f.bucket, pk, sk)
|
|
|
|
item, ct, err := f.cli.PollItem(ctx, f.bucket, pk, sk, ct, 1*time.Second)
|
|
|
|
require.ErrorIs(t, err, k2v.NotModifiedTimeoutErr)
|
|
|
|
require.Empty(t, item)
|
|
|
|
}
|