Skip to content

Commit fa4d7bd

Browse files
authored
feat(client): add TimestampColumn() (#6)
1 parent b761335 commit fa4d7bd

File tree

3 files changed

+111
-9
lines changed

3 files changed

+111
-9
lines changed

sender.go

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ func (s *LineSender) Close() error {
252252
// called before any Symbol or Column method.
253253
//
254254
// Table name cannot contain any of the following characters:
255-
// '\n', '\r', '?', ',', ''', '"', '\', '/', ':', ')', '(', '+', '*',
255+
// '\n', '\r', '?', ',', ', '"', '\', '/', ':', ')', '(', '+', '*',
256256
// '%', '~', starting '.', trailing '.', or a non-printable char.
257257
func (s *LineSender) Table(name string) *LineSender {
258258
if s.lastErr != nil {
@@ -274,7 +274,7 @@ func (s *LineSender) Table(name string) *LineSender {
274274
// before any Column method.
275275
//
276276
// Symbol name cannot contain any of the following characters:
277-
// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+',
277+
// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
278278
// '-', '*' '%%', '~', or a non-printable char.
279279
func (s *LineSender) Symbol(name, val string) *LineSender {
280280
if s.lastErr != nil {
@@ -306,7 +306,7 @@ func (s *LineSender) Symbol(name, val string) *LineSender {
306306
// message.
307307
//
308308
// Column name cannot contain any of the following characters:
309-
// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+',
309+
// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
310310
// '-', '*' '%%', '~', or a non-printable char.
311311
func (s *LineSender) Int64Column(name string, val int64) *LineSender {
312312
if !s.prepareForField(name) {
@@ -323,11 +323,43 @@ func (s *LineSender) Int64Column(name string, val int64) *LineSender {
323323
return s
324324
}
325325

326+
// TimestampColumn adds a timestamp column value to the ILP
327+
// message. Timestamp is Epoch microseconds.
328+
//
329+
// Negative timestamp value is not allowed and any attempt to
330+
// set a negative value will cause an error to be returned on subsequent
331+
// At() or AtNow() calls.
332+
//
333+
// Column name cannot contain any of the following characters:
334+
// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
335+
// '-', '*' '%%', '~', or a non-printable char.
336+
func (s *LineSender) TimestampColumn(name string, ts int64) *LineSender {
337+
if ts < 0 {
338+
if s.lastErr != nil {
339+
return s
340+
}
341+
s.lastErr = fmt.Errorf("timestamp cannot be negative: %d", ts)
342+
return s
343+
}
344+
if !s.prepareForField(name) {
345+
return s
346+
}
347+
s.lastErr = s.writeColumnName(name)
348+
if s.lastErr != nil {
349+
return s
350+
}
351+
s.buf.WriteByte('=')
352+
s.buf.WriteInt(ts)
353+
s.buf.WriteByte('t')
354+
s.hasFields = true
355+
return s
356+
}
357+
326358
// Float64Column adds a 64-bit float (double) column value to the ILP
327359
// message.
328360
//
329361
// Column name cannot contain any of the following characters:
330-
// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+',
362+
// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+',
331363
// '-', '*' '%%', '~', or a non-printable char.
332364
func (s *LineSender) Float64Column(name string, val float64) *LineSender {
333365
if !s.prepareForField(name) {
@@ -346,7 +378,7 @@ func (s *LineSender) Float64Column(name string, val float64) *LineSender {
346378
// StringColumn adds a string column value to the ILP message.
347379
//
348380
// Column name cannot contain any of the following characters:
349-
// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+',
381+
// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+',
350382
// '-', '*' '%%', '~', or a non-printable char.
351383
func (s *LineSender) StringColumn(name, val string) *LineSender {
352384
if !s.prepareForField(name) {
@@ -370,7 +402,7 @@ func (s *LineSender) StringColumn(name, val string) *LineSender {
370402
// BoolColumn adds a boolean column value to the ILP message.
371403
//
372404
// Column name cannot contain any of the following characters:
373-
// '\n', '\r', '?', '.', ',', ''', '\"', '\\', '/', ':', ')', '(', '+',
405+
// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+',
374406
// '-', '*' '%%', '~', or a non-printable char.
375407
func (s *LineSender) BoolColumn(name string, val bool) *LineSender {
376408
if !s.prepareForField(name) {

sender_integration_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func setupQuestDB0(ctx context.Context, auth ilpAuthType, setupProxy bool) (*que
108108
return nil, err
109109
}
110110
req := testcontainers.ContainerRequest{
111-
Image: "questdb/questdb:6.4.1",
111+
Image: "questdb/questdb:6.5.1",
112112
ExposedPorts: []string{"9000/tcp", "9009/tcp"},
113113
WaitingFor: wait.ForHTTP("/").WithPort("9000"),
114114
Networks: []string{networkName},
@@ -238,6 +238,7 @@ func TestE2EValidWrites(t *testing.T) {
238238
Int64Column("long_col", 12).
239239
StringColumn("str_col", "foobar").
240240
BoolColumn("bool_col", true).
241+
TimestampColumn("timestamp_col", 42).
241242
At(ctx, 1000)
242243
if err != nil {
243244
return err
@@ -250,6 +251,7 @@ func TestE2EValidWrites(t *testing.T) {
250251
Int64Column("long_col", 11).
251252
StringColumn("str_col", "barbaz").
252253
BoolColumn("bool_col", false).
254+
TimestampColumn("timestamp_col", 43).
253255
At(ctx, 2000)
254256
},
255257
tableData{
@@ -259,11 +261,12 @@ func TestE2EValidWrites(t *testing.T) {
259261
{"long_col", "LONG"},
260262
{"str_col", "STRING"},
261263
{"bool_col", "BOOLEAN"},
264+
{"timestamp_col", "TIMESTAMP"},
262265
{"timestamp", "TIMESTAMP"},
263266
},
264267
Dataset: [][]interface{}{
265-
{"test_ilp1", float64(12.2), float64(12), "foobar", true, "1970-01-01T00:00:00.000001Z"},
266-
{"test_ilp2", float64(11.2), float64(11), "barbaz", false, "1970-01-01T00:00:00.000002Z"},
268+
{"test_ilp1", float64(12.2), float64(12), "foobar", true, "1970-01-01T00:00:00.000042Z", "1970-01-01T00:00:00.000001Z"},
269+
{"test_ilp2", float64(11.2), float64(11), "barbaz", false, "1970-01-01T00:00:00.000043Z", "1970-01-01T00:00:00.000002Z"},
267270
},
268271
Count: 2,
269272
},

sender_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,42 @@ func TestValidWrites(t *testing.T) {
110110
}
111111
}
112112

113+
func TestTimestampSerialization(t *testing.T) {
114+
ctx := context.Background()
115+
116+
testCases := []struct {
117+
name string
118+
val int64
119+
}{
120+
{"max value", math.MaxInt64},
121+
{"zero", 0},
122+
{"small positive value", 10},
123+
}
124+
125+
for _, tc := range testCases {
126+
t.Run(tc.name, func(t *testing.T) {
127+
srv, err := newTestServer(sendToBackChannel)
128+
assert.NoError(t, err)
129+
130+
sender, err := qdb.NewLineSender(ctx, qdb.WithAddress(srv.addr))
131+
assert.NoError(t, err)
132+
133+
err = sender.Table(testTable).TimestampColumn("a_col", tc.val).AtNow(ctx)
134+
assert.NoError(t, err)
135+
136+
err = sender.Flush(ctx)
137+
assert.NoError(t, err)
138+
139+
sender.Close()
140+
141+
// Now check what was received by the server.
142+
expectLines(t, srv.backCh, []string{"my_test_table a_col=" + strconv.FormatInt(tc.val, 10) + "t"})
143+
144+
srv.close()
145+
})
146+
}
147+
}
148+
113149
func TestInt64Serialization(t *testing.T) {
114150
ctx := context.Background()
115151

@@ -288,6 +324,12 @@ func TestErrorOnMissingTableCall(t *testing.T) {
288324
return s.Float64Column("float", 4.2).AtNow(ctx)
289325
},
290326
},
327+
{
328+
"timestamp column",
329+
func(s *qdb.LineSender) error {
330+
return s.TimestampColumn("timestamp", 42).AtNow(ctx)
331+
},
332+
},
291333
}
292334

293335
for _, tc := range testCases {
@@ -326,6 +368,23 @@ func TestErrorOnMultipleTableCalls(t *testing.T) {
326368
assert.Empty(t, sender.Messages())
327369
}
328370

371+
func TestErrorOnNegativeTimestamp(t *testing.T) {
372+
ctx := context.Background()
373+
374+
srv, err := newTestServer(readAndDiscard)
375+
assert.NoError(t, err)
376+
defer srv.close()
377+
378+
sender, err := qdb.NewLineSender(ctx, qdb.WithAddress(srv.addr))
379+
assert.NoError(t, err)
380+
defer sender.Close()
381+
382+
err = sender.Table(testTable).TimestampColumn("timestamp_col", -42).AtNow(ctx)
383+
384+
assert.ErrorContains(t, err, "timestamp cannot be negative: -42")
385+
assert.Empty(t, sender.Messages())
386+
}
387+
329388
func TestErrorOnSymbolCallAfterColumn(t *testing.T) {
330389
ctx := context.Background()
331390

@@ -357,6 +416,12 @@ func TestErrorOnSymbolCallAfterColumn(t *testing.T) {
357416
return s.Table("awesome_table").Float64Column("float", 4.2).Symbol("sym", "abc").AtNow(ctx)
358417
},
359418
},
419+
{
420+
"timestamp column",
421+
func(s *qdb.LineSender) error {
422+
return s.Table("awesome_table").TimestampColumn("timestamp", 42).Symbol("sym", "abc").AtNow(ctx)
423+
},
424+
},
360425
}
361426

362427
for _, tc := range testCases {
@@ -483,6 +548,7 @@ func BenchmarkLineSenderBatch1000(b *testing.B) {
483548
Int64Column("long_col", int64(i)).
484549
StringColumn("str_col", "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua").
485550
BoolColumn("bool_col", true).
551+
TimestampColumn("timestamp_col", 42).
486552
At(ctx, int64(1000*i))
487553
}
488554
sender.Flush(ctx)
@@ -509,6 +575,7 @@ func BenchmarkLineSenderNoFlush(b *testing.B) {
509575
Int64Column("long_col", int64(i)).
510576
StringColumn("str_col", "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua").
511577
BoolColumn("bool_col", true).
578+
TimestampColumn("timestamp_col", 42).
512579
At(ctx, int64(1000*i))
513580
}
514581
sender.Flush(ctx)

0 commit comments

Comments
 (0)