feat: add PollRange
This commit is contained in:
parent
6eb7e62a0c
commit
94bff8e8b7
3 changed files with 164 additions and 0 deletions
|
@ -33,6 +33,7 @@ type Client
|
||||||
func (c *Client) InsertBatch(ctx context.Context, b Bucket, items []BatchInsertItem) error
|
func (c *Client) InsertBatch(ctx context.Context, b Bucket, items []BatchInsertItem) error
|
||||||
func (c *Client) InsertItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, item []byte) error
|
func (c *Client) InsertItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, item []byte) error
|
||||||
func (c *Client) PollItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, timeout time.Duration) (Item, CausalityToken, error)
|
func (c *Client) PollItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, timeout time.Duration) (Item, CausalityToken, error)
|
||||||
|
func (c *Client) PollRange(ctx context.Context, b Bucket, pk string, q PollRangeQuery, timeout time.Duration) (*PollRangeResponse, error)
|
||||||
func (c *Client) ReadBatch(ctx context.Context, b Bucket, q []ReadBatchSearch) ([]BatchSearchResult, error)
|
func (c *Client) ReadBatch(ctx context.Context, b Bucket, q []ReadBatchSearch) ([]BatchSearchResult, error)
|
||||||
func (c *Client) ReadIndex(ctx context.Context, b Bucket, q ReadIndexQuery) (*ReadIndexResponse, error)
|
func (c *Client) ReadIndex(ctx context.Context, b Bucket, q ReadIndexQuery) (*ReadIndexResponse, error)
|
||||||
func (c *Client) ReadItemMulti(ctx context.Context, b Bucket, pk string, sk string) ([]Item, CausalityToken, error)
|
func (c *Client) ReadItemMulti(ctx context.Context, b Bucket, pk string, sk string) ([]Item, CausalityToken, error)
|
||||||
|
@ -63,6 +64,8 @@ This will repeatedly make calls to **ReadBatch** (batch search), using `nextStar
|
||||||
|
|
||||||
See `ScrollIndex(ctx context.Context, client IndexScroller, b Bucket, query ReadIndexQuery, fn ReadIndexResponseHandler) error` for the equivalent for batch index reads.
|
See `ScrollIndex(ctx context.Context, client IndexScroller, b Bucket, query ReadIndexQuery, fn ReadIndexResponseHandler) error` for the equivalent for batch index reads.
|
||||||
|
|
||||||
|
No helper is available for `PollRange()` yet.
|
||||||
|
|
||||||
## Integration Tests
|
## Integration Tests
|
||||||
```shell
|
```shell
|
||||||
K2V_ENDPOINT="http://[::1]:3904" \
|
K2V_ENDPOINT="http://[::1]:3904" \
|
||||||
|
|
79
poll_batch.go
Normal file
79
poll_batch.go
Normal file
|
@ -0,0 +1,79 @@
|
||||||
|
package k2v
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PollRangeQuery struct {
|
||||||
|
// Prefix restricts items to poll to those whose sort keys start with this prefix.
|
||||||
|
Prefix string `json:"prefix,omitempty"`
|
||||||
|
|
||||||
|
// Start is the sort key of the first item to poll.
|
||||||
|
Start string `json:"start,omitempty"`
|
||||||
|
|
||||||
|
// End is the sort key of the last item to poll (excluded).
|
||||||
|
End string `json:"end,omitempty"`
|
||||||
|
|
||||||
|
// SeenMarker is an opaque string returned by a previous PollRange call, that represents items already seen.
|
||||||
|
SeenMarker string `json:"seenMarker,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PollRangeResponse struct {
|
||||||
|
SeenMarker string `json:"seenMarker"`
|
||||||
|
Items []SearchResultItem `json:"items"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) PollRange(ctx context.Context, b Bucket, pk string, q PollRangeQuery, timeout time.Duration) (*PollRangeResponse, error) {
|
||||||
|
u, err := url.Parse(c.endpoint)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
u.Path = string(b) + "/" + pk
|
||||||
|
query := make(url.Values)
|
||||||
|
query.Set("poll_range", "")
|
||||||
|
u.RawQuery = query.Encode()
|
||||||
|
|
||||||
|
reqBody, err := json.Marshal(struct {
|
||||||
|
PollRangeQuery
|
||||||
|
Timeout int `json:"timeout,omitempty"`
|
||||||
|
}{
|
||||||
|
PollRangeQuery: q,
|
||||||
|
Timeout: int(timeout.Seconds()),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "SEARCH", u.String(), bytes.NewReader(reqBody))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.executeRequest(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("http status code %d: %s", resp.StatusCode, body)
|
||||||
|
}
|
||||||
|
|
||||||
|
var result PollRangeResponse
|
||||||
|
if err := json.Unmarshal(body, &result); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &result, nil
|
||||||
|
}
|
82
poll_batch_test.go
Normal file
82
poll_batch_test.go
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
package k2v_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
k2v "code.notaphish.fyi/milas/garage-k2v-go"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"net/http/httptrace"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestClient_PollRange(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
f, ctx := newFixture(t)
|
||||||
|
|
||||||
|
pk := randomPk()
|
||||||
|
sk := randomSk()
|
||||||
|
|
||||||
|
for i := range 5 {
|
||||||
|
err := f.cli.InsertItem(ctx, f.bucket, pk, sk+"-"+strconv.Itoa(i), "", []byte("hello1"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// first read should complete immediately
|
||||||
|
q := k2v.PollRangeQuery{
|
||||||
|
Start: sk,
|
||||||
|
}
|
||||||
|
result, err := f.cli.PollRange(ctx, f.bucket, pk, q, 5*time.Second)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEmpty(t, result.SeenMarker)
|
||||||
|
require.Len(t, result.Items, 5)
|
||||||
|
for i := range result.Items {
|
||||||
|
require.Len(t, result.Items[i].Values, 1)
|
||||||
|
require.Equal(t, "hello1", string(result.Items[i].Values[0]))
|
||||||
|
}
|
||||||
|
|
||||||
|
updateErrCh := make(chan error, 1)
|
||||||
|
pollReadyCh := make(chan struct{})
|
||||||
|
go func(sk string, ct k2v.CausalityToken) {
|
||||||
|
defer close(updateErrCh)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Errorf("Context canceled: %v", ctx.Err())
|
||||||
|
return
|
||||||
|
case <-pollReadyCh:
|
||||||
|
t.Logf("PollRange connected")
|
||||||
|
}
|
||||||
|
updateErrCh <- f.cli.InsertItem(ctx, f.bucket, pk, sk, ct, []byte("hello2"))
|
||||||
|
}(result.Items[3].SortKey, k2v.CausalityToken(result.Items[3].CausalityToken))
|
||||||
|
|
||||||
|
trace := &httptrace.ClientTrace{
|
||||||
|
WroteRequest: func(_ httptrace.WroteRequestInfo) {
|
||||||
|
pollReadyCh <- struct{}{}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
q.SeenMarker = result.SeenMarker
|
||||||
|
result, err = f.cli.PollRange(httptrace.WithClientTrace(ctx, trace), f.bucket, pk, q, 5*time.Second)
|
||||||
|
if assert.NoError(t, err) {
|
||||||
|
require.NotEmpty(t, result.SeenMarker)
|
||||||
|
require.Len(t, result.Items, 1)
|
||||||
|
require.Len(t, result.Items[0].Values, 1)
|
||||||
|
require.Equal(t, sk+"-3", result.Items[0].SortKey)
|
||||||
|
require.Equal(t, "hello2", string(result.Items[0].Values[0]))
|
||||||
|
}
|
||||||
|
require.NoError(t, <-updateErrCh)
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, f.cli.InsertItem(ctx, f.bucket, pk, result.Items[0].SortKey, k2v.CausalityToken(result.Items[0].CausalityToken), []byte("hello3")))
|
||||||
|
|
||||||
|
q.SeenMarker = result.SeenMarker
|
||||||
|
result, err = f.cli.PollRange(ctx, f.bucket, pk, q, 5*time.Second)
|
||||||
|
if assert.NoError(t, err) {
|
||||||
|
require.NotEmpty(t, result.SeenMarker)
|
||||||
|
require.Len(t, result.Items, 1)
|
||||||
|
require.Len(t, result.Items[0].Values, 1)
|
||||||
|
require.Equal(t, sk+"-3", result.Items[0].SortKey)
|
||||||
|
require.Equal(t, "hello3", string(result.Items[0].Values[0]))
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue