
Commit d2eb328

committed Mar 7, 2025
materialize-iceberg: serialize pyspark command inputs to a file
The maximum argument length for an EMR job is quite limited, around 10k characters, so if the command input for a job gets very long the job will fail. This will happen if any significant number of bindings is associated with a transaction, or even with a single binding that has a large number of fields, since all of the fields and their types need to be provided to the script in serialized form, in addition to the query to execute. The fix here is to write the input to a temporary cloud storage file and read that file in the PySpark script: rather than providing the input itself as an argument, the argument is now a URI pointing to the input file.
1 parent a37ee4f
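The flow described above, sketched minimally in Python with boto3. The bucket, prefix, and function names here are illustrative only; the connector's actual implementation is in the diffs below.

# Sketch of the pattern: instead of passing a large JSON document as a job
# argument (which can exceed EMR's argument-length limit), the caller stages
# it in cloud storage and hands the job a URI. Names are illustrative.
import json
from urllib.parse import urlparse

import boto3

s3 = boto3.client("s3")

def stage_job_input(job_input: dict, bucket: str, working_prefix: str) -> str:
    """Serialize the job input and stage it in S3, returning its URI."""
    key = f"{working_prefix}/input.json"
    s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(job_input))
    return f"s3://{bucket}/{key}"  # passed to the job as --input-uri

def read_job_input(input_uri: str) -> dict:
    """Fetch and parse the staged input, then remove the temporary object."""
    parsed = urlparse(input_uri)
    bucket, key = parsed.netloc, parsed.path.lstrip("/")
    obj = s3.get_object(Bucket=bucket, Key=key)
    with obj["Body"] as body:
        job_input = json.loads(body.read().decode("utf-8"))
    s3.delete_object(Bucket=bucket, Key=key)
    return job_input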

File tree: 6 files changed, +50 −39 lines changed
 

‎materialize-iceberg/emr.go

+16 −10

@@ -84,10 +84,10 @@ func ensureEmrSecret(ctx context.Context, client *ssm.Client, parameterName, wan
     return nil
 }
 
-func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, statusOutputPrefix, entryPointUri string) error {
+func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, workingPrefix, entryPointUri string) error {
     /***
     Available arguments to the pyspark script:
-    | --input | Input for the program, as serialized JSON | Required |
+    | --input-uri | Input for the program, as an s3 URI, to be parsed by the script | Required |
     | --status-output | Location where the final status object will be written. | Required |
     | --catalog-url | The catalog URL | Required |
     | --warehouse | REST Warehouse | Required |
@@ -97,7 +97,7 @@ func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, s
     ***/
     getStatus := func() (*python.StatusOutput, error) {
         var status python.StatusOutput
-        statusKey := path.Join(statusOutputPrefix, statusFile)
+        statusKey := path.Join(workingPrefix, statusFile)
         if statusObj, err := t.s3Client.GetObject(ctx, &s3.GetObjectInput{
             Bucket: aws.String(t.cfg.Compute.Bucket),
             Key:    aws.String(statusKey),
@@ -109,14 +109,20 @@ func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, s
         return &status, nil
     }
 
-    encodedInput, err := encodeInput(input)
-    if err != nil {
+    inputKey := path.Join(workingPrefix, "input.json")
+    if inputBytes, err := encodeInput(input); err != nil {
         return fmt.Errorf("encoding input: %w", err)
+    } else if _, err := t.s3Client.PutObject(ctx, &s3.PutObjectInput{
+        Bucket: aws.String(t.cfg.Compute.Bucket),
+        Key:    aws.String(inputKey),
+        Body:   bytes.NewReader(inputBytes),
+    }); err != nil {
+        return fmt.Errorf("putting input file object: %w", err)
     }
 
     args := []string{
-        "--input", encodedInput,
-        "--status-output", "s3://" + path.Join(t.cfg.Compute.Bucket, statusOutputPrefix, statusFile),
+        "--input-uri", "s3://" + path.Join(t.cfg.Compute.Bucket, inputKey),
+        "--status-output", "s3://" + path.Join(t.cfg.Compute.Bucket, workingPrefix, statusFile),
         "--catalog-url", t.cfg.URL,
         "--warehouse", t.cfg.Warehouse,
         "--region", t.cfg.Compute.Region,
@@ -184,14 +190,14 @@ func (t *transactor) runEmrJob(ctx context.Context, jobName string, input any, s
     }
 }
 
-func encodeInput(in any) (string, error) {
+func encodeInput(in any) ([]byte, error) {
     var buf bytes.Buffer
     enc := json.NewEncoder(&buf)
     enc.SetAppendNewline(false)
     enc.SetEscapeHTML(false)
     if err := enc.Encode(in); err != nil {
-        return "", err
+        return nil, err
     }
 
-    return buf.String(), nil
+    return buf.Bytes(), nil
 }

‎materialize-iceberg/python/common.py

+22 −11

@@ -10,7 +10,6 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.types import (
     ArrayType,
-    BinaryType,
     BooleanType,
     DataType,
     DateType,
@@ -68,7 +67,9 @@ def fields_to_struct(fields: list[NestedField]) -> StructType:
 def common_args() -> argparse.Namespace:
     parser = argparse.ArgumentParser()
     parser.add_argument(
-        "--input", required=True, help="Input for the program, as serialized JSON."
+        "--input-uri",
+        required=True,
+        help="Location of the program input, as serialized JSON.",
     )
     parser.add_argument(
         "--status-output",
@@ -140,23 +141,33 @@ def get_spark_session(args: argparse.Namespace) -> SparkSession:
 def run_with_status(
     parsed_args: argparse.Namespace,
     fn,
-    *args,
-    **kwargs,
 ):
-    parsed_url = urlparse(parsed_args.status_output)
-    bucket_name = parsed_url.netloc
-    file_path = parsed_url.path.lstrip("/")
+    input_uri = urlparse(parsed_args.input_uri)
+    input_bucket_name = input_uri.netloc
+    input_file_path = input_uri.path.lstrip("/")
+
+    output_uri = urlparse(parsed_args.status_output)
+    output_bucket_name = output_uri.netloc
+    output_file_path = output_uri.path.lstrip("/")
+
     s3 = boto3.client("s3")
 
     try:
-        fn(*args, **kwargs)
+        input = s3.get_object(Bucket=input_bucket_name, Key=input_file_path)
+        with input["Body"] as body:
+            input = json.loads(body.read().decode("utf-8"))
+        s3.delete_object(Bucket=input_bucket_name, Key=input_file_path)
+
+        fn(input)
         s3.put_object(
-            Bucket=bucket_name, Key=file_path, Body=json.dumps({"success": True})
+            Bucket=output_bucket_name,
+            Key=output_file_path,
+            Body=json.dumps({"success": True}),
         )
     except Exception as e:
         s3.put_object(
-            Bucket=bucket_name,
-            Key=file_path,
+            Bucket=output_bucket_name,
+            Key=output_file_path,
             Body=json.dumps({"success": False, "error": str(e)}),
         )
         raise
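With this change, run_with_status itself fetches the staged input object, deletes it, and passes the parsed JSON to the callback. A minimal sketch of how an entry-point script is expected to wire into it; the call site is not part of this diff, so the shape below is an assumption based on the load.py and merge.py changes:

# Illustrative only: how a pyspark entry point could plug into run_with_status
# after this change. The exact call site is not shown in this commit's diff.
from common import common_args, get_spark_session, run_with_status

args = common_args()              # parses --input-uri, --status-output, etc.
spark = get_spark_session(args)

def run(input):
    # `input` is the JSON document staged at --input-uri, already parsed.
    for binding in input["bindings"]:
        ...  # per-binding work

run_with_status(args, run)        # writes the success/failure status object to --status-output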

‎materialize-iceberg/python/load.py

+4 −8

@@ -1,5 +1,3 @@
-import json
-
 from common import (
     NestedField,
     common_args,
@@ -11,14 +9,12 @@
 args = common_args()
 spark = get_spark_session(args)
 
-input = json.loads(args.input)
-query = input["query"]
-bindings = input["bindings"]
-output_location = input["output_location"]
-status_output = args.status_output
 
+def run(input):
+    query = input["query"]
+    bindings = input["bindings"]
+    output_location = input["output_location"]
 
-def run():
     for binding in bindings:
         bindingIdx: int = binding["binding"]
         keys: list[NestedField] = [NestedField(**key) for key in binding["keys"]]

‎materialize-iceberg/python/merge.py

+2 −5

@@ -11,12 +11,9 @@
 args = common_args()
 spark = get_spark_session(args)
 
-input = json.loads(args.input)
-bindings = input["bindings"]
 
-
-def run():
-    for binding in bindings:
+def run(input):
+    for binding in input["bindings"]:
         bindingIdx: int = binding["binding"]
         query: str = binding["query"]
         columns: list[NestedField] = [NestedField(**col) for col in binding["columns"]]

‎materialize-iceberg/python/python.go

+1 −2

@@ -26,8 +26,7 @@ type MergeBinding struct {
 }
 
 type MergeInput struct {
-    Bindings       []MergeBinding `json:"bindings"`
-    OutputLocation string         `json:"output_location"`
+    Bindings []MergeBinding `json:"bindings"`
 }
 
 type StatusOutput struct {

‎materialize-iceberg/transactor.go

+5 −3

@@ -237,7 +237,7 @@ func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
 func (t *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error) {
     outputPrefix := path.Join(t.cfg.Compute.BucketPath, uuid.NewString())
     checkpointClear := make(map[string]*python.MergeBinding)
-    mergeInput := python.MergeInput{OutputLocation: "s3://" + path.Join(t.cfg.Compute.Bucket, outputPrefix)}
+    var mergeInput python.MergeInput
     var allFileUris []string
 
     for _, b := range t.bindings {
@@ -281,7 +281,6 @@ func (t *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error
         cleanupStatus := t.cleanPrefixOnceFn(ctx, t.cfg.Compute.Bucket, outputPrefix)
         defer cleanupStatus()
 
-        stateUpdate = &pf.ConnectorState{MergePatch: true}
         if err := t.runEmrJob(ctx, fmt.Sprintf("store for: %s", t.materializationName), mergeInput, outputPrefix, t.pyFiles.mergeURI); err != nil {
             return nil, fmt.Errorf("store merge job failed: %w", err)
         } else if err := cleanupStatus(); err != nil {
@@ -291,7 +290,10 @@ func (t *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error
         } else if cpUpdate, err := json.Marshal(checkpointClear); err != nil {
             return nil, fmt.Errorf("encoding checkpoint update: %w", err)
         } else {
-            stateUpdate.UpdatedJson = cpUpdate
+            stateUpdate = &pf.ConnectorState{
+                UpdatedJson: cpUpdate,
+                MergePatch:  true,
+            }
         }
     }
