From 6eb7e62a0cda06a4d8c470e3d864251f473e8f5e Mon Sep 17 00:00:00 2001 From: Milas Bowman Date: Mon, 10 Mar 2025 21:31:07 -0400 Subject: [PATCH] feat: add scroll / iteration helpers --- README.md | 24 +++++++++++++ client.go | 10 +++--- pager.go | 77 ++++++++++++++++++++++++++++++++++++++++++ pager_test.go | 84 ++++++++++++++++++++++++++++++++++++++++++++++ read_batch.go | 10 +++--- read_batch_test.go | 2 +- 6 files changed, 196 insertions(+), 11 deletions(-) create mode 100644 pager.go create mode 100644 pager_test.go diff --git a/README.md b/README.md index 127b6b2..ce850ec 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,30 @@ type Client func (c *Client) ReadItemSingle(ctx context.Context, b Bucket, pk string, sk string) (Item, CausalityToken, error) ``` +## Scrolling (Client-side / Go API) +To handle iteration in the K2V API, helper functions for simple cases are provided. + +For example, to perform a bulk search: +```go +handleBatch := func(result *k2v.BatchSearchResult) error { + log.Println(result.Items) + return nil +} +err := k2v.ScrollBatchSearch(ctx, f.cli, f.bucket, []k2v.BatchSearch{ + { + PartitionKey: "pk1", + }, + { + PartitionKey: "pk2", + Limit: 1, + }, +}, handleBatch) +``` + +This will repeatedly make calls to **ReadBatch** (batch search), using `nextStart` from the responses to generate subsequent requests until all queries are exhausted. + +See `ScrollIndex(ctx context.Context, client IndexScroller, b Bucket, q ReadIndexQuery, fn ReadIndexResponseHandler) error` for the equivalent helper for index reads (**ReadIndex**). 
+ ## Integration Tests ```shell K2V_ENDPOINT="http://[::1]:3904" \ diff --git a/client.go b/client.go index 8b63909..4bc6a1c 100644 --- a/client.go +++ b/client.go @@ -143,14 +143,14 @@ type ReadIndexResponsePartitionKey struct { } type ReadIndexResponse struct { - Prefix any `json:"prefix"` - Start any `json:"start"` - End any `json:"end"` - Limit any `json:"limit"` + Prefix *string `json:"prefix"` + Start *string `json:"start"` + End *string `json:"end"` + Limit *int `json:"limit"` Reverse bool `json:"reverse"` PartitionKeys []ReadIndexResponsePartitionKey `json:"partitionKeys"` More bool `json:"more"` - NextStart any `json:"nextStart"` + NextStart *string `json:"nextStart"` } func (c *Client) ReadIndex(ctx context.Context, b Bucket, q ReadIndexQuery) (*ReadIndexResponse, error) { diff --git a/pager.go b/pager.go new file mode 100644 index 0000000..645e4e7 --- /dev/null +++ b/pager.go @@ -0,0 +1,77 @@ +package k2v + +import ( + "context" + "errors" +) + +var StopScroll = errors.New("scroll canceled") + +// ReadIndexResponseHandler is invoked for each batch of index read results. +// +// If an error is returned, scrolling is halted and the error is propagated. +// The sentinel value StopScroll can be returned to end iteration early without propagating an error. +type ReadIndexResponseHandler func(resp *ReadIndexResponse) error + +type BatchSearchResultHandler func(result *BatchSearchResult) error + +type IndexScroller interface { + ReadIndex(ctx context.Context, b Bucket, q ReadIndexQuery) (*ReadIndexResponse, error) +} + +var _ IndexScroller = &Client{} + +type BatchSearchScroller interface { + ReadBatch(ctx context.Context, b Bucket, q []BatchSearch) ([]*BatchSearchResult, error) +} + +var _ BatchSearchScroller = &Client{} + +// ScrollIndex calls the ReadIndex API serially, invoking the provided function for each response (batch) until there are no more results. 
+func ScrollIndex(ctx context.Context, client IndexScroller, b Bucket, q ReadIndexQuery, fn ReadIndexResponseHandler) error { + for { + resp, err := client.ReadIndex(ctx, b, q) + if err != nil { + return err + } + if err := fn(resp); err != nil { + if errors.Is(err, StopScroll) { + return nil + } + return err + } + if !resp.More || resp.NextStart == nil { + break + } + q.Start = *resp.NextStart + } + return nil +} + +func ScrollBatchSearch(ctx context.Context, client BatchSearchScroller, b Bucket, q []BatchSearch, fn BatchSearchResultHandler) error { + for { + results, err := client.ReadBatch(ctx, b, q) + if err != nil { + return err + } + var nextQ []BatchSearch + for i := range results { + if results[i].More && results[i].NextStart != nil { + batch := q[i] + batch.Start = *results[i].NextStart + nextQ = append(nextQ, batch) + } + if err := fn(results[i]); err != nil { + if errors.Is(err, StopScroll) { + return nil + } + return err + } + } + if len(nextQ) == 0 { + break + } + q = nextQ + } + return nil +} diff --git a/pager_test.go b/pager_test.go new file mode 100644 index 0000000..5d5a661 --- /dev/null +++ b/pager_test.go @@ -0,0 +1,84 @@ +package k2v_test + +import ( + k2v "code.notaphish.fyi/milas/garage-k2v-go" + "context" + "fmt" + "github.com/stretchr/testify/require" + "strconv" + "strings" + "testing" +) + +func TestScrollIndex(t *testing.T) { + t.Parallel() + f, ctx := newFixture(t) + + pkPrefix := randomPk() + for i := range 5 { + require.NoError(t, f.cli.InsertItem(ctx, f.bucket, pkPrefix+"-"+strconv.Itoa(i), randomSk(), "", []byte("hello"+strconv.Itoa(i)))) + } + + var responses []*k2v.ReadIndexResponse + err := k2v.ScrollIndex(ctx, f.cli, f.bucket, k2v.ReadIndexQuery{Prefix: pkPrefix, Limit: 1}, func(resp *k2v.ReadIndexResponse) error { + responses = append(responses, resp) + return nil + }) + require.NoError(t, err) + require.Len(t, responses, 5) +} + +func ExampleScrollIndex() { + ctx := context.Background() + client := 
k2v.NewClient(k2v.EndpointFromEnv(), k2v.KeyFromEnv()) + defer client.Close() + const bucket = "k2v-test" + + pkPrefix := randomPk() + for i := range 5 { + _ = client.InsertItem(ctx, bucket, pkPrefix+"-"+strconv.Itoa(i), randomSk(), "", []byte("hello")) + } + + var responses []*k2v.ReadIndexResponse + _ = k2v.ScrollIndex(ctx, client, bucket, k2v.ReadIndexQuery{Prefix: pkPrefix, Limit: 25}, func(resp *k2v.ReadIndexResponse) error { + responses = append(responses, resp) + return nil + }) + fmt.Println(len(responses[0].PartitionKeys)) + // Output: + // 5 +} + +func TestScrollItems(t *testing.T) { + t.Parallel() + f, ctx := newFixture(t) + + pk1 := randomKey("pk1") + sk1 := randomKey("sk1") + require.NoError(t, f.cli.InsertItem(ctx, f.bucket, pk1, sk1, "", []byte(strings.Join([]string{"hello", pk1, sk1}, "-")))) + + pk2 := randomKey("pk2") + for i := range 5 { + skN := randomKey(fmt.Sprintf("sk%d", i+2)) + require.NoError(t, f.cli.InsertItem(ctx, f.bucket, pk2, skN, "", []byte(strings.Join([]string{"hello", pk2, skN, strconv.Itoa(i)}, "-")))) + } + + q := []k2v.BatchSearch{ + { + PartitionKey: pk1, + }, + { + PartitionKey: pk2, + Limit: 1, + }, + } + + var results []*k2v.BatchSearchResult + err := k2v.ScrollBatchSearch(ctx, f.cli, f.bucket, q, func(result *k2v.BatchSearchResult) error { + results = append(results, result) + return nil + }) + require.NoError(t, err) + require.NotEmpty(t, results) + require.Len(t, results, 6) +} diff --git a/read_batch.go b/read_batch.go index 876c95c..2ff045c 100644 --- a/read_batch.go +++ b/read_batch.go @@ -10,7 +10,7 @@ import ( "net/url" ) -type ReadBatchSearch struct { +type BatchSearch struct { PartitionKey string `json:"partitionKey"` // Prefix restricts listing to partition keys that start with this value. 
@@ -59,7 +59,7 @@ type SearchResultItem struct { Values []Item `json:"v"` } -func (c *Client) ReadBatch(ctx context.Context, b Bucket, q []ReadBatchSearch) ([]BatchSearchResult, error) { +func (c *Client) ReadBatch(ctx context.Context, b Bucket, q []BatchSearch) ([]*BatchSearchResult, error) { u, err := url.Parse(c.endpoint) if err != nil { return nil, err @@ -91,7 +91,7 @@ func (c *Client) ReadBatch(ctx context.Context, b Bucket, q []ReadBatchSearch) ( return nil, fmt.Errorf("http status code %d: %s", resp.StatusCode, body) } - var items []BatchSearchResult + var items []*BatchSearchResult if err := json.Unmarshal(body, &items); err != nil { return nil, err } @@ -112,9 +112,9 @@ type BulkGetItem struct { } func BulkGet(ctx context.Context, cli *Client, b Bucket, keys []ItemKey) ([]BulkGetItem, error) { - q := make([]ReadBatchSearch, len(keys)) + q := make([]BatchSearch, len(keys)) for i := range keys { - q[i] = ReadBatchSearch{ + q[i] = BatchSearch{ PartitionKey: keys[i].PartitionKey, Start: keys[i].SortKey, SingleItem: true, diff --git a/read_batch_test.go b/read_batch_test.go index cbd9ada..524fbac 100644 --- a/read_batch_test.go +++ b/read_batch_test.go @@ -30,7 +30,7 @@ func TestClient_ReadBatch(t *testing.T) { require.NoError(t, f.cli.InsertItem(ctx, f.bucket, pk3, skN, "", []byte(strings.Join([]string{"hello", pk3, skN, strconv.Itoa(i)}, "-")))) } - q := []k2v.ReadBatchSearch{ + q := []k2v.BatchSearch{ { PartitionKey: pk1, },