// garage-k2v-go/client.go
//
// 384 lines
// 8.8 KiB
// Go

package k2v
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
)
// CausalityTokenHeader is the name of the HTTP header that carries a
// causality token on requests and responses.
const CausalityTokenHeader = "X-Garage-Causality-Token"

// Sentinel errors returned by the item read methods.
var (
	// TombstoneItemErr is returned when the server answers a read with
	// 204 No Content.
	TombstoneItemErr = errors.New("item is a tombstone")
	// NoSuchItemErr is returned when the server answers a read with
	// 404 Not Found.
	NoSuchItemErr = errors.New("item does not exist")
	// ConcurrentItemsErr is returned when a single-value read is answered
	// with 409 Conflict.
	ConcurrentItemsErr = errors.New("item has multiple concurrent values")
)
// awsSigner is the package-level AWS Signature Version 4 signer that
// Client.signRequest uses to sign outgoing requests.
var awsSigner = v4.NewSigner()
type (
	// Bucket is a bucket name; the client uses it as the first segment of
	// the request path.
	Bucket string

	// CausalityToken is the value exchanged through the
	// X-Garage-Causality-Token header.
	CausalityToken string

	// Item is the raw bytes of a single stored value.
	Item []byte
)

// GoString returns the item's bytes converted to a string, which is what
// fmt prints for the %#v verb.
func (it Item) GoString() string {
	return string(it)
}
// Key is the credential pair used to sign requests.
type Key struct {
	// ID is passed to the signer as the access key ID.
	ID string
	// Secret is passed to the signer as the secret access key.
	Secret string
}
// Client issues signed HTTP requests against a single endpoint.
// Construct one with NewClient.
type Client struct {
	// key is the credential pair used when signing requests.
	key Key
	// endpoint is the base URL; each call parses it with url.Parse.
	endpoint string
	// httpClient is http.DefaultClient unless replaced via WithHTTPClient;
	// Close releases its idle connections.
	httpClient *http.Client
	// middleware holds the hooks registered through WithRequestMiddleware.
	middleware []RequestMiddleware
}

type (
	// ClientOption customizes a Client; options are applied by NewClient
	// and Clone in the order given.
	ClientOption func(*Client)

	// RequestMiddleware is a hook that receives an outgoing request and
	// may reject it by returning an error.
	RequestMiddleware func(*http.Request) error
)
// WithHTTPClient returns an option that replaces the client's HTTP client
// with httpClient.
func WithHTTPClient(httpClient *http.Client) ClientOption {
	replace := func(cli *Client) {
		cli.httpClient = httpClient
	}
	return replace
}
// WithRequestMiddleware returns an option that appends the given hooks, in
// order, to the hooks already registered on the client.
func WithRequestMiddleware(middleware ...RequestMiddleware) ClientOption {
	register := func(cli *Client) {
		cli.middleware = append(cli.middleware, middleware...)
	}
	return register
}
// NewClient builds a Client for endpoint that signs with key. The client
// starts out with http.DefaultClient; opts are then applied in order and
// may override any default.
func NewClient(endpoint string, key Key, opts ...ClientOption) *Client {
	cli := new(Client)
	cli.endpoint = endpoint
	cli.key = key
	cli.httpClient = http.DefaultClient

	for i := range opts {
		opts[i](cli)
	}
	return cli
}
// Clone returns a copy of the client with opts applied on top, in order.
// The receiver is not modified.
func (c *Client) Clone(opts ...ClientOption) *Client {
	cli := *c
	// A plain struct copy shares the middleware backing array with the
	// receiver. If that array has spare capacity, appending to one clone
	// would overwrite entries appended to a sibling clone, so give each
	// clone its own slice.
	cli.middleware = append([]RequestMiddleware(nil), c.middleware...)
	for _, opt := range opts {
		opt(&cli)
	}
	return &cli
}
// Close closes the idle connections held by the client's HTTP client.
// Unless WithHTTPClient was used, that is http.DefaultClient.
func (c *Client) Close() {
	hc := c.httpClient
	hc.CloseIdleConnections()
}
// executeRequest signs req and sends it with the client's configured HTTP
// client. On success the caller owns the response and must close its body.
//
// NOTE(review): hooks registered via WithRequestMiddleware are not invoked
// here; confirm where (and whether before or after signing) they are meant
// to run.
func (c *Client) executeRequest(req *http.Request) (*http.Response, error) {
	if err := c.signRequest(req); err != nil {
		return nil, err
	}

	// Use the client chosen through NewClient/WithHTTPClient rather than
	// always http.DefaultClient; only fall back when none is set (e.g. a
	// zero-value Client).
	httpClient := c.httpClient
	if httpClient == nil {
		httpClient = http.DefaultClient
	}

	resp, err := httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	// caller is responsible for closing body
	return resp, nil
}
// signRequest signs req in place with AWS Signature Version 4, using the
// client's key, service name "k2v", region "garage" and the current time.
// It also sets the X-Amz-Content-Sha256 header.
//
// NOTE(review): the payload hash is always that of an empty body, including
// for requests that carry one (InsertItem); confirm the server accepts this.
func (c *Client) signRequest(req *http.Request) error {
	// SHA-256 digest of the empty string.
	const emptyPayloadSHA256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"

	req.Header.Set("X-Amz-Content-Sha256", emptyPayloadSHA256)

	return awsSigner.SignHTTP(
		req.Context(),
		aws.Credentials{
			AccessKeyID:     c.key.ID,
			SecretAccessKey: c.key.Secret,
		},
		req,
		emptyPayloadSHA256,
		"k2v",
		"garage",
		time.Now(),
	)
}
// ReadIndexQuery holds the optional filters for ReadIndex. Fields left at
// their zero value are not sent to the server.
type ReadIndexQuery struct {
	// Prefix limits the listing to partition keys beginning with this value.
	Prefix string

	// Start is the first partition key to list, in lexicographical order.
	Start string

	// End is the partition key at which listing stops (excluded).
	End string

	// Limit caps the number of partition keys listed; it is only sent when
	// greater than zero.
	Limit int

	// Reverse lists in reverse lexicographical order.
	Reverse bool
}
type (
	// ReadIndexResponsePartitionKey is one element of the "partitionKeys"
	// array in a ReadIndex response.
	ReadIndexResponsePartitionKey struct {
		// PK is decoded from the "pk" field.
		PK string `json:"pk"`
		// Entries is decoded from the "entries" field.
		Entries int `json:"entries"`
		// Conflicts is decoded from the "conflicts" field.
		Conflicts int `json:"conflicts"`
		// Values is decoded from the "values" field.
		Values int `json:"values"`
		// Bytes is decoded from the "bytes" field.
		Bytes int `json:"bytes"`
	}

	// ReadIndexResponse is the decoded JSON body of a ReadIndex call.
	// Prefix, Start, End, Limit and NextStart are decoded as any, so a JSON
	// null decodes to nil.
	ReadIndexResponse struct {
		Prefix  any  `json:"prefix"`
		Start   any  `json:"start"`
		End     any  `json:"end"`
		Limit   any  `json:"limit"`
		Reverse bool `json:"reverse"`
		// PartitionKeys holds the listed partition keys.
		PartitionKeys []ReadIndexResponsePartitionKey `json:"partitionKeys"`
		// More is decoded from the "more" field.
		More bool `json:"more"`
		// NextStart is decoded from the "nextStart" field.
		NextStart any `json:"nextStart"`
	}
)
// ReadIndex lists the partition keys of bucket b, filtered by q, with a GET
// on the bucket path. Zero-valued fields of q are omitted from the query
// string.
//
// It returns the decoded JSON response. Any status other than 200 OK is
// returned as an error that includes the status code and the response body;
// transport, read and JSON decoding errors are returned as-is.
func (c *Client) ReadIndex(ctx context.Context, b Bucket, q ReadIndexQuery) (*ReadIndexResponse, error) {
	u, err := url.Parse(c.endpoint)
	if err != nil {
		return nil, err
	}
	u.Path = string(b)

	query := make(url.Values)
	if q.Prefix != "" {
		query.Set("prefix", q.Prefix)
	}
	if q.Start != "" {
		query.Set("start", q.Start)
	}
	if q.End != "" {
		query.Set("end", q.End)
	}
	if q.Limit > 0 {
		query.Set("limit", strconv.Itoa(q.Limit))
	}
	if q.Reverse {
		query.Set("reverse", "true")
	}
	u.RawQuery = query.Encode()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
	if err != nil {
		return nil, err
	}

	resp, err := c.executeRequest(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	// Without this check an error response with a JSON body would decode
	// into an empty ReadIndexResponse and be reported as success.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("http status code %d: %s", resp.StatusCode, string(body))
	}

	var ret ReadIndexResponse
	if err := json.Unmarshal(body, &ret); err != nil {
		return nil, err
	}
	return &ret, nil
}
// ReadItemMulti reads every value stored for partition key pk and sort key
// sk in bucket b, accepting either a raw or a JSON response.
//
// Results by response status:
//   - 200 with application/octet-stream: one Item holding the raw body.
//   - 200 with application/json: the body decoded as a JSON array of
//     base64 strings (a JSON null becomes a nil Item).
//   - 204: TombstoneItemErr, together with the causality token.
//   - 404: NoSuchItemErr.
//   - anything else: an error carrying the status code.
//
// Content-Type parameters (for example "; charset=utf-8") and letter case
// are ignored when selecting the decoder.
func (c *Client) ReadItemMulti(ctx context.Context, b Bucket, pk string, sk string) ([]Item, CausalityToken, error) {
	u, err := url.Parse(c.endpoint)
	if err != nil {
		return nil, "", err
	}
	u.Path = string(b) + "/" + pk
	query := make(url.Values)
	query.Set("sort_key", sk)
	u.RawQuery = query.Encode()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
	if err != nil {
		return nil, "", err
	}
	req.Header.Set("Accept", strings.Join([]string{"application/octet-stream", "application/json"}, ","))

	resp, err := c.executeRequest(req)
	if err != nil {
		return nil, "", err
	}
	defer resp.Body.Close()

	ct := CausalityToken(resp.Header.Get("X-Garage-Causality-Token"))
	switch resp.StatusCode {
	case http.StatusOK:
		// Continue below to decode the body.
	case http.StatusNoContent:
		return nil, ct, TombstoneItemErr
	case http.StatusNotFound:
		return nil, "", NoSuchItemErr
	default:
		return nil, "", fmt.Errorf("http status code %d", resp.StatusCode)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, "", err
	}

	// Compare only the media type so that a header such as
	// "application/json; charset=utf-8" is still recognised.
	contentType := resp.Header.Get("Content-Type")
	mediaType, _, _ := strings.Cut(contentType, ";")
	switch strings.ToLower(strings.TrimSpace(mediaType)) {
	case "application/octet-stream":
		// single item, return as-is
		return []Item{body}, ct, nil
	case "application/json":
		var items []Item
		if err := json.Unmarshal(body, &items); err != nil {
			return nil, "", err
		}
		return items, ct, nil
	default:
		return nil, "", fmt.Errorf("unsupported content-type: %s", contentType)
	}
}
// ReadItemSingle reads the value stored for partition key pk and sort key
// sk in bucket b. It delegates to readItemSingle with no causality token
// and no timeout.
func (c *Client) ReadItemSingle(ctx context.Context, b Bucket, pk string, sk string) (Item, CausalityToken, error) {
	const (
		noToken   = ""
		noTimeout = 0
	)
	return c.readItemSingle(ctx, b, pk, sk, noToken, noTimeout)
}
// readItemSingle fetches one raw value for partition key pk and sort key sk
// in bucket b.
//
// When ct is non-empty and timeout is positive, both are added to the query
// string as "causality_token" and "timeout" (whole seconds, truncated);
// otherwise neither is sent.
//
// Results by response status:
//   - 200: the raw body and the causality token.
//   - 204: TombstoneItemErr, together with the causality token.
//   - 404: NoSuchItemErr.
//   - 409: ConcurrentItemsErr, together with the causality token.
//   - anything else: an error carrying the status code.
func (c *Client) readItemSingle(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, timeout time.Duration) (Item, CausalityToken, error) {
	target, err := url.Parse(c.endpoint)
	if err != nil {
		return nil, "", err
	}
	target.Path = string(b) + "/" + pk

	params := url.Values{"sort_key": []string{sk}}
	if withWait := ct != "" && timeout > 0; withWait {
		params.Set("causality_token", string(ct))
		params.Set("timeout", strconv.Itoa(int(timeout.Seconds())))
	}
	target.RawQuery = params.Encode()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil)
	if err != nil {
		return nil, "", err
	}
	req.Header.Set("Accept", "application/octet-stream")

	resp, err := c.executeRequest(req)
	if err != nil {
		return nil, "", err
	}
	defer resp.Body.Close()

	// From here on ct is the token reported by the server.
	ct = CausalityToken(resp.Header.Get("X-Garage-Causality-Token"))

	status := resp.StatusCode
	if status == http.StatusNoContent {
		return nil, ct, TombstoneItemErr
	}
	if status == http.StatusNotFound {
		return nil, "", NoSuchItemErr
	}
	if status == http.StatusConflict {
		return nil, ct, ConcurrentItemsErr
	}
	if status != http.StatusOK {
		return nil, "", fmt.Errorf("http status code %d", status)
	}

	payload, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, "", err
	}
	return payload, ct, nil
}
// DeleteItem deletes the item at partition key pk and sort key sk in bucket
// b with a DELETE request carrying ct in the causality token header.
//
// ct is mandatory: an empty token is rejected before any request is made.
// Any response status other than 204 No Content is returned as an error
// that includes the status code and the response body.
func (c *Client) DeleteItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken) error {
	if ct == "" {
		// The token in question is the causality token, not a "continuity" one.
		return errors.New("causality token is required for delete")
	}

	u, err := url.Parse(c.endpoint)
	if err != nil {
		return err
	}
	u.Path = string(b) + "/" + pk
	query := make(url.Values)
	query.Set("sort_key", sk)
	u.RawQuery = query.Encode()

	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, u.String(), nil)
	if err != nil {
		return err
	}
	req.Header.Set(CausalityTokenHeader, string(ct))

	resp, err := c.executeRequest(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusNoContent {
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return err
		}
		return fmt.Errorf("http status code %d: %s", resp.StatusCode, string(body))
	}
	return nil
}
// InsertItem stores item at partition key pk and sort key sk in bucket b
// with a PUT request whose body is item. A non-empty ct is sent in the
// X-Garage-Causality-Token header; an empty one is omitted.
//
// Any response status other than 204 No Content is returned as an error
// that includes the status code and the response body.
func (c *Client) InsertItem(ctx context.Context, b Bucket, pk string, sk string, ct CausalityToken, item []byte) error {
	target, err := url.Parse(c.endpoint)
	if err != nil {
		return err
	}
	target.Path = string(b) + "/" + pk
	target.RawQuery = url.Values{"sort_key": []string{sk}}.Encode()

	req, err := http.NewRequestWithContext(ctx, http.MethodPut, target.String(), bytes.NewReader(item))
	if err != nil {
		return err
	}
	if ct != "" {
		req.Header.Set("X-Garage-Causality-Token", string(ct))
	}

	resp, err := c.executeRequest(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNoContent {
		return nil
	}

	// Surface the server's explanation alongside the status code.
	detail, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	return fmt.Errorf("http status code %d: %s", resp.StatusCode, string(detail))
}