Skip to content

Commit 83be317

Browse files
committed
sql: add the ability to execute multiple statements in sql_raw plugins
1 parent e8d6d21 commit 83be317

10 files changed

+653
-220
lines changed

docs/modules/components/pages/inputs/sql_raw.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ query: SELECT * FROM footable WHERE user_id = $1;
223223
224224
=== `args_mapping`
225225
226-
A xref:guides:bloblang/about.adoc[Bloblang mapping] which should evaluate to an array of values matching in size to the number of columns specified.
226+
An optional xref:guides:bloblang/about.adoc[Bloblang mapping] which should evaluate to an array of values matching in size to the number of placeholder arguments in the field `query`.
227227
228228
229229
*Type*: `string`

docs/modules/components/pages/outputs/sql_raw.adoc

+78-3
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ output:
4141
sql_raw:
4242
driver: "" # No default (required)
4343
dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60 # No default (required)
44-
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?); # No default (required)
44+
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?); # No default (optional)
4545
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
46+
queries: [] # No default (optional)
4647
max_in_flight: 64
4748
batching:
4849
count: 0
@@ -63,9 +64,10 @@ output:
6364
sql_raw:
6465
driver: "" # No default (required)
6566
dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60 # No default (required)
66-
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?); # No default (required)
67+
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?); # No default (optional)
6768
unsafe_dynamic_query: false
6869
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
70+
queries: [] # No default (optional)
6971
max_in_flight: 64
7072
init_files: [] # No default (optional)
7173
init_statement: | # No default (optional)
@@ -115,6 +117,35 @@ output:
115117
]
116118
```
117119
120+
--
121+
Dynamically Creating Tables (PostgreSQL)::
122+
+
123+
--
124+
125+
Here we dynamically create output tables transactionally with inserting a record into the newly created table.
126+
127+
```yaml
128+
output:
129+
processors:
130+
- mapping: |
131+
root = this
132+
# Prevent SQL injection when using unsafe_dynamic_query
133+
meta table_name = "\"" + metadata("table_name").replace_all("\"", "\"\"") + "\""
134+
sql_raw:
135+
driver: postgres
136+
dsn: postgres://localhost/postgres
137+
unsafe_dynamic_query: true
138+
queries:
139+
- query: |
140+
CREATE TABLE IF NOT EXISTS ${!metadata("table_name")} (id varchar primary key, document jsonb);
141+
- query: |
142+
INSERT INTO ${!metadata("table_name")} (id, document) VALUES ($1, $2)
143+
ON CONFLICT (id) DO UPDATE SET document = EXCLUDED.document;
144+
args_mapping: |
145+
root = [ this.id, this.document.string() ]
146+
147+
```
148+
118149
--
119150
======
120151
@@ -250,6 +281,50 @@ An optional xref:guides:bloblang/about.adoc[Bloblang mapping] which should evalu
250281
*Type*: `string`
251282
252283
284+
```yml
285+
# Examples
286+
287+
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
288+
289+
args_mapping: root = [ meta("user.id") ]
290+
```
291+
292+
=== `queries`
293+
294+
A list of statements to run in addition to `query`. When specifying multiple statements, they are all executed within a transaction.
295+
296+
297+
*Type*: `array`
298+
299+
300+
=== `queries[].query`
301+
302+
The query to execute. The style of placeholder to use depends on the driver, some drivers require question marks (`?`) whereas others expect incrementing dollar signs (`$1`, `$2`, and so on) or colons (`:1`, `:2` and so on). The style to use is outlined in this table:
303+
304+
| Driver | Placeholder Style |
305+
|---|---|
306+
| `clickhouse` | Dollar sign |
307+
| `mysql` | Question mark |
308+
| `postgres` | Dollar sign |
309+
| `mssql` | Question mark |
310+
| `sqlite` | Question mark |
311+
| `oracle` | Colon |
312+
| `snowflake` | Question mark |
313+
| `trino` | Question mark |
314+
| `gocosmos` | Colon |
315+
316+
317+
*Type*: `string`
318+
319+
320+
=== `queries[].args_mapping`
321+
322+
An optional xref:guides:bloblang/about.adoc[Bloblang mapping] which should evaluate to an array of values matching in size to the number of placeholder arguments in the field `query`.
323+
324+
325+
*Type*: `string`
326+
327+
253328
```yml
254329
# Examples
255330
@@ -260,7 +335,7 @@ args_mapping: root = [ meta("user.id") ]
260335
261336
=== `max_in_flight`
262337
263-
The maximum number of inserts to run in parallel.
338+
The maximum number of statements to execute in parallel.
264339
265340
266341
*Type*: `int`

docs/modules/components/pages/processors/sql_raw.adoc

+87-6
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ label: ""
4040
sql_raw:
4141
driver: "" # No default (required)
4242
dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60 # No default (required)
43-
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?); # No default (required)
43+
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?); # No default (optional)
4444
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
45-
exec_only: false
45+
exec_only: false # No default (optional)
46+
queries: [] # No default (optional)
4647
```
4748
4849
--
@@ -56,10 +57,11 @@ label: ""
5657
sql_raw:
5758
driver: "" # No default (required)
5859
dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60 # No default (required)
59-
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?); # No default (required)
60+
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?); # No default (optional)
6061
unsafe_dynamic_query: false
6162
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
62-
exec_only: false
63+
exec_only: false # No default (optional)
64+
queries: [] # No default (optional)
6365
init_files: [] # No default (optional)
6466
init_statement: | # No default (optional)
6567
CREATE TABLE IF NOT EXISTS some_table (
@@ -120,6 +122,34 @@ pipeline:
120122
result_map: 'root.foo_rows = this'
121123
```
122124
125+
--
126+
Dynamically Creating Tables (PostgreSQL)::
127+
+
128+
--
129+
130+
Here we query a database for columns of footable that share a `user_id` with the message field `user.id`. A xref:components:processors/branch.adoc[`branch` processor] is used in order to insert the resulting array into the original message at the path `foo_rows`.
131+
132+
```yaml
133+
pipeline:
134+
processors:
135+
- mapping: |
136+
root = this
137+
# Prevent SQL injection when using unsafe_dynamic_query
138+
meta table_name = "\"" + metadata("table_name").replace_all("\"", "\"\"") + "\""
139+
- sql_raw:
140+
driver: postgres
141+
dsn: postgres://localhost/postgres
142+
unsafe_dynamic_query: true
143+
queries:
144+
- query: |
145+
CREATE TABLE IF NOT EXISTS ${!metadata("table_name")} (id varchar primary key, document jsonb);
146+
- query: |
147+
INSERT INTO ${!metadata("table_name")} (id, document) VALUES ($1, $2)
148+
ON CONFLICT (id) DO UPDATE SET document = EXCLUDED.document;
149+
args_mapping: |
150+
root = [ this.id, this.document.string() ]
151+
```
152+
123153
--
124154
======
125155
@@ -267,12 +297,63 @@ args_mapping: root = [ meta("user.id") ]
267297
268298
=== `exec_only`
269299
270-
Whether the query result should be discarded. When set to `true` the message contents will remain unchanged, which is useful in cases where you are executing inserts, updates, etc.
300+
Whether the query result should be discarded. When set to `true` the message contents will remain unchanged, which is useful in cases where you are executing inserts, updates, etc. By default this is true for the last query, and previous queries don't change the results. If set to true for any query but the last one, the subsequent `args_mappings` input is overwritten.
301+
302+
303+
*Type*: `bool`
304+
305+
306+
=== `queries`
307+
308+
A list of statements to run in addition to `query`. When specifying multiple statements, they are all executed within a transaction. The output of the processor is always the last query that runs, unless `exec_only` is used.
309+
310+
311+
*Type*: `array`
312+
313+
314+
=== `queries[].query`
315+
316+
The query to execute. The style of placeholder to use depends on the driver, some drivers require question marks (`?`) whereas others expect incrementing dollar signs (`$1`, `$2`, and so on) or colons (`:1`, `:2` and so on). The style to use is outlined in this table:
317+
318+
| Driver | Placeholder Style |
319+
|---|---|
320+
| `clickhouse` | Dollar sign |
321+
| `mysql` | Question mark |
322+
| `postgres` | Dollar sign |
323+
| `mssql` | Question mark |
324+
| `sqlite` | Question mark |
325+
| `oracle` | Colon |
326+
| `snowflake` | Question mark |
327+
| `trino` | Question mark |
328+
| `gocosmos` | Colon |
329+
330+
331+
*Type*: `string`
332+
333+
334+
=== `queries[].args_mapping`
335+
336+
An optional xref:guides:bloblang/about.adoc[Bloblang mapping] which should evaluate to an array of values matching in size to the number of placeholder arguments in the field `query`.
337+
338+
339+
*Type*: `string`
340+
341+
342+
```yml
343+
# Examples
344+
345+
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
346+
347+
args_mapping: root = [ meta("user.id") ]
348+
```
349+
350+
=== `queries[].exec_only`
351+
352+
Whether the query result should be discarded. When set to `true` the message contents will remain unchanged, which is useful in cases where you are executing inserts, updates, etc. By default this is true for the last query, and previous queries don't change the results. If set to true for any query but the last one, the subsequent `args_mappings` input is overwritten.
271353
272354
273355
*Type*: `bool`
274356
275-
*Default*: `false`
276357
277358
=== `init_files`
278359

internal/impl/sql/conn_fields.go

+17
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"sync"
2424
"time"
2525

26+
"github.com/redpanda-data/benthos/v4/public/bloblang"
2627
"github.com/redpanda-data/benthos/v4/public/service"
2728
)
2829

@@ -136,6 +137,14 @@ CREATE TABLE IF NOT EXISTS some_table (
136137
}
137138
}
138139

140+
type rawQueryStatement struct {
141+
static string
142+
dynamic *service.InterpolatedString
143+
144+
argsMapping *bloblang.Executor // optional
145+
execOnly bool
146+
}
147+
139148
func rawQueryField() *service.ConfigField {
140149
return service.NewStringField("query").
141150
Description("The query to execute. The style of placeholder to use depends on the driver, some drivers require question marks (`?`) whereas others expect incrementing dollar signs (`$1`, `$2`, and so on) or colons (`:1`, `:2` and so on). The style to use is outlined in this table:" + `
@@ -154,6 +163,14 @@ func rawQueryField() *service.ConfigField {
154163
`)
155164
}
156165

166+
func rawQueryArgsMappingField() *service.ConfigField {
167+
return service.NewBloblangField("args_mapping").
168+
Description("An optional xref:guides:bloblang/about.adoc[Bloblang mapping] which should evaluate to an array of values matching in size to the number of placeholder arguments in the field `query`.").
169+
Example("root = [ this.cat.meow, this.doc.woofs[0] ]").
170+
Example(`root = [ meta("user.id") ]`).
171+
Optional()
172+
}
173+
157174
type connSettings struct {
158175
connMaxLifetime time.Duration
159176
connMaxIdleTime time.Duration

internal/impl/sql/input_sql_raw.go

+4-13
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
)
2828

2929
func sqlRawInputConfig() *service.ConfigSpec {
30-
spec := service.NewConfigSpec().
30+
return service.NewConfigSpec().
3131
Beta().
3232
Categories("Services").
3333
Summary("Executes a select query and creates a message for each row received.").
@@ -36,17 +36,9 @@ func sqlRawInputConfig() *service.ConfigSpec {
3636
Field(dsnField).
3737
Field(rawQueryField().
3838
Example("SELECT * FROM footable WHERE user_id = $1;")).
39-
Field(service.NewBloblangField("args_mapping").
40-
Description("A xref:guides:bloblang/about.adoc[Bloblang mapping] which should evaluate to an array of values matching in size to the number of columns specified.").
41-
Example("root = [ this.cat.meow, this.doc.woofs[0] ]").
42-
Example(`root = [ meta("user.id") ]`).
43-
Optional()).
44-
Field(service.NewAutoRetryNacksToggleField())
45-
for _, f := range connFields() {
46-
spec = spec.Field(f)
47-
}
48-
49-
spec = spec.
39+
Field(rawQueryArgsMappingField()).
40+
Field(service.NewAutoRetryNacksToggleField()).
41+
Fields(connFields()...).
5042
Version("4.10.0").
5143
Example("Consumes an SQL table using a query as an input.",
5244
`
@@ -63,7 +55,6 @@ input:
6355
]
6456
`,
6557
)
66-
return spec
6758
}
6859

6960
func init() {

0 commit comments

Comments
 (0)