Commit 1d77d58
SQL support for unsetting replication policy (#297)
## Summary

Replication policy allows users to set configs for table replication. While the policy can be updated, there is no provision to delete the config when users want to disable the replication process. This PR introduces SQL support for purging the replication policy.

Syntax: `ALTER TABLE <db>.<table> UNSET POLICY (REPLICATION)`

The syntax is in line with Iceberg's support for unsetting table properties; ref: https://iceberg.apache.org/docs/1.5.1/spark-ddl/#alter-table

## Changes

- [x] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

## Testing Done

Docker testing steps:

1. Create a table with replication and retention policies using ALTER TABLE statements.
2. Unset the replication policy. SHOW TBLPROPERTIES now shows the policies JSON as:

   ```
   |policies|{
     "retention": {
       "count": 20,
       "granularity": "DAY",
       "columnPattern": {
         "columnName": "name",
         "pattern": "'yyyy-MM-dd-HH-mm'"
       }
     },
     "sharingEnabled": false,
     "replication": {
       "config": []
     }
   }|
   ```

3. Set table replication again. Check tblProperties:

   ```
   |policies|{
     "retention": {
       "count": 20,
       "granularity": "DAY",
       "columnPattern": {
         "columnName": "name",
         "pattern": "'yyyy-MM-dd'"
       }
     },
     "sharingEnabled": false,
     "replication": {
       "config": [
         {
           "destination": "'WAR'",
           "interval": "12H",
           "cronSchedule": "0 30 13/12 ? * * *"
         }
       ]
     }
   }|
   ```

4. Update the retention policy to validate that any other policy can still be updated as expected.

- [ ] Manually Tested on local docker setup. Please include commands ran, and their output.
- [x] Added new tests for the changes made.
- [x] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.
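For orientation, the end-to-end flow this enables from a Spark session looks roughly like the following (a minimal sketch: the database/table name and config values are illustrative, mirroring the integration test below, and `spark` is assumed to be a session with the OpenHouse SQL extensions enabled):

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a session wired up with the OpenHouse catalog and SQL extensions.
val spark: SparkSession = ???

// Enable replication on a table.
spark.sql("ALTER TABLE openhouse.db.t SET POLICY (REPLICATION=({destination:'WAR', interval:12h}))")

// New in this PR: disable it again. Other policies (retention, sharing) stay untouched.
spark.sql("ALTER TABLE openhouse.db.t UNSET POLICY (REPLICATION)")

// The policies JSON in the table properties should now show "replication": {"config": []}.
spark.sql("SHOW TBLPROPERTIES openhouse.db.t").show(truncate = false)
```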
1 parent d663a43 commit 1d77d58

File tree

12 files changed: +199 -3 lines changed

Diff for: integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/catalogtest/CatalogOperationTest.java (+67)

```diff
@@ -1,10 +1,15 @@
 package com.linkedin.openhouse.spark.catalogtest;
 
 import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.linkedin.openhouse.gen.tables.client.model.Policies;
 import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
@@ -15,6 +20,7 @@
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.StructField;
@@ -127,6 +133,58 @@ public void testCreateReplicaSkipFieldIdReassignmentUnPartitionedTable() throws
     }
   }
 
+  @Test
+  public void testAlterTableUnsetReplicationPolicy() throws Exception {
+    try (SparkSession spark = getSparkSession()) {
+      spark.sql("CREATE TABLE openhouse.d1.`ttt1` (name string)");
+      spark.sql("INSERT INTO openhouse.d1.ttt1 VALUES ('foo')");
+      spark.sql(
+          "ALTER TABLE openhouse.d1.ttt1 SET POLICY (REPLICATION=({destination:'WAR', interval:12h}))");
+      spark.sql(
+          "ALTER TABLE openhouse.d1.ttt1 SET POLICY (RETENTION= 30d on column name where pattern='yyyy-MM-dd')");
+      Policies policies = getPoliciesObj("openhouse.d1.ttt1", spark);
+      Assertions.assertNotNull(policies);
+      Assertions.assertEquals(
+          "'WAR'", policies.getReplication().getConfig().get(0).getDestination());
+      Assertions.assertNotNull(policies.getRetention());
+      Assertions.assertEquals(
+          "'yyyy-MM-dd'", policies.getRetention().getColumnPattern().getPattern());
+
+      // unset replication policy
+      spark.sql("ALTER TABLE openhouse.d1.ttt1 UNSET POLICY (REPLICATION)");
+      Policies updatedPolicy = getPoliciesObj("openhouse.d1.ttt1", spark);
+      Assertions.assertEquals(updatedPolicy.getReplication().getConfig().size(), 0);
+      // assert that other policies, retention is not modified after unsetting replication
+      Assertions.assertNotNull(updatedPolicy.getRetention());
+      Assertions.assertEquals(
+          "'yyyy-MM-dd'", updatedPolicy.getRetention().getColumnPattern().getPattern());
+
+      // assert retention can be set after unsetting replication
+      spark.sql(
+          "ALTER TABLE openhouse.d1.ttt1 SET POLICY (RETENTION = 30D on COLUMN name WHERE pattern = 'yyyy')");
+      Policies policyWithRetention = getPoliciesObj("openhouse.d1.ttt1", spark);
+      Assertions.assertNotNull(policyWithRetention);
+      Assertions.assertEquals(
+          "'yyyy'", policyWithRetention.getRetention().getColumnPattern().getPattern());
+      Assertions.assertEquals(0, policyWithRetention.getReplication().getConfig().size());
+
+      // assert replication can be set again after retention policy
+      spark.sql(
+          "ALTER TABLE openhouse.d1.ttt1 SET POLICY (REPLICATION=({destination:'WAR', interval:12h}))");
+      Policies policyWithReplication = getPoliciesObj("openhouse.d1.ttt1", spark);
+      Assertions.assertNotNull(policyWithReplication);
+      Assertions.assertEquals(
+          "'WAR'", policyWithReplication.getReplication().getConfig().get(0).getDestination());
+
+      // UNSET policy for table without replication
+      spark.sql("CREATE TABLE openhouse.d1.`tttest1` (name string)");
+      spark.sql("INSERT INTO openhouse.d1.tttest1 VALUES ('foo')");
+      spark.sql("ALTER TABLE openhouse.d1.tttest1 UNSET POLICY (REPLICATION)");
+      Policies policytttest1 = getPoliciesObj("openhouse.d1.tttest1", spark);
+      Assertions.assertEquals(0, policytttest1.getReplication().getConfig().size());
+    }
+  }
+
   @Test
   public void testCreateReplicaSkipFieldIdReassignmentPartitionedTable() throws Exception {
     try (SparkSession spark = getSparkSession()) {
@@ -185,4 +243,13 @@ private Catalog getOpenHouseCatalog(SparkSession spark) {
         catalogProperties,
         spark.sparkContext().hadoopConfiguration());
   }
+
+  private Policies getPoliciesObj(String tableName, SparkSession spark) {
+    List<Row> props = spark.sql(String.format("show tblProperties %s", tableName)).collectAsList();
+    Map<String, String> collect =
+        props.stream().collect(Collectors.toMap(r -> r.getString(0), r -> r.getString(1)));
+    String policiesStr = String.valueOf(collect.get("policies"));
+    Gson gson = new GsonBuilder().setPrettyPrinting().create();
+    return gson.fromJson(policiesStr, Policies.class);
+  }
 }
```

Diff for: integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java (+14)

```diff
@@ -60,6 +60,13 @@ public void testSimpleSetReplicationPolicy() {
     assert isPlanValid(ds, replicationConfigJson);
   }
 
+  @Test
+  public void testSimpleUnSetReplicationPolicy() {
+    String replicationConfigJson = "{replication: null}";
+    Dataset<Row> ds = spark.sql("ALTER TABLE openhouse.db.table UNSET POLICY (REPLICATION)");
+    assert isUnSetPlanValid(ds, replicationConfigJson);
+  }
+
   @Test
   public void testSimpleSetReplicationPolicyOptionalInterval() {
     // Test with optional interval
@@ -241,4 +248,11 @@ private boolean isPlanValid(Dataset<Row> dataframe, String replicationConfigJson
     }
     return isValid;
   }
+
+  @SneakyThrows
+  private boolean isUnSetPlanValid(Dataset<Row> dataframe, String replicationConfigJson) {
+    String queryStr = dataframe.queryExecution().explainString(ExplainMode.fromString("simple"));
+    JsonObject json = new Gson().fromJson(replicationConfigJson, JsonObject.class);
+    return queryStr.contains("REPLICATION") && json.has("replication");
+  }
 }
```

Diff for: integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 (+7, -1)

```diff
@@ -25,6 +25,7 @@ singleStatement
 statement
     : ALTER TABLE multipartIdentifier SET POLICY '(' retentionPolicy (columnRetentionPolicy)? ')' #setRetentionPolicy
     | ALTER TABLE multipartIdentifier SET POLICY '(' replicationPolicy ')' #setReplicationPolicy
+    | ALTER TABLE multipartIdentifier UNSET POLICY '(' replication ')' #unSetReplicationPolicy
    | ALTER TABLE multipartIdentifier SET POLICY '(' sharingPolicy ')' #setSharingPolicy
    | ALTER TABLE multipartIdentifier SET POLICY '(' historyPolicy ')' #setHistoryPolicy
    | ALTER TABLE multipartIdentifier MODIFY columnNameClause SET columnPolicy #setColumnPolicyTag
@@ -86,8 +87,12 @@ columnRetentionPolicy
     : ON columnNameClause (columnRetentionPolicyPatternClause)?
     ;
 
+replication
+    : REPLICATION
+    ;
+
 replicationPolicy
-    : REPLICATION '=' tableReplicationPolicy
+    : replication '=' tableReplicationPolicy
     ;
 
 tableReplicationPolicy
@@ -170,6 +175,7 @@ versions
 ALTER: 'ALTER';
 TABLE: 'TABLE';
 SET: 'SET';
+UNSET: 'UNSET';
 POLICY: 'POLICY';
 RETENTION: 'RETENTION';
 REPLICATION: 'REPLICATION';
```
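The factoring above is the heart of the grammar change: the bare REPLICATION keyword becomes its own `replication` rule so the AST builder can visit it directly, and `replicationPolicy` is rewritten to reuse that rule, letting SET and UNSET share one token path. Concretely, the new alternative accepts a statement like the following (a sketch; the table name is illustrative, and the plan rendering follows the `simpleString` of the new logical plan shown further down):

```scala
// Parsed via the new '#unSetReplicationPolicy' alternative; the AST builder
// turns it into UnSetReplicationPolicy(tableName, "REPLICATION"), since
// visitReplication returns the keyword text.
spark.sql("ALTER TABLE openhouse.db.t UNSET POLICY (REPLICATION)")
// The plan's simpleString would render roughly as:
//   UnSetReplicationPolicy: List(openhouse, db, t)
```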

Diff for: integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSparkSqlExtensionsParser.scala (+1)

```diff
@@ -67,6 +67,7 @@ class OpenhouseSparkSqlExtensionsParser (delegate: ParserInterface) extends Pars
       .trim()
     (normalized.startsWith("alter table") &&
       (normalized.contains("set policy")) ||
+      (normalized.contains("unset policy")) ||
      (normalized.contains("modify column") &&
        normalized.contains("set tag"))) ||
      normalized.startsWith("grant") ||
```

Diff for: integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala (+12, -1)

```diff
@@ -2,7 +2,7 @@ package com.linkedin.openhouse.spark.sql.catalyst.parser.extensions
 
 import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes
 import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseSqlExtensionsParser._
-import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetHistoryPolicy, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement}
+import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetHistoryPolicy, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement, UnSetReplicationPolicy}
 import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes.GrantableResourceType
 import com.linkedin.openhouse.gen.tables.client.model.TimePartitionSpec
 import org.antlr.v4.runtime.tree.ParseTree
@@ -33,6 +33,12 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
     SetReplicationPolicy(tableName, replicationPolicies)
   }
 
+  override def visitUnSetReplicationPolicy(ctx: UnSetReplicationPolicyContext): UnSetReplicationPolicy = {
+    val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier)
+    val replicationPolicies = typedVisit[String](ctx.replication())
+    UnSetReplicationPolicy(tableName, replicationPolicies)
+  }
+
   override def visitSetSharingPolicy(ctx: SetSharingPolicyContext): SetSharingPolicy = {
     val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier)
     val sharing = typedVisit[String](ctx.sharingPolicy())
@@ -132,6 +138,11 @@
     ctx.retentionColumnPatternClause().STRING().getText
   }
 
+  override def visitReplication(ctx: ReplicationContext): String =
+  {
+    ctx.REPLICATION().getText
+  }
+
   override def visitSharingPolicy(ctx: SharingPolicyContext): String = {
     ctx.BOOLEAN().getText
   }
```
Diff for: integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/UnSetReplicationPolicy.scala (new file, +9; path inferred from the package declaration)

```diff
@@ -0,0 +1,9 @@
+package com.linkedin.openhouse.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.plans.logical.Command
+
+case class UnSetReplicationPolicy(tableName: Seq[String], replicationPolicies: String) extends Command {
+  override def simpleString(maxFields: Int): String = {
+    s"UnSetReplicationPolicy: ${tableName}"
+  }
+}
```

Diff for: integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala (+3, -1)

```diff
@@ -1,6 +1,6 @@
 package com.linkedin.openhouse.spark.sql.execution.datasources.v2
 
-import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetHistoryPolicy, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement}
+import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetHistoryPolicy, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement, UnSetReplicationPolicy}
 import org.apache.iceberg.spark.{Spark3Util, SparkCatalog, SparkSessionCatalog}
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
@@ -17,6 +17,8 @@ case class OpenhouseDataSourceV2Strategy(spark: SparkSession) extends Strategy w
       SetRetentionPolicyExec(catalog, ident, granularity, count, colName, colPattern) :: Nil
     case SetReplicationPolicy(CatalogAndIdentifierExtractor(catalog, ident), replicationPolicies) =>
       SetReplicationPolicyExec(catalog, ident, replicationPolicies) :: Nil
+    case UnSetReplicationPolicy(CatalogAndIdentifierExtractor(catalog, ident), replicationPolicies) =>
+      UnSetReplicationPolicyExec(catalog, ident, replicationPolicies) :: Nil
    case SetHistoryPolicy(CatalogAndIdentifierExtractor(catalog, ident), granularity, maxAge, versions) =>
      SetHistoryPolicyExec(catalog, ident, granularity, maxAge, versions) :: Nil
    case SetSharingPolicy(CatalogAndIdentifierExtractor(catalog, ident), sharing) =>
```
Diff for: integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/UnSetReplicationPolicyExec.scala (new file, +26; path inferred from the package declaration)

```diff
@@ -0,0 +1,26 @@
+package com.linkedin.openhouse.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
+
+case class UnSetReplicationPolicyExec(catalog: TableCatalog, ident: Identifier, replication: (String)) extends V2CommandExec{
+  override protected def run(): Seq[InternalRow] = {
+    catalog.loadTable(ident) match {
+      case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") =>
+        val key = "updated.openhouse.policy"
+        val value = s"""{"replication": {}}"""
+        iceberg.table().updateProperties()
+          .set(key, value)
+          .commit()
+
+      case table =>
+        throw new UnsupportedOperationException(s"Cannot unset replication policy for non-Openhouse table: $table")
+    }
+    Nil
+  }
+
+  override def output: Seq[Attribute] = Nil
+}
```
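A note on the mechanism: the exec node does not rewrite policies itself. It stamps the marker JSON `{"replication": {}}` into the `updated.openhouse.policy` table property and commits, and the OpenHouse catalog translates that into the stored policies, which is why the tests observe `"config": []` afterwards. A hedged sketch of checking the outcome from Spark (session and table name are illustrative, not from this commit):

```scala
// Illustrative verification: after the UNSET, the 'policies' table property
// should carry an empty replication config.
val policiesJson = spark
  .sql("SHOW TBLPROPERTIES openhouse.db.t")
  .collect()
  .find(_.getString(0) == "policies")
  .map(_.getString(1))
  .getOrElse(sys.error("no policies property found"))

// Expected, per the tests in this PR: "replication": { "config": [] }
assert(policiesJson.replaceAll("\\s+", "").contains(""""replication":{"config":[]}"""))
```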

Diff for: integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSparkSqlExtensionsParser.scala (+1)

```diff
@@ -67,6 +67,7 @@ class OpenhouseSparkSqlExtensionsParser (delegate: ParserInterface) extends Pars
       .trim()
     (normalized.startsWith("alter table") &&
       (normalized.contains("set policy")) ||
+      (normalized.contains("unset policy")) ||
      (normalized.contains("modify column") &&
        normalized.contains("set tag"))) ||
      normalized.startsWith("grant") ||
```

Diff for: integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/UnSetReplicationPolicy.scala (new file, +9; path inferred from the package declaration)

```diff
@@ -0,0 +1,9 @@
+package com.linkedin.openhouse.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.plans.logical.LeafCommand
+
+case class UnSetReplicationPolicy(tableName: Seq[String], replicationPolicies: String) extends LeafCommand {
+  override def simpleString(maxFields: Int): String = {
+    s"UnSetReplicationPolicy: ${tableName}"
+  }
+}
```

Diff for: integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/UnSetReplicationPolicyExec.scala (new file, +26; path inferred from the package declaration)

```diff
@@ -0,0 +1,26 @@
+package com.linkedin.openhouse.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+
+case class UnSetReplicationPolicyExec(catalog: TableCatalog, ident: Identifier, replication: (String)) extends LeafV2CommandExec{
+  override protected def run(): Seq[InternalRow] = {
+    catalog.loadTable(ident) match {
+      case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") =>
+        val key = "updated.openhouse.policy"
+        val value = s"""{"replication": {}}"""
+        iceberg.table().updateProperties()
+          .set(key, value)
+          .commit()
+
+      case table =>
+        throw new UnsupportedOperationException(s"Cannot unset replication policy for non-Openhouse table: $table")
+    }
+    Nil
+  }
+
+  override def output: Seq[Attribute] = Nil
+}
```

Diff for: services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java (+24)

```diff
@@ -1063,6 +1063,30 @@ public void testUpdateSucceedsForReplicationConfig() throws Exception {
     Assertions.assertTrue(
         RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule")));
 
+    Replication nullReplication = Replication.builder().config(new ArrayList<>()).build();
+    Policies newPoliciesNullRepl = Policies.builder().replication(nullReplication).build();
+
+    GetTableResponseBody newContainer =
+        GetTableResponseBody.builder().policies(newPoliciesNullRepl).build();
+    GetTableResponseBody addNullProp = buildGetTableResponseBody(mvcResult, newContainer);
+    mvcResult =
+        mvc.perform(
+                MockMvcRequestBuilders.put(
+                        String.format(
+                            ValidationUtilities.CURRENT_MAJOR_VERSION_PREFIX
+                                + "/databases/%s/tables/%s",
+                            addProp.getDatabaseId(),
+                            addProp.getTableId()))
+                    .contentType(MediaType.APPLICATION_JSON)
+                    .content(buildCreateUpdateTableRequestBody(addNullProp).toJson())
+                    .accept(MediaType.APPLICATION_JSON))
+            .andExpect(status().isOk())
+            .andReturn();
+
+    LinkedHashMap<String, String> updatedNullReplication =
+        JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies.replication");
+
+    Assertions.assertTrue(updatedNullReplication.containsKey("config"));
     RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY);
   }
 
```
