Implement strategy types (SchemaStore#3608)
* Implement strategy types

* fix typo

* prettier

---------

Co-authored-by: Hayssam Saleh <[email protected]>
hayssams and Hayssam Saleh authored Feb 26, 2024
1 parent 6aa1c50 commit 492eff3
Showing 2 changed files with 137 additions and 82 deletions.
208 changes: 134 additions & 74 deletions src/schemas/json/starlake.json
@@ -217,6 +217,10 @@
"const": "APPEND",
"description": "Append the data to an existing table or create it if it does not exist"
},
{
"const": "MERGE",
"description": "Merge the data to an existing table or create it if it does not exist. Merge attribute should be defined."
},
{
"const": "ERROR_IF_EXISTS",
"description": "Fail if the table already exist"
@@ -245,6 +249,36 @@
}
]
},
"WriteStrategyType": {
"description": "How the data is written to the target datawarehouse. Default is APPEND",
"type": "string",
"oneOf": [
{
"const": "APPEND",
"description": "Append the data to an existing table or create it if it does not exist"
},
{
"const": "OVERWRITE",
"description": "That data will overwrite the existing data or create it if it does not exist"
},
{
"const": "UPSERT_BY_KEY",
"description": "Merge the data to an existing table or create it if it does not exist. Key attribute should be defined."
},
{
"const": "UPSERT_BY_KEY_AND_TIMESTAMP",
"description": "Merge the data to an existing table or create it if it does not exist. Key and timestamp attributes should be defined."
},
{
"const": "SCD2",
"description": "Implement Slow changing Dime sio n of type 2. Key and timestamp attributes should be defined."
},
{
"const": "OVERWRITE_BY_PARTITION",
"description": "Dynamic Partition Overwrite: Overwrite the data in the target table partition if the partition exists, append the data otherwise. Partition attribute should be defined."
}
]
},
"Trim": {
"description": "How to trim the input string",
"type": "string",
@@ -355,24 +389,6 @@
},
"required": ["name", "pattern", "primitiveType"]
},
"Partition": {
"description": "Partition columns, no partitioning by default",
"type": "object",
"properties": {
"sampling": {
"type": "number",
"description": "0.0 means no sampling, > 0 && < 1 means sample dataset, >=1 absolute number of partitions. Used exclusively on Hadoop like warehouses"
},
"attributes": {
"type": "array",
"items": {
"type": "string",
"description": "Attributes used to partition de dataset."
}
}
},
"required": []
},
"Position": {
"description": "First and last char positions of an attribute in a fixed length record",
"type": "object",
@@ -388,21 +404,6 @@
},
"required": ["first", "last"]
},
"ExpectationItem": {
"description": "Expectation",
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "SQL query to execute"
},
"expect": {
"type": "string",
"description": "What outcome is expected"
}
},
"required": ["query", "expect"]
},
"Connection": {
"description": "Connection properties to a datawarehouse.",
"type": "object",
@@ -434,7 +435,7 @@
"type": "string",
"description": "Dag template to use for this config. Usually a .py.j2 file"
},
"filnename": {
"filename": {
"type": "string",
"description": "{schedule}, {domain}, {table} in the file name are used for DAG generation purposes"
},
@@ -443,7 +444,21 @@
"description": "DAG generation options"
}
},
"required": ["type"]
"required": ["template", "filename"]
},
"DagRef": {
"description": "Dag templates for load and transform.",
"type": "object",
"properties": {
"load": {
"type": "string",
"description": "Default DAG template for load operations"
},
"transform": {
"type": "string",
"description": "Default DAG template for transform operations"
}
}
},
"RowLevelSecurity": {
"description": "Row level security policy to apply to the output data.",
@@ -489,9 +504,13 @@
},
"required": ["role", "grants"]
},
"MergeOptions": {
"WriteStrategy": {
"type": "object",
"properties": {
"type": {
"$ref": "#/definitions/WriteStrategyType",
"description": "Timestamp column used to identify last version, if not specified currently ingested row is considered the last"
},
"key": {
"description": "list of attributes to join an existing and incoming dataset. Use renamed columns if any here.",
"type": "array",
@@ -506,9 +525,14 @@
"queryFilter": {
"type": "string",
"description": "Useful when you want to merge only on a subset of the existing partitions, thus improving performance and reducing costs.\nYou may use here:\n- Any SQL condition\n- latest which will be translated to the last existing partition\n- column in last(10) which will apply the merge on the last 10 partitions of your dataset.\n last and latest assume that your table is partitioned by day."
},
"on": {
"type": "string",
"description": "Should we check for duplicate in the source and target tables or only in the target one ? SOURCE_AND_TARGET by default.",
"enum": ["SOURCE_AND_TARGET", "TARGET"]
}
},
"required": ["key"]
"required": ["type"]
},
"Format": {
"description": "DSV by default. Supported file formats are :\\n- DSV : Delimiter-separated values file. Delimiter value is specified in the \"separator\" field.\\n- POSITION : FIXED format file where values are located at an exact position in each line.\\n- SIMPLE_JSON : For optimisation purpose, we differentiate JSON with top level values from JSON\\n with deep level fields. SIMPLE_JSON are JSON files with top level fields only.\\n- JSON : Deep JSON file. Use only when your json documents contain sub-documents, otherwise prefer to\\n use SIMPLE_JSON since it is much faster.\\n- XML : XML files",
@@ -554,13 +578,6 @@
"$ref": "#/definitions/Connection"
}
},
"MapExpectationItem": {
"type": "object",
"description": "Map of connections",
"additionalProperties": {
"$ref": "#/definitions/ExpectationItem"
}
},
"MapJdbcEngine": {
"type": "object",
"description": "Map of jdbc engines",
@@ -706,10 +723,6 @@
"type": "string",
"description": "ES: Attribute to use as id of the document. Generated by Elasticsearch if not specified."
},
"timestamp": {
"type": "string",
"description": "ES or BQ: The timestamp column to use for table partitioning if any. No partitioning by default\\nES:Timestamp field format as expected by Elasticsearch (\"{beginTs|yyyy.MM.dd}\" for example)."
},
"location": {
"type": "string",
"description": "BQ: Database location (EU, US, ...)"
@@ -750,8 +763,11 @@
"description": "FS: File extension"
},
"partition": {
"$ref": "#/definitions/Partition",
"description": "FS or BQ: List of partition attributes"
"type": "array",
"items": {
"type": "string",
"description": "FS or BQ: List of partition attributes. ES or BQ: The timestamp column to use for table partitioning if any. No partitioning by default\\nES:Timestamp field format as expected by Elasticsearch (\"{beginTs|yyyy.MM.dd}\" for example)."
}
},
"connectionRef": {
"type": "string",
Expand All @@ -760,10 +776,6 @@
"coalesce": {
"type": "boolean",
"description": "When outputting files, should we coalesce it to a single file. Useful when CSV is the output format."
},
"dynamicPartitionOverwrite": {
"type": "boolean",
"description": "When outputting files, should we overwrite existing partitions ?"
}
}
},
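
Because partition is now a plain array of attribute names instead of the removed Partition object, a sink section shrinks accordingly; for instance (attribute and connection names are placeholders):

    sink:
      connectionRef: bigquery
      partition: [order_date]
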
@@ -796,7 +808,7 @@
},
"withHeader": {
"type": "boolean",
"description": "does the dataset has a header ? true by default"
"description": "does the dataset has a header ? true bu default"
},
"separator": {
"type": "string",
@@ -810,12 +822,11 @@
"type": "string",
"description": "escaping char '\\' by default"
},
"write": {
"$ref": "#/definitions/WriteMode",
"description": "Write mode, APPEND by default"
},
"partition": {
"$ref": "#/definitions/Partition"
"type": "array",
"items": {
"type": "string"
}
},
"sink": {
"$ref": "#/definitions/Sink"
@@ -870,6 +881,9 @@
"dagRef": {
"type": "string",
"description": "Cron expression to use for this domain/table"
},
"writeStrategy": {
"$ref": "#/definitions/WriteStrategy"
}
}
},
@@ -985,9 +999,6 @@
"$ref": "#/definitions/Metadata",
"description": "Dataset metadata"
},
"merge": {
"$ref": "#/definitions/MergeOptions"
},
"comment": {
"type": "string",
"description": "free text"
@@ -1021,8 +1032,11 @@
}
},
"expectations": {
"$ref": "#/definitions/MapString",
"description": "Expectations to check after Load / Transform has succeeded"
"type": "array",
"items": {
"type": "string"
},
"description": "Expectations to check after Load / Transform has succeeded expectation(params) => condition"
},
"primaryKey": {
"description": "List of columns that make up the primary key",
@@ -1049,6 +1063,10 @@
"patternSample": {
"description": "Sample of filename matching this schema",
"type": "string"
},
"alias": {
"description": "Alias allowed in SQL queries",
"type": "string"
}
},
"required": ["name", "pattern"]
@@ -1184,7 +1202,10 @@
}
},
"expectations": {
"$ref": "#/definitions/MapString",
"type": "array",
"items": {
"type": "string"
},
"description": "Expectations to check after Load / Transform has succeeded"
},
"acl": {
@@ -1220,8 +1241,8 @@
"type": "string"
}
},
"merge": {
"$ref": "#/definitions/MergeOptions"
"writeStrategy": {
"$ref": "#/definitions/WriteStrategy"
},
"schedule": {
"type": "string",
@@ -1441,6 +1462,9 @@
"columnRemarks": {
"type": "string"
},
"filter": {
"type": "string"
},
"template": {
"type": "string",
"description": "Metadata to use for the generated YAML file."
@@ -1586,7 +1610,7 @@
"lock": {
"$ref": "#/definitions/Lock"
},
"defaultFormat": {
"defaultWriteFormat": {
"type": "string",
"description": "Default write format in Spark. parquet is the default"
},
@@ -1690,7 +1714,7 @@
"$ref": "#/definitions/AccessPolicies",
"description": "Access policies configuration"
},
"scheduling": {
"sparkScheduling": {
"$ref": "#/definitions/JobScheduling",
"description": "Spark Job scheduling configuration"
},
@@ -1769,6 +1793,46 @@
"maxParTask": {
"type": "integer",
"description": "How many job to run simultaneously in dev mode (experimental)"
},
"dagRef": {
"$ref": "#/definitions/DagRef",
"description": "DAG Templates"
},
"forceHalt": {
"type": "boolean",
"description": "Explicitly exit Main using system exit()"
},
"jobIdEnvName": {
"type": "boolean",
"description": "Use this job id instead of the one generated by default"
},
"archiveTablePattern": {
"type": "boolean",
"description": "domain and table name pattern to use. For example: {{domain}}_backup.{{table}}"
},
"archiveTable": {
"type": "boolean",
"description": "Should we save the loaded file as a table in the datawarehouse ?"
},
"version": {
"type": "boolean",
"description": "Set explicit version in application.sl.yml"
},
"autoExportSchema": {
"type": "boolean",
"description": "Export schema of transforms into the load/external folder ? Useful in dev mode for auto completion in the VS Code plugin"
},
"longJobTimeoutMs": {
"type": "integer",
"description": ""
},
"shortJobTimeoutMs": {
"type": "integer",
"description": ""
},
"createSchemaIfNotExists": {
"type": "boolean",
"description": "Should we create the schema if it does not already exist ?"
}
}
}
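
Taken together, a few of the new application-level settings introduced in this hunk might appear in an application config roughly as follows (all values are placeholders, shown only for orientation):

    dagRef:
      load: load_default
      transform: transform_default
    archiveTable: true
    archiveTablePattern: "{{domain}}_backup.{{table}}"
    autoExportSchema: true
    createSchemaIfNotExists: true
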
@@ -1843,10 +1907,6 @@
"env": {
"$ref": "#/definitions/MapString"
},
"expectations": {
"$ref": "#/definitions/MapExpectationItem",
"description": "Expectations library defined as a map name(params) -> sql request that should return 0 record"
},
"table": {
"$ref": "#/definitions/Table"
},