diff --git a/README.md b/README.md index ce850ec..e263ef8 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ type Client func (c *Client) InsertBatch(ctx context.Context, b Bucket, items []BatchInsertItem) error func (c *Client) InsertItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, item []byte) error func (c *Client) PollItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, timeout time.Duration) (Item, CausalityToken, error) + func (c *Client) PollRange(ctx context.Context, b Bucket, pk string, q PollRangeQuery, timeout time.Duration) (*PollRangeResponse, error) func (c *Client) ReadBatch(ctx context.Context, b Bucket, q []ReadBatchSearch) ([]BatchSearchResult, error) func (c *Client) ReadIndex(ctx context.Context, b Bucket, q ReadIndexQuery) (*ReadIndexResponse, error) func (c *Client) ReadItemMulti(ctx context.Context, b Bucket, pk string, sk string) ([]Item, CausalityToken, error) @@ -63,6 +64,8 @@ This will repeatedly make calls to **ReadBatch** (batch search), using `nextStar See `ScrollIndex(ctx context.Context, client IndexScroller, b Bucket, query ReadIndexQuery, fn ReadIndexResponseHandler) error` for the equivalent for batch index reads. +No helper is available for `PollRange()` yet. + ## Integration Tests ```shell K2V_ENDPOINT="http://[::1]:3904" \ diff --git a/poll_batch.go b/poll_batch.go new file mode 100644 index 0000000..c3ed20f --- /dev/null +++ b/poll_batch.go @@ -0,0 +1,79 @@ +package k2v + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "time" +) + +type PollRangeQuery struct { + // Prefix restricts items to poll to those whose sort keys start with this prefix. + Prefix string `json:"prefix,omitempty"` + + // Start is the sort key of the first item to poll. + Start string `json:"start,omitempty"` + + // End is the sort key of the last item to poll (excluded). + End string `json:"end,omitempty"` + + // SeenMarker is an opaque string returned by a previous PollRange call, that represents items already seen. + SeenMarker string `json:"seenMarker,omitempty"` +} + +type PollRangeResponse struct { + SeenMarker string `json:"seenMarker"` + Items []SearchResultItem `json:"items"` +} + +func (c *Client) PollRange(ctx context.Context, b Bucket, pk string, q PollRangeQuery, timeout time.Duration) (*PollRangeResponse, error) { + u, err := url.Parse(c.endpoint) + if err != nil { + return nil, err + } + u.Path = string(b) + "/" + pk + query := make(url.Values) + query.Set("poll_range", "") + u.RawQuery = query.Encode() + + reqBody, err := json.Marshal(struct { + PollRangeQuery + Timeout int `json:"timeout,omitempty"` + }{ + PollRangeQuery: q, + Timeout: int(timeout.Seconds()), + }) + if err != nil { + return nil, err + } + + req, err := http.NewRequestWithContext(ctx, "SEARCH", u.String(), bytes.NewReader(reqBody)) + if err != nil { + return nil, err + } + + resp, err := c.executeRequest(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("http status code %d: %s", resp.StatusCode, body) + } + + var result PollRangeResponse + if err := json.Unmarshal(body, &result); err != nil { + return nil, err + } + return &result, nil +} diff --git a/poll_batch_test.go b/poll_batch_test.go new file mode 100644 index 0000000..546bede --- /dev/null +++ b/poll_batch_test.go @@ -0,0 +1,82 @@ +package k2v_test + +import ( + k2v "code.notaphish.fyi/milas/garage-k2v-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "net/http/httptrace" + "strconv" + "testing" + "time" +) + +func TestClient_PollRange(t *testing.T) { + t.Parallel() + + f, ctx := newFixture(t) + + pk := randomPk() + sk := randomSk() + + for i := range 5 { + err := f.cli.InsertItem(ctx, f.bucket, pk, sk+"-"+strconv.Itoa(i), "", []byte("hello1")) + require.NoError(t, err) + } + + // first read should complete immediately + q := k2v.PollRangeQuery{ + Start: sk, + } + result, err := f.cli.PollRange(ctx, f.bucket, pk, q, 5*time.Second) + require.NoError(t, err) + require.NotEmpty(t, result.SeenMarker) + require.Len(t, result.Items, 5) + for i := range result.Items { + require.Len(t, result.Items[i].Values, 1) + require.Equal(t, "hello1", string(result.Items[i].Values[0])) + } + + updateErrCh := make(chan error, 1) + pollReadyCh := make(chan struct{}) + go func(sk string, ct k2v.CausalityToken) { + defer close(updateErrCh) + select { + case <-ctx.Done(): + t.Errorf("Context canceled: %v", ctx.Err()) + return + case <-pollReadyCh: + t.Logf("PollRange connected") + } + updateErrCh <- f.cli.InsertItem(ctx, f.bucket, pk, sk, ct, []byte("hello2")) + }(result.Items[3].SortKey, k2v.CausalityToken(result.Items[3].CausalityToken)) + + trace := &httptrace.ClientTrace{ + WroteRequest: func(_ httptrace.WroteRequestInfo) { + pollReadyCh <- struct{}{} + }, + } + + q.SeenMarker = result.SeenMarker + result, err = f.cli.PollRange(httptrace.WithClientTrace(ctx, trace), f.bucket, pk, q, 5*time.Second) + if assert.NoError(t, err) { + require.NotEmpty(t, result.SeenMarker) + require.Len(t, result.Items, 1) + require.Len(t, result.Items[0].Values, 1) + require.Equal(t, sk+"-3", result.Items[0].SortKey) + require.Equal(t, "hello2", string(result.Items[0].Values[0])) + } + require.NoError(t, <-updateErrCh) + + require.NoError(t, err) + require.NoError(t, f.cli.InsertItem(ctx, f.bucket, pk, result.Items[0].SortKey, k2v.CausalityToken(result.Items[0].CausalityToken), []byte("hello3"))) + + q.SeenMarker = result.SeenMarker + result, err = f.cli.PollRange(ctx, f.bucket, pk, q, 5*time.Second) + if assert.NoError(t, err) { + require.NotEmpty(t, result.SeenMarker) + require.Len(t, result.Items, 1) + require.Len(t, result.Items[0].Values, 1) + require.Equal(t, sk+"-3", result.Items[0].SortKey) + require.Equal(t, "hello3", string(result.Items[0].Values[0])) + } +}