Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions crate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package crate

import (
"bytes"
"context"
"database/sql"
"database/sql/driver"
"encoding/json"
Expand All @@ -10,11 +11,25 @@ import (
"io"
"net/http"
"net/url"
"strings"
"time"
)

var (
// DefaultTimeout is the default timeout for CrateDriver if one is
// not specified.
DefaultTimeout = 10 * time.Second
)

// Crate conn structure
type CrateDriver struct {
Url string // Crate http endpoint url
URL string // Crate http endpoint url
http *http.Client
// Timeout is set with a URL query value.
// Example:
// sql.Open("crate", "http://127.0.0.1:4200/?timeout=1s")
// Makes the timeout on connections one second.
Timeout time.Duration
}

// Init a new "Connection" to a Crate Data Storage instance.
Expand All @@ -28,11 +43,33 @@ func (c *CrateDriver) Open(crate_url string) (driver.Conn, error) {

sanUrl := fmt.Sprintf("%s://%s", u.Scheme, u.Host)

c.Url = sanUrl
c.URL = sanUrl

c.Timeout = DefaultTimeout
if u.Query().Get("timeout") != "" {
c.Timeout, err = time.ParseDuration(u.Query().Get("timeout"))
if err != nil {
return nil, err
}
}
c.http = &http.Client{
Transport: http.DefaultTransport,
Timeout: c.Timeout,
}

return c, nil
}

// Ping checks if we can connect to the cluster by selecting its name
// from sys.cluster. If the select fails we return an error.
func (c *CrateDriver) Ping(ctx context.Context) error {
_, err := c.Exec("SELECT name FROM sys.cluster", nil)
if err != nil && strings.Contains(err.Error(), "dial tcp") {
return fmt.Errorf("Could not connect to crate at %s", c.URL)
}
return err
}

// JSON endpoint response struct
// We expect error to be null or ommited
type endpointResponse struct {
Expand All @@ -59,7 +96,7 @@ type endpointQuery struct {
// "Parameter Substitution" is also supported, read, https://crate.io/docs/stable/sql/rest.html#parameter-substitution
// This is the internal query function
func (c *CrateDriver) query(stmt string, args []driver.Value) (*endpointResponse, error) {
endpoint := c.Url + "/_sql?types"
endpoint := c.URL + "/_sql?types"

query := &endpointQuery{
Stmt: stmt,
Expand All @@ -77,7 +114,7 @@ func (c *CrateDriver) query(stmt string, args []driver.Value) (*endpointResponse

data := bytes.NewReader(buf)

resp, err := http.Post(endpoint, "application/json", data)
resp, err := c.http.Post(endpoint, "application/json", data)

if err != nil {
return nil, err
Expand Down