Skip to content

Commit ab45784

Browse files
committed
sql: add the ability to execute multiple statements in sql_raw plugins
1 parent e8d6d21 commit ab45784

File tree

10 files changed

+603
-215
lines changed

10 files changed

+603
-215
lines changed

Diff for: docs/modules/components/pages/inputs/sql_raw.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ query: SELECT * FROM footable WHERE user_id = $1;
223223
224224
=== `args_mapping`
225225
226-
A xref:guides:bloblang/about.adoc[Bloblang mapping] which should evaluate to an array of values matching in size to the number of columns specified.
226+
An optional xref:guides:bloblang/about.adoc[Bloblang mapping] which should evaluate to an array of values matching in size to the number of placeholder arguments in the field `query`.
227227
228228
229229
*Type*: `string`

Diff for: docs/modules/components/pages/outputs/sql_raw.adoc

+30-1
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,35 @@ output:
115115
]
116116
```
117117
118+
--
119+
Dynamically Creating Tables (PostgreSQL)::
120+
+
121+
--
122+
123+
Here we dynamically create output tables, with the table creation and the insert of a record into the newly created table executed within the same transaction.
124+
125+
```yaml
126+
output:
127+
processors:
128+
- mapping: |
129+
root = this
130+
# Prevent SQL injection when using unsafe_dynamic_query
131+
meta table_name = "\"" + metadata("table_name").replace_all("\"", "\"\"") + "\""
132+
sql_raw:
133+
driver: postgres
134+
dsn: postgres://localhost/postgres
135+
unsafe_dynamic_query: true
136+
queries:
137+
- query: |
138+
CREATE TABLE IF NOT EXISTS ${!metadata("table_name")} (id varchar primary key, document jsonb);
139+
- query: |
140+
INSERT INTO ${!metadata("table_name")} (id, document) VALUES ($1, $2)
141+
ON CONFLICT (id) DO UPDATE SET document = EXCLUDED.document;
142+
args_mapping: |
143+
root = [ this.id, this.document.string() ]
144+
145+
```
146+
118147
--
119148
======
120149
@@ -260,7 +289,7 @@ args_mapping: root = [ meta("user.id") ]
260289
261290
=== `max_in_flight`
262291
263-
The maximum number of inserts to run in parallel.
292+
The maximum number of statements to execute in parallel.
264293
265294
266295
*Type*: `int`

Diff for: docs/modules/components/pages/processors/sql_raw.adoc

+85-3
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ sql_raw:
4242
dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60 # No default (required)
4343
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?); # No default (required)
4444
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
45-
exec_only: false
45+
exec_only: false # No default (optional)
46+
queries: [] # No default (optional)
4647
```
4748
4849
--
@@ -59,7 +60,8 @@ sql_raw:
5960
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?); # No default (required)
6061
unsafe_dynamic_query: false
6162
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
62-
exec_only: false
63+
exec_only: false # No default (optional)
64+
queries: [] # No default (optional)
6365
init_files: [] # No default (optional)
6466
init_statement: | # No default (optional)
6567
CREATE TABLE IF NOT EXISTS some_table (
@@ -120,6 +122,34 @@ pipeline:
120122
result_map: 'root.foo_rows = this'
121123
```
122124
125+
--
126+
Dynamically Creating Tables (PostgreSQL)::
127+
+
128+
--
129+
130+
Here we dynamically create tables, with the table creation and the insert of a record into the newly created table executed within the same transaction. The table name is taken from the `table_name` metadata field, which is quoted and escaped by a preceding `mapping` processor to prevent SQL injection when `unsafe_dynamic_query` is enabled.
131+
132+
```yaml
133+
pipeline:
134+
processors:
135+
- mapping: |
136+
root = this
137+
# Prevent SQL injection when using unsafe_dynamic_query
138+
meta table_name = "\"" + metadata("table_name").replace_all("\"", "\"\"") + "\""
139+
- sql_raw:
140+
driver: postgres
141+
dsn: postgres://localhost/postgres
142+
unsafe_dynamic_query: true
143+
queries:
144+
- query: |
145+
CREATE TABLE IF NOT EXISTS ${!metadata("table_name")} (id varchar primary key, document jsonb);
146+
- query: |
147+
INSERT INTO ${!metadata("table_name")} (id, document) VALUES ($1, $2)
148+
ON CONFLICT (id) DO UPDATE SET document = EXCLUDED.document;
149+
args_mapping: |
150+
root = [ this.id, this.document.string() ]
151+
```
152+
123153
--
124154
======
125155
@@ -272,7 +302,59 @@ Whether the query result should be discarded. When set to `true` the message con
272302
273303
*Type*: `bool`
274304
275-
*Default*: `false`
305+
306+
=== `queries`
307+
308+
A list of statements to run in addition to `query`. When specifying multiple statements, they are all executed within a transaction. The output of the processor is always the result of the last query that runs, unless `exec_only` is true.
309+
310+
311+
*Type*: `array`
312+
313+
314+
=== `queries[].query`
315+
316+
The query to execute. The style of placeholder to use depends on the driver: some drivers require question marks (`?`), whereas others expect incrementing dollar signs (`$1`, `$2`, and so on) or colons (`:1`, `:2`, and so on). The style to use is outlined in this table:
317+
318+
| Driver | Placeholder Style |
319+
|---|---|
320+
| `clickhouse` | Dollar sign |
321+
| `mysql` | Question mark |
322+
| `postgres` | Dollar sign |
323+
| `mssql` | Question mark |
324+
| `sqlite` | Question mark |
325+
| `oracle` | Colon |
326+
| `snowflake` | Question mark |
327+
| `trino` | Question mark |
328+
| `gocosmos` | Colon |
329+
330+
331+
*Type*: `string`
332+
333+
334+
=== `queries[].args_mapping`
335+
336+
An optional xref:guides:bloblang/about.adoc[Bloblang mapping] which should evaluate to an array of values matching in size to the number of placeholder arguments in the field `query`.
337+
338+
339+
*Type*: `string`
340+
341+
342+
```yml
343+
# Examples
344+
345+
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
346+
347+
args_mapping: root = [ meta("user.id") ]
348+
```
349+
350+
=== `queries[].exec_only`
351+
352+
Whether the query result should be discarded. When set to `true` the message contents will remain unchanged, which is useful in cases where you are executing inserts, updates, etc.
353+
354+
355+
*Type*: `bool`
356+
357+
*Default*: `true`
276358
277359
=== `init_files`
278360

Diff for: internal/impl/sql/conn_fields.go

+17
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"sync"
2424
"time"
2525

26+
"github.com/redpanda-data/benthos/v4/public/bloblang"
2627
"github.com/redpanda-data/benthos/v4/public/service"
2728
)
2829

@@ -136,6 +137,14 @@ CREATE TABLE IF NOT EXISTS some_table (
136137
}
137138
}
138139

140+
type rawQueryStatement struct {
141+
static string
142+
dynamic *service.InterpolatedString
143+
144+
argsMapping *bloblang.Executor // optional
145+
execOnly bool
146+
}
147+
139148
func rawQueryField() *service.ConfigField {
140149
return service.NewStringField("query").
141150
Description("The query to execute. The style of placeholder to use depends on the driver, some drivers require question marks (`?`) whereas others expect incrementing dollar signs (`$1`, `$2`, and so on) or colons (`:1`, `:2` and so on). The style to use is outlined in this table:" + `
@@ -154,6 +163,14 @@ func rawQueryField() *service.ConfigField {
154163
`)
155164
}
156165

166+
func rawQueryArgsMappingField() *service.ConfigField {
167+
return service.NewBloblangField("args_mapping").
168+
Description("An optional xref:guides:bloblang/about.adoc[Bloblang mapping] which should evaluate to an array of values matching in size to the number of placeholder arguments in the field `query`.").
169+
Example("root = [ this.cat.meow, this.doc.woofs[0] ]").
170+
Example(`root = [ meta("user.id") ]`).
171+
Optional()
172+
}
173+
157174
type connSettings struct {
158175
connMaxLifetime time.Duration
159176
connMaxIdleTime time.Duration

Diff for: internal/impl/sql/input_sql_raw.go

+4-13
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
)
2828

2929
func sqlRawInputConfig() *service.ConfigSpec {
30-
spec := service.NewConfigSpec().
30+
return service.NewConfigSpec().
3131
Beta().
3232
Categories("Services").
3333
Summary("Executes a select query and creates a message for each row received.").
@@ -36,17 +36,9 @@ func sqlRawInputConfig() *service.ConfigSpec {
3636
Field(dsnField).
3737
Field(rawQueryField().
3838
Example("SELECT * FROM footable WHERE user_id = $1;")).
39-
Field(service.NewBloblangField("args_mapping").
40-
Description("A xref:guides:bloblang/about.adoc[Bloblang mapping] which should evaluate to an array of values matching in size to the number of columns specified.").
41-
Example("root = [ this.cat.meow, this.doc.woofs[0] ]").
42-
Example(`root = [ meta("user.id") ]`).
43-
Optional()).
44-
Field(service.NewAutoRetryNacksToggleField())
45-
for _, f := range connFields() {
46-
spec = spec.Field(f)
47-
}
48-
49-
spec = spec.
39+
Field(rawQueryArgsMappingField()).
40+
Field(service.NewAutoRetryNacksToggleField()).
41+
Fields(connFields()...).
5042
Version("4.10.0").
5143
Example("Consumes an SQL table using a query as an input.",
5244
`
@@ -63,7 +55,6 @@ input:
6355
]
6456
`,
6557
)
66-
return spec
6758
}
6859

6960
func init() {

0 commit comments

Comments
 (0)