package k2v import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "net/http" "net/url" "strconv" "strings" "time" "github.com/aws/aws-sdk-go-v2/aws" v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" ) const CausalityTokenHeader = "X-Garage-Causality-Token" var TombstoneItemErr = errors.New("item is a tombstone") var NoSuchItemErr = errors.New("item does not exist") var ConcurrentItemsErr = errors.New("item has multiple concurrent values") var awsSigner = v4.NewSigner() type Bucket string type CausalityToken string type Item []byte func (i Item) GoString() string { return string(i) } type Key struct { ID string Secret string } type Client struct { key Key endpoint string httpClient *http.Client middleware []RequestMiddleware } type ClientOption func(*Client) type RequestMiddleware func(*http.Request) error func WithHTTPClient(httpClient *http.Client) ClientOption { return func(c *Client) { c.httpClient = httpClient } } func WithRequestMiddleware(middleware ...RequestMiddleware) ClientOption { return func(c *Client) { c.middleware = append(c.middleware, middleware...) } } func NewClient(endpoint string, key Key, opts ...ClientOption) *Client { cli := &Client{ endpoint: endpoint, key: key, httpClient: http.DefaultClient, } for _, opt := range opts { opt(cli) } return cli } func (c *Client) Clone(opts ...ClientOption) *Client { cli := *c for _, opt := range opts { opt(&cli) } return &cli } func (c *Client) Close() { c.httpClient.CloseIdleConnections() } func (c *Client) executeRequest(req *http.Request) (*http.Response, error) { if err := c.signRequest(req); err != nil { return nil, err } resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } // caller is responsible for closing body return resp, nil } func (c *Client) signRequest(req *http.Request) error { creds := aws.Credentials{ AccessKeyID: c.key.ID, SecretAccessKey: c.key.Secret, } const noBody = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" req.Header.Set("X-Amz-Content-Sha256", noBody) err := awsSigner.SignHTTP(req.Context(), creds, req, noBody, "k2v", "garage", time.Now()) if err != nil { return err } return nil } type ReadIndexQuery struct { // Prefix restricts listing to partition keys that start with this value. Prefix string // Start is the first partition key to list, in lexicographical order. Start string // End is the last partition key to list (excluded). End string // Limit for maximum number of partition keys to list. Limit int // Reverse iterates in reverse lexicographical order. Reverse bool } type ReadIndexResponsePartitionKey struct { PK string `json:"pk"` Entries int `json:"entries"` Conflicts int `json:"conflicts"` Values int `json:"values"` Bytes int `json:"bytes"` } type ReadIndexResponse struct { Prefix *string `json:"prefix"` Start *string `json:"start"` End *string `json:"end"` Limit *int `json:"limit"` Reverse bool `json:"reverse"` PartitionKeys []ReadIndexResponsePartitionKey `json:"partitionKeys"` More bool `json:"more"` NextStart *string `json:"nextStart"` } func (c *Client) ReadIndex(ctx context.Context, b Bucket, q ReadIndexQuery) (*ReadIndexResponse, error) { u, err := url.Parse(c.endpoint) if err != nil { return nil, err } u.Path = string(b) query := make(url.Values) if q.Prefix != "" { query.Set("prefix", q.Prefix) } if q.Start != "" { query.Set("start", q.Start) } if q.End != "" { query.Set("end", q.End) } if q.Limit > 0 { query.Set("limit", strconv.Itoa(q.Limit)) } if q.Reverse { query.Set("reverse", "true") } u.RawQuery = query.Encode() req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) if err != nil { return nil, err } resp, err := c.executeRequest(req) if err != nil { return nil, err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return nil, err } var ret ReadIndexResponse if err := json.Unmarshal(body, &ret); err != nil { return nil, err } return &ret, nil } func (c *Client) ReadItemMulti(ctx context.Context, b Bucket, pk string, sk string) ([]Item, CausalityToken, error) { u, err := url.Parse(c.endpoint) if err != nil { return nil, "", err } u.Path = string(b) + "/" + pk query := make(url.Values) query.Set("sort_key", sk) u.RawQuery = query.Encode() req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) if err != nil { return nil, "", err } req.Header.Set("Accept", strings.Join([]string{"application/octet-stream", "application/json"}, ",")) resp, err := c.executeRequest(req) if err != nil { return nil, "", err } defer resp.Body.Close() ct := CausalityToken(resp.Header.Get("X-Garage-Causality-Token")) switch resp.StatusCode { case http.StatusOK: break case http.StatusNoContent: return nil, ct, TombstoneItemErr case http.StatusNotFound: return nil, "", NoSuchItemErr default: return nil, "", fmt.Errorf("http status code %d", resp.StatusCode) } body, err := io.ReadAll(resp.Body) if err != nil { return nil, "", err } switch resp.Header.Get("Content-Type") { case "application/octet-stream": // single item, return as-is return []Item{body}, ct, nil case "application/json": var items []Item if err != nil { return nil, "", err } if err := json.Unmarshal(body, &items); err != nil { return nil, "", err } return items, ct, nil default: return nil, "", fmt.Errorf("unsupported content-type: %s", resp.Header.Get("Content-Type")) } } func (c *Client) ReadItemSingle(ctx context.Context, b Bucket, pk string, sk string) (Item, CausalityToken, error) { return c.readItemSingle(ctx, b, pk, sk, "", 0) } func (c *Client) readItemSingle(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, timeout time.Duration) (Item, CausalityToken, error) { u, err := url.Parse(c.endpoint) if err != nil { return nil, "", err } u.Path = string(b) + "/" + pk query := make(url.Values) query.Set("sort_key", sk) if ct != "" && timeout > 0 { query.Set("causality_token", string(ct)) query.Set("timeout", strconv.Itoa(int(timeout.Seconds()))) } u.RawQuery = query.Encode() req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) if err != nil { return nil, "", err } req.Header.Set("Accept", "application/octet-stream") resp, err := c.executeRequest(req) if err != nil { return nil, "", err } defer resp.Body.Close() ct = CausalityToken(resp.Header.Get("X-Garage-Causality-Token")) switch resp.StatusCode { case http.StatusOK: break case http.StatusNoContent: return nil, ct, TombstoneItemErr case http.StatusNotFound: return nil, "", NoSuchItemErr case http.StatusConflict: return nil, ct, ConcurrentItemsErr default: return nil, "", fmt.Errorf("http status code %d", resp.StatusCode) } body, err := io.ReadAll(resp.Body) if err != nil { return nil, "", err } return body, ct, nil } func (c *Client) DeleteItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken) error { if ct == "" { return errors.New("continuity token is required for delete") } u, err := url.Parse(c.endpoint) if err != nil { return err } u.Path = string(b) + "/" + pk query := make(url.Values) query.Set("sort_key", sk) u.RawQuery = query.Encode() req, err := http.NewRequestWithContext(ctx, http.MethodDelete, u.String(), nil) if err != nil { return err } req.Header.Set(CausalityTokenHeader, string(ct)) resp, err := c.executeRequest(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusNoContent { body, err := io.ReadAll(resp.Body) if err != nil { return err } return fmt.Errorf("http status code %d: %s", resp.StatusCode, string(body)) } return nil } func (c *Client) InsertItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, item []byte) error { u, err := url.Parse(c.endpoint) if err != nil { return err } u.Path = string(b) + "/" + pk query := make(url.Values) query.Set("sort_key", sk) u.RawQuery = query.Encode() req, err := http.NewRequestWithContext(ctx, http.MethodPut, u.String(), bytes.NewReader(item)) if err != nil { return err } if ct != "" { req.Header.Set("X-Garage-Causality-Token", string(ct)) } resp, err := c.executeRequest(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusNoContent { body, err := io.ReadAll(resp.Body) if err != nil { return err } return fmt.Errorf("http status code %d: %s", resp.StatusCode, string(body)) } return nil }