datastore

package module
v2.44.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2026 License: Apache-2.0 Imports: 45 Imported by: 0

README

Datastore

Golang SQL database client for ClickHouse.

Key features

Support for the ClickHouse protocol advanced features using Context:

  • Query ID
  • Quota Key
  • Settings
  • Query parameters
  • OpenTelemetry
  • Execution events:
    • Logs
    • Progress
    • Profile info
    • Profile events

Install

go get -u github.com/hanzoai/datastore-go

datastore interface (native interface)

conn, err := datastore.Open(&datastore.Options{
	Addr: []string{"127.0.0.1:9000"},
	Auth: datastore.Auth{
		Database: "default",
		Username: "default",
		Password: "",
	},
	DialContext: func(ctx context.Context, addr string) (net.Conn, error) {
		dialCount++
		var d net.Dialer
		return d.DialContext(ctx, "tcp", addr)
	},
	Debug: true,
	Debugf: func(format string, v ...any) {
		fmt.Printf(format+"\n", v...)
	},
	Settings: datastore.Settings{
		"max_execution_time": 60,
	},
	Compression: &datastore.Compression{
		Method: datastore.CompressionLZ4,
	},
	DialTimeout:      time.Second * 30,
	MaxOpenConns:     5,
	MaxIdleConns:     5,
	ConnMaxLifetime:  time.Duration(10) * time.Minute,
	ConnOpenStrategy: datastore.ConnOpenInOrder,
	BlockBufferSize: 10,
	MaxCompressionBuffer: 10240,
	ClientInfo: datastore.ClientInfo{
		Products: []struct {
			Name    string
			Version string
		}{
			{Name: "my-app", Version: "0.1"},
		},
	},
})
if err != nil {
	return err
}
return conn.Ping(context.Background())

database/sql interface

OpenDB
conn := datastore.OpenDB(&datastore.Options{
	Addr: []string{"127.0.0.1:9999"},
	Auth: datastore.Auth{
		Database: "default",
		Username: "default",
		Password: "",
	},
	TLS: &tls.Config{
		InsecureSkipVerify: true,
	},
	Settings: datastore.Settings{
		"max_execution_time": 60,
	},
	DialTimeout: time.Second * 30,
	Compression: &datastore.Compression{
		Method: datastore.CompressionLZ4,
	},
	Debug: true,
	BlockBufferSize: 10,
	MaxCompressionBuffer: 10240,
	ClientInfo: datastore.ClientInfo{
		Products: []struct {
			Name    string
			Version string
		}{
			{Name: "my-app", Version: "0.1"},
		},
	},
})
conn.SetMaxIdleConns(5)
conn.SetMaxOpenConns(10)
conn.SetConnMaxLifetime(time.Hour)
DSN
  • hosts - comma-separated list of single address hosts for load-balancing and failover
  • username/password - auth credentials
  • database - select the current default database
  • dial_timeout - a duration string (default 30s)
  • connection_open_strategy - random/round_robin/in_order (default in_order)
  • debug - enable debug output (boolean value)
  • compress - specify the compression algorithm: none (default), zstd, lz4, lz4hc, gzip, deflate, br
  • compress_level - Level of compression (algorithm-specific)
  • block_buffer_size - size of block buffer (default 2)
  • read_timeout - a duration string (default 5m)
  • max_compression_buffer - max size (bytes) of compression buffer (default 10MiB)
  • client_info_product - optional list of product name and version pairs

Example:

datastore://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&read_timeout=30s&max_execution_time=60
HTTP Support

The native format can be used over the HTTP protocol:

http://host1:8123,host2:8123/database?dial_timeout=200ms&max_execution_time=60

Or using OpenDB:

conn := datastore.OpenDB(&datastore.Options{
	Addr: []string{"127.0.0.1:8123"},
	Auth: datastore.Auth{
		Database: "default",
		Username: "default",
		Password: "",
	},
	Settings: datastore.Settings{
		"max_execution_time": 60,
	},
	DialTimeout: 30 * time.Second,
	Compression: &datastore.Compression{
		Method: datastore.CompressionLZ4,
	},
	Protocol: datastore.HTTP,
})

Compression

ZSTD/LZ4 compression is supported over native and http protocols. This is performed column by column at a block level and is only used for inserts.

TLS/SSL

Set a non-nil tls.Config pointer in the Options struct to establish a secure connection:

conn := datastore.OpenDB(&datastore.Options{
	...
	TLS: &tls.Config{
		InsecureSkipVerify: false,
	},
	...
})
HTTPS

Use https in your DSN string:

https://host1:8443,host2:8443/database?dial_timeout=200ms&max_execution_time=60

Async insert

Async insert is supported via WithAsync() helper:

ctx := datastore.Context(ctx, datastore.WithAsync(true))

PrepareBatch options

Available options:

Examples

native interface
std database/sql interface

License

Apache License 2.0

Documentation

Index

Constants

View Source
const (
	ClientVersionMajor       = 1
	ClientVersionMinor       = 0
	ClientVersionPatch       = 0
	ClientTCPProtocolVersion = proto.DBMS_TCP_PROTOCOL_VERSION
)
View Source
const (
	CompressionNone    = CompressionMethod(compress.None)
	CompressionLZ4     = CompressionMethod(compress.LZ4)
	CompressionLZ4HC   = CompressionMethod(compress.LZ4HC)
	CompressionZSTD    = CompressionMethod(compress.ZSTD)
	CompressionGZIP    = CompressionMethod(0x95)
	CompressionDeflate = CompressionMethod(0x96)
	CompressionBrotli  = CompressionMethod(0x97)
)
View Source
const ClientName = "datastore-go"

Variables

View Source
var (
	ErrBatchInvalid              = errors.New("datastore: batch is invalid. check appended data is correct")
	ErrBatchAlreadySent          = errors.New("datastore: batch has already been sent")
	ErrBatchNotSent              = errors.New("datastore: invalid retry, batch not sent yet")
	ErrAcquireConnTimeout        = errors.New("datastore: acquire conn timeout. you can increase the number of max open conn or the dial timeout")
	ErrUnsupportedServerRevision = errors.New("datastore: unsupported server revision")
	ErrBindMixedParamsFormats    = errors.New("datastore [bind]: mixed named, numeric or positional parameters")
	ErrAcquireConnNoAddress      = errors.New("datastore: no valid address supplied")
	ErrServerUnexpectedData      = errors.New("code: 101, message: Unexpected packet Data received from client")
	ErrConnectionClosed          = errors.New("datastore: connection is closed")
)
View Source
var (
	ErrInvalidValueInNamedDateValue = errors.New("invalid value in NamedDateValue for query parameter")
	ErrUnsupportedQueryParameter    = errors.New("unsupported query parameter type")
)
View Source
var (
	ErrInvalidTimezone = errors.New("invalid timezone value")
)

Functions

func Connector

func Connector(opt *Options) driver.Connector

func Context

func Context(parent context.Context, options ...QueryOption) context.Context

Context returns a derived context with the given ClickHouse QueryOptions. Existing QueryOptions will be overwritten per option if present. The QueryOptions Settings map will be initialized if nil.

func DateNamed

func DateNamed(name string, value time.Time, scale TimeUnit) driver.NamedDateValue

func ExtractJSONPathAs

func ExtractJSONPathAs[T any](o *JSON, path string) (valueAs T, ok bool)

ExtractJSONPathAs is a convenience function for asserting a path to a specific type. The underlying value is also extracted from its Dynamic wrapper if present. T cannot be a Dynamic, if you want a Dynamic simply use ExtractJSONPathAsDynamic.

func Named

func Named(name string, value any) driver.NamedValue

func Open

func Open(opt *Options) (driver.Conn, error)

func OpenDB

func OpenDB(opt *Options) *sql.DB

Types

type ArraySet

type ArraySet []any

type AsyncOptions

type AsyncOptions struct {
	// contains filtered or unexported fields
}

type Auth

type Auth struct {
	Database string

	Username string
	Password string
}

type ClientInfo

type ClientInfo struct {
	Products []struct {
		Name    string
		Version string
	}

	Comment []string
}

func (ClientInfo) Append

func (a ClientInfo) Append(b ClientInfo) ClientInfo

Append returns a new copy of the combined ClientInfo structs

func (ClientInfo) String

func (o ClientInfo) String() string

type ColumnNameAndType

type ColumnNameAndType struct {
	Name string
	Type string
}

ColumnNameAndType represents a column name and type

type Compression

type Compression struct {
	Method CompressionMethod
	// this only applies to lz4, lz4hc, zlib, and brotli compression algorithms
	Level int
}

type CompressionMethod

type CompressionMethod byte

func (CompressionMethod) String

func (c CompressionMethod) String() string

type Conn

type Conn = driver.Conn

type ConnOpenStrategy

type ConnOpenStrategy uint8
const (
	ConnOpenInOrder ConnOpenStrategy = iota
	ConnOpenRoundRobin
	ConnOpenRandom
)

type CustomSetting

type CustomSetting struct {
	Value string
}

CustomSetting is a helper struct to distinguish custom settings from important ones. For native protocol, is_important flag is set to value 0x02 (see https://github.com/ClickHouse/ClickHouse/blob/c873560fe7185f45eed56520ec7d033a7beb1551/src/Core/BaseSettings.h#L516-L521) Only string value is supported until formatting logic that exists in ClickHouse is implemented in clickhouse-go. (https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Field.cpp#L312 and https://github.com/ClickHouse/clickhouse-go/issues/992)

type Dial

type Dial func(ctx context.Context, addr string, opt *Options) (DialResult, error)

type DialResult

type DialResult struct {
	// contains filtered or unexported fields
}

func DefaultDialStrategy

func DefaultDialStrategy(ctx context.Context, connID int, opt *Options, dial Dial) (r DialResult, err error)

type Dynamic

type Dynamic = chcol.Dynamic

Dynamic is an alias for the Variant type

func ExtractJSONPathAsDynamic

func ExtractJSONPathAsDynamic(o *JSON, path string) (Dynamic, bool)

ExtractJSONPathAsDynamic is a convenience function for asserting a path to a Dynamic. If the value is not a Dynamic, the value is wrapped in an untyped Dynamic with false returned.

func NewDynamic

func NewDynamic(v any) Dynamic

NewDynamic creates a new Dynamic with the given value

func NewDynamicWithType

func NewDynamicWithType(v any, chType string) Dynamic

NewDynamicWithType creates a new Dynamic with the given value and ClickHouse type

type Exception

type Exception = proto.Exception

type GetJWTFunc

type GetJWTFunc = func(ctx context.Context) (string, error)

type GroupSet

type GroupSet struct {
	Value []any
}

type HTTPProxy

type HTTPProxy func(*http.Request) (*url.URL, error)

type HTTPReaderWriter

type HTTPReaderWriter struct {
	// contains filtered or unexported fields
}

func (*HTTPReaderWriter) NewReader

func (rw *HTTPReaderWriter) NewReader(res *http.Response) (io.Reader, error)

NewReader will return a reader that will decompress data if needed.

type JSON

type JSON = chcol.JSON

JSON represents a ClickHouse JSON type that can hold multiple possible types

func NewJSON

func NewJSON() *JSON

NewJSON creates a new empty JSON value

type JSONDeserializer

type JSONDeserializer = chcol.JSONDeserializer

JSONDeserializer interface allows a struct to load its data from an optimized JSON structure instead of relying on recursive reflection to set its fields.

type JSONSerializer

type JSONSerializer = chcol.JSONSerializer

JSONSerializer interface allows a struct to be manually converted to an optimized JSON structure instead of relying on recursive reflection. Note that the struct must be a pointer in order for the interface to be matched, reflection will be used otherwise.

type Log

type Log struct {
	Time      time.Time
	TimeMicro uint32
	Hostname  string
	QueryID   string
	ThreadID  uint64
	Priority  int8
	Source    string
	Text      string
}

type OpError

type OpError struct {
	Op         string
	ColumnName string
	Err        error
}

func (*OpError) Error

func (e *OpError) Error() string

type Options

type Options struct {
	Protocol   Protocol
	ClientInfo ClientInfo

	TLS                  *tls.Config
	Addr                 []string
	Auth                 Auth
	DialContext          func(ctx context.Context, addr string) (net.Conn, error)
	DialStrategy         func(ctx context.Context, connID int, options *Options, dial Dial) (DialResult, error)
	Debug                bool
	Debugf               func(format string, v ...any) // only works when Debug is true
	Settings             Settings
	Compression          *Compression
	DialTimeout          time.Duration // default 30 second
	MaxOpenConns         int           // default MaxIdleConns + 5
	MaxIdleConns         int           // default 5
	ConnMaxLifetime      time.Duration // default 1 hour
	ConnOpenStrategy     ConnOpenStrategy
	FreeBufOnConnRelease bool              // drop preserved memory buffer after each query
	HttpHeaders          map[string]string // set additional headers on HTTP requests
	HttpUrlPath          string            // set additional URL path for HTTP requests
	HttpMaxConnsPerHost  int               // MaxConnsPerHost for http.Transport
	BlockBufferSize      uint8             // default 2 - can be overwritten on query
	MaxCompressionBuffer int               // default 10485760 - measured in bytes  i.e.

	// HTTPProxy specifies an HTTP proxy URL to use for requests made by the client.
	HTTPProxyURL *url.URL

	// GetJWT should return a JWT for authentication with ClickHouse Cloud.
	// This is called per connection/request, so you may cache the token in your app if needed.
	// Use this instead of Auth.Username and Auth.Password if you're using JWT auth.
	GetJWT GetJWTFunc

	// ReadTimeout is the maximum duration the client will wait for ClickHouse
	// to respond to a single Read call for bytes over the connection.
	// Can be overridden with context.WithDeadline.
	ReadTimeout time.Duration

	// Set a custom transport for the http client.
	// The default transport configured by the library is passed in as an argument.
	TransportFunc func(*http.Transport) (http.RoundTripper, error)
	// contains filtered or unexported fields
}

func ParseDSN

func ParseDSN(dsn string) (*Options, error)

type Parameters

type Parameters map[string]string

type Pool

type Pool[T any] struct {
	// contains filtered or unexported fields
}

func NewPool

func NewPool[T any](fn func() T) Pool[T]

func (*Pool[T]) Get

func (p *Pool[T]) Get() T

func (*Pool[T]) Put

func (p *Pool[T]) Put(x T)

type ProfileEvent

type ProfileEvent struct {
	Hostname    string
	CurrentTime time.Time
	ThreadID    uint64
	Type        string
	Name        string
	Value       int64
}

type ProfileInfo

type ProfileInfo = proto.ProfileInfo

type Progress

type Progress = proto.Progress

type Protocol

type Protocol int
const (
	Native Protocol = iota
	HTTP
)

func (Protocol) String

func (p Protocol) String() string

type QueryOption

type QueryOption func(*QueryOptions) error

func WithAsync

func WithAsync(wait bool) QueryOption

func WithBlockBufferSize

func WithBlockBufferSize(size uint8) QueryOption

func WithClientInfo

func WithClientInfo(ci ClientInfo) QueryOption

WithClientInfo appends client info data to the query, visible in the system.query_log table. This does not replace the client info provided in the connection options, it appends to it. Can be called multiple times to append more info.

func WithColumnNamesAndTypes

func WithColumnNamesAndTypes(columnNamesAndTypes []ColumnNameAndType) QueryOption

WithColumnNamesAndTypes is used to provide a predetermined list of column names and types for HTTP inserts. Without this, the HTTP implementation will parse the query and run a DESCRIBE TABLE request to fetch and validate column names.

func WithExternalTable

func WithExternalTable(t ...*ext.Table) QueryOption

func WithJWT

func WithJWT(jwt string) QueryOption

WithJWT overrides the existing authentication with the given JWT. This only applies for clients connected with HTTPS to ClickHouse Cloud.

func WithLogs

func WithLogs(fn func(*Log)) QueryOption

func WithParameters

func WithParameters(params Parameters) QueryOption

func WithProfileEvents

func WithProfileEvents(fn func([]ProfileEvent)) QueryOption

func WithProfileInfo

func WithProfileInfo(fn func(*ProfileInfo)) QueryOption

func WithProgress

func WithProgress(fn func(*Progress)) QueryOption

func WithQueryID

func WithQueryID(queryID string) QueryOption

func WithQuotaKey

func WithQuotaKey(quotaKey string) QueryOption

func WithSettings

func WithSettings(settings Settings) QueryOption

func WithSpan

func WithSpan(span trace.SpanContext) QueryOption

func WithStdAsync deprecated

func WithStdAsync(wait bool) QueryOption

Deprecated: use `WithAsync` instead.

func WithUserLocation

func WithUserLocation(location *time.Location) QueryOption

type QueryOptions

type QueryOptions struct {
	// contains filtered or unexported fields
}

type ServerVersion

type ServerVersion = proto.ServerHandshake

type Settings

type Settings map[string]any

type TimeUnit

type TimeUnit uint8
const (
	Seconds TimeUnit = iota
	MilliSeconds
	MicroSeconds
	NanoSeconds
)

type Variant

type Variant = chcol.Variant

Variant represents a ClickHouse Variant type that can hold multiple possible types

func NewVariant

func NewVariant(v any) Variant

NewVariant creates a new Variant with the given value

func NewVariantWithType

func NewVariantWithType(v any, chType string) Variant

NewVariantWithType creates a new Variant with the given value and ClickHouse type

Directories

Path Synopsis
benchmark
v2/read command
v2/read-native command
v2/write command
v2/write-async command
v2/write-native command
examples
std
internal
cmd/release command
lib
cityhash102
* COPY from https://github.com/zentures/cityhash/
* COPY from https://github.com/zentures/cityhash/
column/codegen command
issues/209 command
issues/360 command
issues/470 command
issues/476 command
issues/484 command
issues/485 command
std
stress command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL