feat: initial prototype
This commit is contained in:
commit
38731258ac
10 changed files with 968 additions and 0 deletions
138
read_batch.go
Normal file
138
read_batch.go
Normal file
|
@ -0,0 +1,138 @@
|
|||
package k2v
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
type ReadBatchSearch struct {
|
||||
PartitionKey string `json:"partitionKey"`
|
||||
|
||||
// Prefix restricts listing to partition keys that start with this value.
|
||||
Prefix string `json:"prefix,omitempty"`
|
||||
|
||||
// Start is the first partition key to list, in lexicographical order.
|
||||
Start string `json:"start,omitempty"`
|
||||
|
||||
// End is the last partition key to list (excluded).
|
||||
End string `json:"end,omitempty"`
|
||||
|
||||
// Limit for maximum number of partition keys to list.
|
||||
Limit int `json:"limit,omitempty"`
|
||||
|
||||
// Reverse iterates in reverse lexicographical order.
|
||||
Reverse bool `json:"reverse,omitempty"`
|
||||
|
||||
// SingleItem determines whether to return only the item with sort key start.
|
||||
SingleItem bool `json:"singleItem,omitempty"`
|
||||
|
||||
// ConflictsOnly determines whether to return only items that have several concurrent values.
|
||||
ConflictsOnly bool `json:"conflictsOnly,omitempty"`
|
||||
|
||||
// Tombstones determines whether or not to return tombstone lines to indicate the presence of old deleted items.
|
||||
Tombstones bool `json:"tombstones,omitempty"`
|
||||
}
|
||||
|
||||
type BatchSearchResult struct {
|
||||
PartitionKey string `json:"partitionKey"`
|
||||
Prefix *string `json:"prefix"`
|
||||
Start *string `json:"start"`
|
||||
End *string `json:"end"`
|
||||
Limit *int `json:"limit"`
|
||||
Reverse bool `json:"reverse"`
|
||||
SingleItem bool `json:"singleItem"`
|
||||
ConflictsOnly bool `json:"conflictsOnly"`
|
||||
Tombstones bool `json:"tombstones"`
|
||||
Items []SearchResultItem `json:"items"`
|
||||
More bool `json:"more"`
|
||||
NextStart *string `json:"nextStart"`
|
||||
}
|
||||
|
||||
type SearchResultItem struct {
|
||||
SortKey string `json:"sk"`
|
||||
CausalityToken string `json:"ct"`
|
||||
Values []Item `json:"v"`
|
||||
}
|
||||
|
||||
func (c *Client) ReadBatch(ctx context.Context, b Bucket, q []ReadBatchSearch) ([]BatchSearchResult, error) {
|
||||
u, err := url.Parse(c.endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u.Path = string(b)
|
||||
|
||||
reqBody, err := json.Marshal(q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "SEARCH", u.String(), bytes.NewReader(reqBody))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := c.executeRequest(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("http status code %d: %s", resp.StatusCode, body)
|
||||
}
|
||||
|
||||
var items []BatchSearchResult
|
||||
if err := json.Unmarshal(body, &items); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return items, err
|
||||
}
|
||||
|
||||
type ItemKey struct {
|
||||
PartitionKey string
|
||||
SortKey string
|
||||
}
|
||||
|
||||
type BulkGetItem struct {
|
||||
PartitionKey string
|
||||
SortKey string
|
||||
CausalityToken CausalityToken
|
||||
Values []Item
|
||||
}
|
||||
|
||||
func BulkGet(ctx context.Context, cli *Client, b Bucket, keys []ItemKey) ([]BulkGetItem, error) {
|
||||
q := make([]ReadBatchSearch, len(keys))
|
||||
for i := range keys {
|
||||
q[i] = ReadBatchSearch{
|
||||
PartitionKey: keys[i].PartitionKey,
|
||||
Start: keys[i].SortKey,
|
||||
SingleItem: true,
|
||||
Tombstones: true,
|
||||
}
|
||||
}
|
||||
results, err := cli.ReadBatch(ctx, b, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret := make([]BulkGetItem, len(results))
|
||||
for i := range results {
|
||||
ret[i] = BulkGetItem{
|
||||
PartitionKey: results[i].PartitionKey,
|
||||
SortKey: results[i].Items[0].SortKey,
|
||||
CausalityToken: CausalityToken(results[i].Items[0].CausalityToken),
|
||||
Values: results[i].Items[0].Values,
|
||||
}
|
||||
}
|
||||
return ret, nil
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue