Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
f46da54
Add sqlite driver
jrauh01 Dec 10, 2024
77c38d0
Introduce database#QueryBuilder
jrauh01 Nov 28, 2024
471e86f
Add database#QueryBuilder to database#DB
jrauh01 Nov 28, 2024
a70f150
Introduce database#InsertStatement
jrauh01 Nov 28, 2024
5c0d512
Add and implement QueryBuilder.InsertStatement()
jrauh01 Nov 28, 2024
f1e114a
Add DB.BuildInsertStatement()
jrauh01 Nov 28, 2024
9606455
Introduce database#SelectStatement
jrauh01 Nov 28, 2024
c06b5fc
Add and implement QueryBuilder.SelectStatement()
jrauh01 Nov 28, 2024
3ad0449
Add DB.BuildSelectStatement()
jrauh01 Nov 28, 2024
a2434ae
Introduce database#InsertSelectStatement
jrauh01 Nov 28, 2024
63536f8
Add and implement QueryBuilder.InsertSelectStatement()
jrauh01 Nov 28, 2024
88ca40c
Add DB.BuildInsertSelectStatement()
jrauh01 Nov 28, 2024
9f565f0
Move statements to own files
jrauh01 Nov 28, 2024
0738b85
Introduce database#DeleteStatement
jrauh01 Nov 28, 2024
fb70a5e
Add and implement QueryBuilder.DeleteStatement()
jrauh01 Nov 28, 2024
7fc66d1
Add BD.BuildDeleteStatement()
jrauh01 Nov 28, 2024
f3f9239
Add and implement QueryBuilder.DeleteAllStatement()
jrauh01 Nov 28, 2024
8c5e429
Add DB.BuildDeleteAllStatement()
jrauh01 Nov 28, 2024
5613c62
Add and implement QueryBuilder.InsertIgnoreStatement()
jrauh01 Nov 28, 2024
e9213b5
Add DB.BuildInsertIgnoreStatement()
jrauh01 Nov 28, 2024
5bbcf6d
Introduce database#UpdateStatement
jrauh01 Nov 28, 2024
c27569b
Add and implement QueryBuilder.UpdateStatement()
jrauh01 Nov 28, 2024
1b302fb
Add DB.BuildUpdateStatement()
jrauh01 Nov 28, 2024
1bb410a
Add and implement QueryBuilder.UpdateAllStatement()
jrauh01 Nov 28, 2024
ea8d0b1
Add DB.BuildUpdateAllStatement()
jrauh01 Nov 28, 2024
92dad9a
Add and implement QueryBuilder.UpsertStatement()
jrauh01 Nov 29, 2024
cfbe484
Add DB.BuildUpsertStatement()
jrauh01 Nov 29, 2024
cd4e607
Rename QueryBuilder.driver to 'dbDriver'
jrauh01 Nov 29, 2024
c4fb886
Adjust unsupported driver error message
jrauh01 Nov 29, 2024
2790aa5
Make statements generic
jrauh01 Nov 29, 2024
b4d9cde
Remove generics from statements
jrauh01 Dec 2, 2024
a6ca001
Introduce database#UpsertStatement
jrauh01 Dec 2, 2024
295843e
Use UpsertStatement in QueryBuilder.UpsertStatement()
jrauh01 Dec 2, 2024
17256fd
Use UpsertStatement in DB.BuildUpsertStatement()
jrauh01 Dec 2, 2024
3dee8cf
Rename constructor funcs for statements
jrauh01 Dec 2, 2024
82242b0
Add not retryable error
jrauh01 Dec 3, 2024
05d3d6e
Return number of placeholders for upsert
jrauh01 Dec 3, 2024
88dfad1
wip: streamed statements
jrauh01 Dec 3, 2024
b3fa406
Use fmt.Errorf() instead of errors.New()
jrauh01 Dec 3, 2024
9783bc4
Add functional options for insert
jrauh01 Dec 3, 2024
1c16f5c
Add functional options for update
jrauh01 Dec 3, 2024
ddcf1ea
Add docs for statements
jrauh01 Dec 4, 2024
ae380b6
Add custom errors for query builder
jrauh01 Dec 4, 2024
0e1bcaf
Sort columns for unit testing
jrauh01 Dec 4, 2024
251ee1f
Fix bug: cache is changed via reference
jrauh01 Dec 4, 2024
d0712db
Introduce new errors
jrauh01 Dec 4, 2024
bf0f262
Make InsertSelect a standalone statement
jrauh01 Dec 4, 2024
62d9942
Build SET clause from columns
jrauh01 Dec 4, 2024
4f83692
Add support for sqlite
jrauh01 Dec 9, 2024
df38c2f
Use own error type for returning error
jrauh01 Dec 9, 2024
66f87bf
Add unit tests for query builder
jrauh01 Dec 9, 2024
f6e5cc7
Create pgsql constraint without function
jrauh01 Dec 9, 2024
035dd5a
Make MockEntity implement Entity directly
jrauh01 Dec 10, 2024
5206f81
Add docs example for upsert streamed
jrauh01 Dec 10, 2024
48fe632
Move unit test data to database package
jrauh01 Dec 10, 2024
3ac8ae8
wip: unit tests for upsert streamed
jrauh01 Dec 11, 2024
cf6e4f0
Revise the functional options for statements
jrauh01 Dec 11, 2024
9cefa4b
Add testutils for database
jrauh01 Dec 11, 2024
e1cc60f
Add query builder to DB
jrauh01 Dec 11, 2024
0bf1e5c
Append callbacks instead overwriting
jrauh01 Dec 11, 2024
d63981d
Add extra function for prefilling test database
jrauh01 Dec 11, 2024
6494fdd
Use one line if for error handling
jrauh01 Dec 12, 2024
7d0b155
wip: more upsert examples
jrauh01 Dec 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions database/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ type IDer interface {
// EntityFactoryFunc knows how to create an Entity.
type EntityFactoryFunc func() Entity

type EntityConstraint[T any] interface {
Entity
*T
}

// Upserter implements the Upsert method,
// which returns a part of the object for ON DUPLICATE KEY UPDATE.
type Upserter interface {
Expand Down
7 changes: 7 additions & 0 deletions database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
_ "modernc.org/sqlite"
"net"
"net/url"
"slices"
Expand All @@ -39,6 +40,7 @@ type DB struct {
Options *Options

addr string
queryBuilder QueryBuilder
columnMap ColumnMap
logger *logging.Logger
tableSemaphores map[string]*semaphore.Weighted
Expand Down Expand Up @@ -256,6 +258,7 @@ func NewDbFromConfig(c *Config, logger *logging.Logger, connectorCallbacks Retry
return &DB{
DB: db,
Options: &c.Options,
queryBuilder: NewQueryBuilder(db.DriverName()),
columnMap: NewColumnMap(db.Mapper),
addr: addr,
logger: logger,
Expand Down Expand Up @@ -932,6 +935,10 @@ func (db *DB) Log(ctx context.Context, query string, counter *com.Counter) perio
}))
}

func (db *DB) QueryBuilder() QueryBuilder {
return db.queryBuilder
}

var (
// Assert TxOrDB interface compliance of the DB and sqlx.Tx types.
_ TxOrDB = (*DB)(nil)
Expand Down
213 changes: 213 additions & 0 deletions database/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package database

import (
"context"
"fmt"
"github.com/icinga/icinga-go-library/backoff"
"github.com/icinga/icinga-go-library/com"
"github.com/icinga/icinga-go-library/retry"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"reflect"
"time"
)

// DeleteStatement is the interface for building DELETE statements.
type DeleteStatement interface {
// From sets the table name for the DELETE statement.
// Overrides the table name provided by the entity.
From(table string) DeleteStatement

// SetWhere sets the where clause for the DELETE statement.
SetWhere(where string) DeleteStatement

// Entity returns the entity associated with the DELETE statement.
Entity() Entity

// Table returns the table name for the DELETE statement.
Table() string

Where() string
}

// NewDeleteStatement returns a new deleteStatement for the given entity.
func NewDeleteStatement(entity Entity) DeleteStatement {
return &deleteStatement{
entity: entity,
}
}

// deleteStatement is the default implementation of the DeleteStatement interface.
type deleteStatement struct {
entity Entity
table string
where string
}

func (d *deleteStatement) From(table string) DeleteStatement {
d.table = table

return d
}

func (d *deleteStatement) SetWhere(where string) DeleteStatement {
d.where = where

return d
}

func (d *deleteStatement) Entity() Entity {
return d.entity
}

func (d *deleteStatement) Table() string {
return d.table
}

func (d *deleteStatement) Where() string {
return d.where
}

// DeleteOption is a functional option for DeleteStreamed().
type DeleteOption func(opts *deleteOptions)

// WithDeleteStatement sets the DELETE statement to be used for deleting entities.
func WithDeleteStatement(stmt DeleteStatement) DeleteOption {
return func(opts *deleteOptions) {
opts.stmt = stmt
}
}

// WithOnDelete sets the callbacks for a successful DELETE operation.
func WithOnDelete(onDelete ...OnSuccess[any]) DeleteOption {
return func(opts *deleteOptions) {
opts.onDelete = append(opts.onDelete, onDelete...)
}
}

// deleteOptions stores the options for DeleteStreamed.
type deleteOptions struct {
stmt DeleteStatement
onDelete []OnSuccess[any]
}

// DeleteStreamed deletes entities from the given channel from the database.
func DeleteStreamed(
ctx context.Context,
db *DB,
entityType Entity,
entities <-chan any,
options ...DeleteOption,
) error {
opts := &deleteOptions{}
for _, option := range options {
option(opts)
}

first, forward, err := com.CopyFirst(ctx, entities)
if err != nil {
return errors.Wrap(err, "can't copy first entity")
}

sem := db.GetSemaphoreForTable(TableName(entityType))

var stmt string

if opts.stmt != nil {
stmt, err = db.QueryBuilder().DeleteStatement(opts.stmt)
if err != nil {
return err
}
} else {
stmt, err = db.QueryBuilder().DeleteStatement(NewDeleteStatement(entityType))
if err != nil {
return err
}
}

switch reflect.TypeOf(first).Kind() {
case reflect.Struct, reflect.Map:
return namedBulkExec(ctx, db, stmt, db.Options.MaxPlaceholdersPerStatement, sem, forward, com.NeverSplit[any], opts.onDelete...)
default:
return bulkExec(ctx, db, stmt, db.Options.MaxPlaceholdersPerStatement, sem, forward, opts.onDelete...)
}
}

func bulkExec(
ctx context.Context, db *DB, query string, count int, sem *semaphore.Weighted, arg <-chan any, onSuccess ...OnSuccess[any],
) error {
var counter com.Counter
defer db.Log(ctx, query, &counter).Stop()

g, ctx := errgroup.WithContext(ctx)
// Use context from group.
bulk := com.Bulk(ctx, arg, count, com.NeverSplit[any])

g.Go(func() error {
g, ctx := errgroup.WithContext(ctx)

for b := range bulk {
if err := sem.Acquire(ctx, 1); err != nil {
return errors.Wrap(err, "can't acquire semaphore")
}

g.Go(func(b []any) func() error {
return func() error {
defer sem.Release(1)

return retry.WithBackoff(
ctx,
func(context.Context) error {
var valCollection []any

for _, v := range b {
val := reflect.ValueOf(v)
if val.Kind() == reflect.Slice {
for i := 0; i < val.Len(); i++ {
valCollection = append(valCollection, val.Index(i).Interface())
}
} else {
valCollection = append(valCollection, val.Interface())
}
}

stmt, args, err := sqlx.In(query, valCollection)
if err != nil {
return fmt.Errorf(
"%w: %w",
retry.ErrNotRetryable,
errors.Wrapf(err, "can't build placeholders for %q", query),
)
}

stmt = db.Rebind(stmt)
_, err = db.ExecContext(ctx, stmt, args...)
if err != nil {
return CantPerformQuery(err, query)
}

counter.Add(uint64(len(b)))

for _, onSuccess := range onSuccess {
if err := onSuccess(ctx, b); err != nil {
return err
}
}

return nil
},
retry.Retryable,
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
db.GetDefaultRetrySettings(),
)
}
}(b))
}

return g.Wait()
})

return g.Wait()
}
Loading
Loading