387 lines
8.9 KiB
Go
387 lines
8.9 KiB
Go
package k2v
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
|
|
)
|
|
|
|
const CausalityTokenHeader = "X-Garage-Causality-Token"
|
|
|
|
var TombstoneItemErr = errors.New("item is a tombstone")
|
|
var NoSuchItemErr = errors.New("item does not exist")
|
|
var ConcurrentItemsErr = errors.New("item has multiple concurrent values")
|
|
var NotModifiedTimeoutErr = errors.New("not modified within timeout")
|
|
|
|
var awsSigner = v4.NewSigner()
|
|
|
|
type Bucket string
|
|
|
|
type CausalityToken string
|
|
|
|
type Item []byte
|
|
|
|
func (i Item) GoString() string {
|
|
return string(i)
|
|
}
|
|
|
|
type Key struct {
|
|
ID string
|
|
Secret string
|
|
}
|
|
|
|
type Client struct {
|
|
key Key
|
|
endpoint string
|
|
httpClient *http.Client
|
|
middleware []RequestMiddleware
|
|
}
|
|
|
|
type ClientOption func(*Client)
|
|
|
|
type RequestMiddleware func(*http.Request) error
|
|
|
|
func WithHTTPClient(httpClient *http.Client) ClientOption {
|
|
return func(c *Client) {
|
|
c.httpClient = httpClient
|
|
}
|
|
}
|
|
|
|
func WithRequestMiddleware(middleware ...RequestMiddleware) ClientOption {
|
|
return func(c *Client) {
|
|
c.middleware = append(c.middleware, middleware...)
|
|
}
|
|
}
|
|
|
|
func NewClient(endpoint string, key Key, opts ...ClientOption) *Client {
|
|
cli := &Client{
|
|
endpoint: endpoint,
|
|
key: key,
|
|
httpClient: http.DefaultClient,
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(cli)
|
|
}
|
|
|
|
return cli
|
|
}
|
|
|
|
func (c *Client) Clone(opts ...ClientOption) *Client {
|
|
cli := *c
|
|
for _, opt := range opts {
|
|
opt(&cli)
|
|
}
|
|
return &cli
|
|
}
|
|
|
|
func (c *Client) Close() {
|
|
c.httpClient.CloseIdleConnections()
|
|
}
|
|
|
|
func (c *Client) executeRequest(req *http.Request) (*http.Response, error) {
|
|
if err := c.signRequest(req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// caller is responsible for closing body
|
|
return resp, nil
|
|
}
|
|
|
|
func (c *Client) signRequest(req *http.Request) error {
|
|
creds := aws.Credentials{
|
|
AccessKeyID: c.key.ID,
|
|
SecretAccessKey: c.key.Secret,
|
|
}
|
|
const noBody = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
|
|
req.Header.Set("X-Amz-Content-Sha256", noBody)
|
|
|
|
err := awsSigner.SignHTTP(req.Context(), creds, req, noBody, "k2v", "garage", time.Now())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type ReadIndexQuery struct {
|
|
// Prefix restricts listing to partition keys that start with this value.
|
|
Prefix string
|
|
|
|
// Start is the first partition key to list, in lexicographical order.
|
|
Start string
|
|
|
|
// End is the last partition key to list (excluded).
|
|
End string
|
|
|
|
// Limit for maximum number of partition keys to list.
|
|
Limit int
|
|
|
|
// Reverse iterates in reverse lexicographical order.
|
|
Reverse bool
|
|
}
|
|
|
|
type ReadIndexResponsePartitionKey struct {
|
|
PK string `json:"pk"`
|
|
Entries int `json:"entries"`
|
|
Conflicts int `json:"conflicts"`
|
|
Values int `json:"values"`
|
|
Bytes int `json:"bytes"`
|
|
}
|
|
|
|
type ReadIndexResponse struct {
|
|
Prefix *string `json:"prefix"`
|
|
Start *string `json:"start"`
|
|
End *string `json:"end"`
|
|
Limit *int `json:"limit"`
|
|
Reverse bool `json:"reverse"`
|
|
PartitionKeys []ReadIndexResponsePartitionKey `json:"partitionKeys"`
|
|
More bool `json:"more"`
|
|
NextStart *string `json:"nextStart"`
|
|
}
|
|
|
|
func (c *Client) ReadIndex(ctx context.Context, b Bucket, q ReadIndexQuery) (*ReadIndexResponse, error) {
|
|
u, err := url.Parse(c.endpoint)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
u.Path = string(b)
|
|
|
|
query := make(url.Values)
|
|
if q.Prefix != "" {
|
|
query.Set("prefix", q.Prefix)
|
|
}
|
|
if q.Start != "" {
|
|
query.Set("start", q.Start)
|
|
}
|
|
if q.End != "" {
|
|
query.Set("end", q.End)
|
|
}
|
|
if q.Limit > 0 {
|
|
query.Set("limit", strconv.Itoa(q.Limit))
|
|
}
|
|
if q.Reverse {
|
|
query.Set("reverse", "true")
|
|
}
|
|
u.RawQuery = query.Encode()
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := c.executeRequest(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var ret ReadIndexResponse
|
|
if err := json.Unmarshal(body, &ret); err != nil {
|
|
return nil, err
|
|
}
|
|
return &ret, nil
|
|
}
|
|
|
|
func (c *Client) ReadItemMulti(ctx context.Context, b Bucket, pk string, sk string) ([]Item, CausalityToken, error) {
|
|
u, err := url.Parse(c.endpoint)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
u.Path = string(b) + "/" + pk
|
|
query := make(url.Values)
|
|
query.Set("sort_key", sk)
|
|
u.RawQuery = query.Encode()
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
req.Header.Set("Accept", strings.Join([]string{"application/octet-stream", "application/json"}, ","))
|
|
|
|
resp, err := c.executeRequest(req)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
ct := CausalityToken(resp.Header.Get("X-Garage-Causality-Token"))
|
|
|
|
switch resp.StatusCode {
|
|
case http.StatusOK:
|
|
break
|
|
case http.StatusNoContent:
|
|
return nil, ct, TombstoneItemErr
|
|
case http.StatusNotFound:
|
|
return nil, "", NoSuchItemErr
|
|
default:
|
|
return nil, "", fmt.Errorf("http status code %d", resp.StatusCode)
|
|
}
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
|
|
switch resp.Header.Get("Content-Type") {
|
|
case "application/octet-stream":
|
|
// single item, return as-is
|
|
return []Item{body}, ct, nil
|
|
case "application/json":
|
|
var items []Item
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
if err := json.Unmarshal(body, &items); err != nil {
|
|
return nil, "", err
|
|
}
|
|
return items, ct, nil
|
|
default:
|
|
return nil, "", fmt.Errorf("unsupported content-type: %s", resp.Header.Get("Content-Type"))
|
|
}
|
|
}
|
|
|
|
func (c *Client) ReadItemSingle(ctx context.Context, b Bucket, pk string, sk string) (Item, CausalityToken, error) {
|
|
return c.readItemSingle(ctx, b, pk, sk, "", 0)
|
|
}
|
|
|
|
func (c *Client) readItemSingle(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, timeout time.Duration) (Item, CausalityToken, error) {
|
|
u, err := url.Parse(c.endpoint)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
u.Path = string(b) + "/" + pk
|
|
query := make(url.Values)
|
|
query.Set("sort_key", sk)
|
|
if ct != "" && timeout > 0 {
|
|
query.Set("causality_token", string(ct))
|
|
query.Set("timeout", strconv.Itoa(int(timeout.Seconds())))
|
|
}
|
|
u.RawQuery = query.Encode()
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
req.Header.Set("Accept", "application/octet-stream")
|
|
|
|
resp, err := c.executeRequest(req)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
ct = CausalityToken(resp.Header.Get("X-Garage-Causality-Token"))
|
|
|
|
switch resp.StatusCode {
|
|
case http.StatusOK:
|
|
break
|
|
case http.StatusNoContent:
|
|
return nil, ct, TombstoneItemErr
|
|
case http.StatusNotFound:
|
|
return nil, "", NoSuchItemErr
|
|
case http.StatusConflict:
|
|
return nil, ct, ConcurrentItemsErr
|
|
case http.StatusNotModified:
|
|
return nil, "", NotModifiedTimeoutErr
|
|
default:
|
|
return nil, "", fmt.Errorf("http status code %d", resp.StatusCode)
|
|
}
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
return body, ct, nil
|
|
}
|
|
|
|
func (c *Client) DeleteItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken) error {
|
|
if ct == "" {
|
|
return errors.New("continuity token is required for delete")
|
|
}
|
|
|
|
u, err := url.Parse(c.endpoint)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
u.Path = string(b) + "/" + pk
|
|
query := make(url.Values)
|
|
query.Set("sort_key", sk)
|
|
u.RawQuery = query.Encode()
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, u.String(), nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set(CausalityTokenHeader, string(ct))
|
|
|
|
resp, err := c.executeRequest(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusNoContent {
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return fmt.Errorf("http status code %d: %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) InsertItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, item []byte) error {
|
|
u, err := url.Parse(c.endpoint)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
u.Path = string(b) + "/" + pk
|
|
query := make(url.Values)
|
|
query.Set("sort_key", sk)
|
|
u.RawQuery = query.Encode()
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPut, u.String(), bytes.NewReader(item))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if ct != "" {
|
|
req.Header.Set("X-Garage-Causality-Token", string(ct))
|
|
}
|
|
|
|
resp, err := c.executeRequest(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusNoContent {
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return fmt.Errorf("http status code %d: %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
return nil
|
|
}
|