@@ -48,6 +48,11 @@ import (
48
48
// chars found in table or column name.
49
49
var ErrInvalidMsg = errors .New ("invalid message" )
50
50
51
+ const (
52
+ defaultBufferCapacity = 128 * 1024
53
+ defaultFileNameLimit = 127
54
+ )
55
+
51
56
type tlsMode int64
52
57
53
58
const (
@@ -62,18 +67,19 @@ const (
62
67
// Each sender corresponds to a single TCP connection. A sender
63
68
// should not be called concurrently by multiple goroutines.
64
69
type LineSender struct {
65
- address string
66
- tlsMode tlsMode
67
- keyId string // Erased once auth is done.
68
- key string // Erased once auth is done.
69
- bufCap int
70
- conn net.Conn
71
- buf * buffer
72
- lastMsgPos int
73
- lastErr error
74
- hasTable bool
75
- hasTags bool
76
- hasFields bool
70
+ address string
71
+ tlsMode tlsMode
72
+ keyId string // Erased once auth is done.
73
+ key string // Erased once auth is done.
74
+ bufCap int
75
+ fileNameLimit int
76
+ conn net.Conn
77
+ buf * buffer
78
+ lastMsgPos int
79
+ lastErr error
80
+ hasTable bool
81
+ hasTags bool
82
+ hasFields bool
77
83
}
78
84
79
85
// LineSenderOption defines line sender option.
@@ -126,6 +132,18 @@ func WithBufferCapacity(capacity int) LineSenderOption {
126
132
}
127
133
}
128
134
135
+ // WithFileNameLimit sets maximum file name length in chars
136
+ // allowed by the server. Affects maximum table and column name
137
+ // lengths accepted by the sender. Should be set to the same value
138
+ // as on the server. Defaults to 127.
139
+ func WithFileNameLimit (limit int ) LineSenderOption {
140
+ return func (s * LineSender ) {
141
+ if limit > 0 {
142
+ s .fileNameLimit = limit
143
+ }
144
+ }
145
+ }
146
+
129
147
// NewLineSender creates new InfluxDB Line Protocol (ILP) sender. Each
130
148
// sender corresponds to a single TCP connection. Sender should
131
149
// not be called concurrently by multiple goroutines.
@@ -138,9 +156,10 @@ func NewLineSender(ctx context.Context, opts ...LineSenderOption) (*LineSender,
138
156
)
139
157
140
158
s := & LineSender {
141
- address : "127.0.0.1:9009" ,
142
- bufCap : 128 * 1024 ,
143
- tlsMode : noTls ,
159
+ address : "127.0.0.1:9009" ,
160
+ bufCap : defaultBufferCapacity ,
161
+ fileNameLimit : defaultFileNameLimit ,
162
+ tlsMode : noTls ,
144
163
}
145
164
for _ , opt := range opts {
146
165
opt (s )
@@ -375,6 +394,11 @@ func (s *LineSender) writeTableName(str string) error {
375
394
if str == "" {
376
395
return fmt .Errorf ("table name cannot be empty: %w" , ErrInvalidMsg )
377
396
}
397
+ // We use string length in bytes as an approximation. That's to
398
+ // avoid calculating the number of runes.
399
+ if len (str ) > s .fileNameLimit {
400
+ return fmt .Errorf ("table name length exceeds the limit: %w" , ErrInvalidMsg )
401
+ }
378
402
// Since we're interested in ASCII chars, it's fine to iterate
379
403
// through bytes instead of runes.
380
404
for i := 0 ; i < len (str ); i ++ {
@@ -470,6 +494,11 @@ func (s *LineSender) writeColumnName(str string) error {
470
494
if str == "" {
471
495
return fmt .Errorf ("column name cannot be empty: %w" , ErrInvalidMsg )
472
496
}
497
+ // We use string length in bytes as an approximation. That's to
498
+ // avoid calculating the number of runes.
499
+ if len (str ) > s .fileNameLimit {
500
+ return fmt .Errorf ("column name length exceeds the limit: %w" , ErrInvalidMsg )
501
+ }
473
502
// Since we're interested in ASCII chars, it's fine to iterate
474
503
// through bytes instead of runes.
475
504
for i := 0 ; i < len (str ); i ++ {
0 commit comments