Skip to content

Commit f2b199d

Browse files
committed
wip: more upsert examples
1 parent 242c1e9 commit f2b199d

File tree

2 files changed

+324
-58
lines changed

2 files changed

+324
-58
lines changed

database/example_upsert_streamed_test.go

Lines changed: 0 additions & 58 deletions
This file was deleted.

database/example_upsert_test.go

Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
package database
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/icinga/icinga-go-library/com"
7+
"golang.org/x/sync/errgroup"
8+
"time"
9+
)
10+
11+
func ExampleUpsertStreamed() {
12+
var (
13+
testEntites = []User{
14+
{Id: 1, Name: "test1", Age: 10, Email: "[email protected]"},
15+
{Id: 2, Name: "test2", Age: 20, Email: "[email protected]"},
16+
}
17+
testSelect = &[]User{}
18+
g, ctx = errgroup.WithContext(context.Background())
19+
entities = make(chan User, len(testEntites))
20+
logs = getTestLogging()
21+
db = getTestDb(logs)
22+
log = logs.GetLogger()
23+
err error
24+
)
25+
initTestDb(db)
26+
27+
g.Go(func() error {
28+
return UpsertStreamed(ctx, db, entities)
29+
})
30+
31+
for _, entity := range testEntites {
32+
entities <- entity
33+
}
34+
35+
close(entities)
36+
time.Sleep(10 * time.Millisecond)
37+
38+
if err = db.Select(testSelect, "SELECT * FROM user"); err != nil {
39+
log.Fatalf("cannot select from db: %v", err)
40+
}
41+
42+
fmt.Println(*testSelect)
43+
44+
if err = g.Wait(); err != nil {
45+
log.Fatalf("error while upserting entities: %v", err)
46+
}
47+
48+
_ = db.Close()
49+
50+
// Output:
51+
// [{1 test1 10 [email protected]} {2 test2 20 [email protected]}]
52+
}
53+
54+
func ExampleUpsertStreamedWithStatement() {
55+
var (
56+
testEntites = []User{
57+
{Id: 1, Name: "test1"},
58+
{Id: 2, Name: "test2"},
59+
}
60+
testSelect = &[]User{}
61+
g, ctx = errgroup.WithContext(context.Background())
62+
entities = make(chan User, len(testEntites))
63+
logs = getTestLogging()
64+
db = getTestDb(logs)
65+
log = logs.GetLogger()
66+
stmt = NewUpsertStatement(User{}).SetColumns("id", "name")
67+
err error
68+
)
69+
initTestDb(db)
70+
71+
g.Go(func() error {
72+
return UpsertStreamed(ctx, db, entities, WithUpsertStatement(stmt))
73+
})
74+
75+
for _, entity := range testEntites {
76+
entities <- entity
77+
}
78+
79+
close(entities)
80+
time.Sleep(10 * time.Millisecond)
81+
82+
if err = db.Select(testSelect, "SELECT * FROM user"); err != nil {
83+
log.Fatalf("cannot select from db: %v", err)
84+
}
85+
86+
fmt.Println(*testSelect)
87+
88+
if err = g.Wait(); err != nil {
89+
log.Fatalf("error while upserting entities: %v", err)
90+
}
91+
92+
_ = db.Close()
93+
94+
// Output:
95+
// [{1 test1 0 } {2 test2 0 }]
96+
}
97+
98+
func ExampleUpsertStreamedWithOnUpsert() {
99+
var (
100+
testEntites = []User{
101+
{Id: 1, Name: "test1", Age: 10, Email: "[email protected]"},
102+
{Id: 2, Name: "test2", Age: 20, Email: "[email protected]"},
103+
}
104+
callback = func(ctx context.Context, affectedRows []any) (err error) {
105+
fmt.Printf("number of affected rows: %d\n", len(affectedRows))
106+
return nil
107+
}
108+
testSelect = &[]User{}
109+
g, ctx = errgroup.WithContext(context.Background())
110+
entities = make(chan User, len(testEntites))
111+
logs = getTestLogging()
112+
db = getTestDb(logs)
113+
log = logs.GetLogger()
114+
err error
115+
)
116+
initTestDb(db)
117+
118+
g.Go(func() error {
119+
return UpsertStreamed(ctx, db, entities, WithOnUpsert(callback))
120+
})
121+
122+
for _, entity := range testEntites {
123+
entities <- entity
124+
}
125+
126+
time.Sleep(1 * time.Second)
127+
close(entities)
128+
129+
if err = db.Select(testSelect, "SELECT * FROM user"); err != nil {
130+
log.Fatalf("cannot select from db: %v", err)
131+
}
132+
133+
fmt.Println(*testSelect)
134+
135+
if err = g.Wait(); err != nil {
136+
log.Fatalf("error while upserting entities: %v", err)
137+
}
138+
139+
_ = db.Close()
140+
141+
// Output:
142+
// number of affected rows: 2
143+
// [{1 test1 10 [email protected]} {2 test2 20 [email protected]}]
144+
}
145+
146+
func ExampleNamedBulkUpsert() {
147+
var (
148+
testEntites = []User{
149+
{Id: 1, Name: "test1", Age: 10, Email: "[email protected]"},
150+
{Id: 2, Name: "test2", Age: 20, Email: "[email protected]"},
151+
}
152+
testSelect = &[]User{}
153+
g, ctx = errgroup.WithContext(context.Background())
154+
entities = make(chan Entity, len(testEntites))
155+
logs = getTestLogging()
156+
db = getTestDb(logs)
157+
log = logs.GetLogger()
158+
sem = db.GetSemaphoreForTable(TableName(User{}))
159+
err error
160+
)
161+
initTestDb(db)
162+
163+
stmt, placeholders, err := db.QueryBuilder().UpsertStatement(NewUpsertStatement(User{}))
164+
if err != nil {
165+
log.Fatalf("error while building upsert statement: %v", err)
166+
}
167+
168+
g.Go(func() error {
169+
return db.NamedBulkExec(ctx, stmt, placeholders, sem, entities, com.NeverSplit)
170+
})
171+
172+
for _, entity := range testEntites {
173+
entities <- entity
174+
}
175+
176+
time.Sleep(1 * time.Second)
177+
close(entities)
178+
179+
if err = db.Select(testSelect, "SELECT * FROM user"); err != nil {
180+
log.Fatalf("cannot select from db: %v", err)
181+
}
182+
183+
fmt.Println(*testSelect)
184+
185+
if err = g.Wait(); err != nil {
186+
log.Fatalf("error while upserting entities: %v", err)
187+
}
188+
189+
_ = db.Close()
190+
191+
// Output:
192+
// [{1 test1 10 [email protected]} {2 test2 20 [email protected]}]
193+
}
194+
195+
func ExampleNamedBulkUpsertWithOnUpsert() {
196+
var (
197+
testEntites = []User{
198+
{Id: 1, Name: "test1", Age: 10, Email: "[email protected]"},
199+
{Id: 2, Name: "test2", Age: 20, Email: "[email protected]"},
200+
}
201+
testSelect = &[]User{}
202+
callback = func(ctx context.Context, affectedRows []Entity) (err error) {
203+
fmt.Printf("number of affected rows: %d\n", len(affectedRows))
204+
return nil
205+
}
206+
g, ctx = errgroup.WithContext(context.Background())
207+
entities = make(chan Entity, len(testEntites))
208+
logs = getTestLogging()
209+
db = getTestDb(logs)
210+
log = logs.GetLogger()
211+
sem = db.GetSemaphoreForTable(TableName(User{}))
212+
err error
213+
)
214+
initTestDb(db)
215+
216+
stmt, placeholders, err := db.QueryBuilder().UpsertStatement(NewUpsertStatement(User{}))
217+
if err != nil {
218+
log.Fatalf("error while building upsert statement: %v", err)
219+
}
220+
221+
g.Go(func() error {
222+
return db.NamedBulkExec(ctx, stmt, placeholders, sem, entities, com.NeverSplit, callback)
223+
})
224+
225+
for _, entity := range testEntites {
226+
entities <- entity
227+
}
228+
229+
time.Sleep(1 * time.Second)
230+
close(entities)
231+
232+
if err = db.Select(testSelect, "SELECT * FROM user"); err != nil {
233+
log.Fatalf("cannot select from db: %v", err)
234+
}
235+
236+
fmt.Println(*testSelect)
237+
238+
if err = g.Wait(); err != nil {
239+
log.Fatalf("error while upserting entities: %v", err)
240+
}
241+
242+
_ = db.Close()
243+
244+
// Output:
245+
// number of affected rows: 2
246+
// [{1 test1 10 [email protected]} {2 test2 20 [email protected]}]
247+
}
248+
249+
func ExampleNamedExecUpsert() {
250+
var (
251+
testEntites = []User{
252+
{Id: 1, Name: "test1", Age: 10, Email: "[email protected]"},
253+
{Id: 2, Name: "test2", Age: 20, Email: "[email protected]"},
254+
}
255+
testSelect = &[]User{}
256+
ctx = context.Background()
257+
logs = getTestLogging()
258+
db = getTestDb(logs)
259+
log = logs.GetLogger()
260+
err error
261+
)
262+
initTestDb(db)
263+
264+
stmt, _, err := db.QueryBuilder().UpsertStatement(NewUpsertStatement(User{}))
265+
if err != nil {
266+
log.Fatalf("error while building upsert statement: %v", err)
267+
}
268+
269+
for _, entity := range testEntites {
270+
if _, err = db.NamedExecContext(ctx, stmt, entity); err != nil {
271+
log.Fatalf("error while upserting entity: %v", err)
272+
}
273+
}
274+
275+
if err = db.Select(testSelect, "SELECT * FROM user"); err != nil {
276+
log.Fatalf("cannot select from db: %v", err)
277+
}
278+
279+
fmt.Println(*testSelect)
280+
281+
_ = db.Close()
282+
283+
// Output:
284+
// [{1 test1 10 [email protected]} {2 test2 20 [email protected]}]
285+
}
286+
287+
func ExampleExecUpsert() {
288+
var (
289+
testEntites = [][]any{
290+
{1, "test1", 10, "[email protected]"},
291+
{2, "test2", 20, "[email protected]"},
292+
}
293+
testSelect = &[]User{}
294+
stmt = `INSERT INTO user ("id", "name", "age", "email") VALUES (?, ?, ?, ?) ON CONFLICT DO UPDATE SET "name" = EXCLUDED."name", "age" = EXCLUDED."age", "email" = EXCLUDED."email"`
295+
ctx = context.Background()
296+
logs = getTestLogging()
297+
db = getTestDb(logs)
298+
log = logs.GetLogger()
299+
err error
300+
)
301+
initTestDb(db)
302+
303+
//stmt, _, err := db.QueryBuilder().UpsertStatement(NewUpsertStatement(User{}))
304+
//if err != nil {
305+
// log.Fatalf("error while building upsert statement: %v", err)
306+
//}
307+
308+
for _, entity := range testEntites {
309+
if _, err = db.ExecContext(ctx, stmt, entity...); err != nil {
310+
log.Fatalf("error while upserting entity: %v", err)
311+
}
312+
}
313+
314+
if err = db.Select(testSelect, "SELECT * FROM user"); err != nil {
315+
log.Fatalf("cannot select from db: %v", err)
316+
}
317+
318+
fmt.Println(*testSelect)
319+
320+
_ = db.Close()
321+
322+
// Output:
323+
// [{1 test1 10 [email protected]} {2 test2 20 [email protected]}]
324+
}

0 commit comments

Comments
 (0)