Skip to content

Commit 3c58268

Browse files
authored
Replication job to trigger setup and carbon flow for replica tables [WIP] (#276)
## Summary New job workflow to run Replication setup process on Airflow. Applies to primary tables with defined ReplicationConfig. Design decisions: 1. The Replication job does not use JobsClient to trigger a job. Instead it will use Airflow client to trigger and manage the lifecycle of an Airflow job. 2. Replication will have the task definition on the Li-Openhouse side since it needs to leverage AirflowClient. 3. As there can be multiple ReplicationConfigs for a table, the ReplicationTask goes over each config sequentially to trigger a setup job corresponding to the config. Future work: 1. Develop AirflowClient which will allow triggering, managing state of Airflow jobs and integrate with Replication job run. 2. Develop CarbonClient which can trigger carbon jobs to setup scheduled replication flows. 3. Integrate CarbonClient with Replica table setup job ## Changes - [ ] Client-facing API Changes - [ ] Internal API Changes - [ ] Bug Fixes - [x] New Features - [ ] Performance Improvements - [ ] Code Style - [ ] Refactoring - [ ] Documentation - [ ] Tests For all the boxes checked, please include additional details of the changes made in this pull request. ## Testing Done <!--- Check any relevant boxes with "x" --> - [x] Manually Tested on local docker setup. Please include commands ran, and their output. - [ ] Added new tests for the changes made. - [ ] Updated existing tests to reflect the changes made. - [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help. - [ ] Some other form of testing like staging or soak time in production. Please explain. Tested on Local docker setup: Ran the new Replication Job with Local Docker setup. Added a table with replicationConfig. Observed: 1. The new job gets picked up by the JobScheduler and follows the task flow 2. 
Only Primary tables with defined replicationConfig are considered and others are filtered out <img width="1900" alt="Screenshot 2025-01-08 at 10 17 11 PM" src="https://github.com/user-attachments/assets/e46b210a-650c-43f5-9759-840257206595" /> <img width="1598" alt="Screenshot 2025-01-08 at 3 30 15 PM" src="https://github.com/user-attachments/assets/8ae869c1-5ebc-4fd0-b95a-ae5af2c03b74" /> For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request. # Additional Information - [ ] Breaking Changes - [ ] Deprecations - [ ] Large PR broken into smaller PRs, and PR plan linked in the description. For all the boxes checked, include additional details of the changes made in this pull request.
1 parent 3e8d387 commit 3c58268

File tree

6 files changed

+116
-1
lines changed

6 files changed

+116
-1
lines changed

apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java

+33
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy;
55
import com.linkedin.openhouse.jobs.util.DatabaseTableFilter;
66
import com.linkedin.openhouse.jobs.util.DirectoryMetadata;
7+
import com.linkedin.openhouse.jobs.util.ReplicationConfig;
78
import com.linkedin.openhouse.jobs.util.RetentionConfig;
89
import com.linkedin.openhouse.jobs.util.RetryUtil;
910
import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata;
@@ -15,6 +16,7 @@
1516
import com.linkedin.openhouse.tables.client.model.GetDatabaseResponseBody;
1617
import com.linkedin.openhouse.tables.client.model.GetTableResponseBody;
1718
import com.linkedin.openhouse.tables.client.model.Policies;
19+
import com.linkedin.openhouse.tables.client.model.Replication;
1820
import java.time.Duration;
1921
import java.util.AbstractMap;
2022
import java.util.ArrayList;
@@ -56,6 +58,11 @@ public Optional<RetentionConfig> getTableRetention(TableMetadata tableMetadata)
5658
return getTableRetention(response);
5759
}
5860

61+
public Optional<List<ReplicationConfig>> getTableReplication(TableMetadata tableMetadata) {
62+
GetTableResponseBody response = getTable(tableMetadata);
63+
return getTableReplication(response);
64+
}
65+
5966
private Optional<RetentionConfig> getTableRetention(GetTableResponseBody response) {
6067
// timePartitionSpec or retention.ColumnPattern should be present to run Retention job on a
6168
// table.
@@ -86,6 +93,31 @@ private Optional<RetentionConfig> getTableRetention(GetTableResponseBody respons
8693
.build());
8794
}
8895

96+
private Optional<List<ReplicationConfig>> getTableReplication(GetTableResponseBody response) {
97+
// At least one replication config must be present
98+
if (response == null
99+
|| response.getPolicies() == null
100+
|| response.getPolicies().getReplication() == null
101+
|| response.getPolicies().getReplication().getConfig().size() <= 0) {
102+
return Optional.empty();
103+
}
104+
List<ReplicationConfig> replicationConfigList = new ArrayList<>();
105+
Replication replication = response.getPolicies().getReplication();
106+
List<com.linkedin.openhouse.tables.client.model.ReplicationConfig> replicationConfig =
107+
replication.getConfig();
108+
109+
replicationConfig.forEach(
110+
rc ->
111+
replicationConfigList.add(
112+
ReplicationConfig.builder()
113+
.cluster(rc.getDestination())
114+
.tableOwner(response.getTableCreator())
115+
.schedule(rc.getCronSchedule())
116+
.build()));
117+
// since replicationConfigList is initialized, it cannot be null.
118+
return Optional.of(replicationConfigList);
119+
}
120+
89121
protected GetTableResponseBody getTable(TableMetadata tableMetadata) {
90122
return getTable(tableMetadata.getDbName(), tableMetadata.getTableName());
91123
}
@@ -281,6 +313,7 @@ protected Optional<TableMetadata> mapTableResponseToTableMetadata(
281313
.isTimePartitioned(tableResponseBody.getTimePartitioning() != null)
282314
.isClustered(tableResponseBody.getClustering() != null)
283315
.retentionConfig(getTableRetention(tableResponseBody).orElse(null))
316+
.replicationConfig(getTableReplication(tableResponseBody).orElse(null))
284317
.jobExecutionProperties(getJobExecutionProperties(tableResponseBody));
285318
builder.creationTimeMs(Objects.requireNonNull(tableResponseBody.getCreationTime()));
286319
return Optional.of(builder.build());

apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java

+15
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,19 @@ private List<OperationTask<?>> prepareTableOperationTaskList(JobConf.JobTypeEnum
4848
return processMetadataList(tableMetadataList, jobType);
4949
}
5050

51+
private List<OperationTask<?>> prepareReplicationOperationTaskList(JobConf.JobTypeEnum jobType) {
52+
List<TableMetadata> replicationSetupTableMetadataList = tablesClient.getTableMetadataList();
53+
// filters tables which are primary and hava replication config defined
54+
replicationSetupTableMetadataList =
55+
replicationSetupTableMetadataList.stream()
56+
.filter(m -> m.isPrimary() && (m.getReplicationConfig() != null))
57+
.collect(Collectors.toList());
58+
log.info(
59+
"Fetched metadata for {} tables for replication setup task",
60+
replicationSetupTableMetadataList.size());
61+
return processMetadataList(replicationSetupTableMetadataList, jobType);
62+
}
63+
5164
private List<OperationTask<?>> prepareTableDirectoryOperationTaskList(
5265
JobConf.JobTypeEnum jobType) {
5366
List<DirectoryMetadata> directoryMetadataList = tablesClient.getOrphanTableDirectories();
@@ -152,6 +165,8 @@ public List<OperationTask<?>> buildOperationTaskList(
152165
case STAGED_FILES_DELETION:
153166
case DATA_LAYOUT_STRATEGY_GENERATION:
154167
return prepareTableOperationTaskList(jobType);
168+
case REPLICATION:
169+
return prepareReplicationOperationTaskList(jobType);
155170
case DATA_LAYOUT_STRATEGY_EXECUTION:
156171
return prepareDataLayoutOperationTaskList(jobType, properties, meter);
157172
case ORPHAN_DIRECTORY_DELETION:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.linkedin.openhouse.jobs.util;
2+
3+
import lombok.Builder;
4+
import lombok.EqualsAndHashCode;
5+
import lombok.Getter;
6+
import lombok.ToString;
7+
8+
/** Table retention config class. This is app side representation of /tables policies->retention */
9+
@Builder
10+
@Getter
11+
@EqualsAndHashCode
12+
@ToString
13+
public class ReplicationConfig {
14+
private final String schedule;
15+
private final String tableOwner;
16+
private final String cluster;
17+
}

apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadata.java

+2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.linkedin.openhouse.jobs.util;
22

33
import java.util.HashMap;
4+
import java.util.List;
45
import java.util.Map;
56
import javax.annotation.Nullable;
67
import lombok.Builder;
@@ -25,6 +26,7 @@ public class TableMetadata extends Metadata {
2526
@Builder.Default protected @NonNull Map<String, String> jobExecutionProperties = new HashMap<>();
2627
protected @Nullable RetentionConfig retentionConfig;
2728
protected @Nullable HistoryConfig historyConfig;
29+
protected @Nullable List<ReplicationConfig> replicationConfig;
2830

2931
public String fqtn() {
3032
return String.format("%s.%s", dbName, tableName);

apps/spark/src/test/java/com/linkedin/openhouse/jobs/clients/TablesClientTest.java

+47
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import com.linkedin.openhouse.tables.client.model.GetDatabaseResponseBody;
1919
import com.linkedin.openhouse.tables.client.model.GetTableResponseBody;
2020
import com.linkedin.openhouse.tables.client.model.Policies;
21+
import com.linkedin.openhouse.tables.client.model.Replication;
22+
import com.linkedin.openhouse.tables.client.model.ReplicationConfig;
2123
import com.linkedin.openhouse.tables.client.model.Retention;
2224
import com.linkedin.openhouse.tables.client.model.RetentionColumnPattern;
2325
import com.linkedin.openhouse.tables.client.model.TimePartitionSpec;
@@ -415,6 +417,33 @@ void testNonPartitionedTableWithPatternGetRetentionConfig() {
415417
Mockito.verify(apiMock, Mockito.times(1)).getTableV1(testDbName, testTableNamePartitioned);
416418
}
417419

420+
@Test
421+
void testPrimaryTableWithReplicationConfig() {
422+
GetTableResponseBody primaryTableWithReplicationConfigResponseBodyMock =
423+
createPrimaryTableWithReplicationPolicyResponseBodyMock(
424+
testDbName, testTableName, "schedule", "interval", "cluster");
425+
Mono<GetTableResponseBody> responseMock = (Mono<GetTableResponseBody>) Mockito.mock(Mono.class);
426+
Mockito.when(responseMock.block(any(Duration.class)))
427+
.thenReturn(primaryTableWithReplicationConfigResponseBodyMock);
428+
Mockito.when(apiMock.getTableV1(testDbName, testTableName)).thenReturn(responseMock);
429+
Optional<List<com.linkedin.openhouse.jobs.util.ReplicationConfig>> result =
430+
client.getTableReplication(
431+
TableMetadata.builder().dbName(testDbName).tableName(testTableName).build());
432+
Assertions.assertTrue(
433+
result.isPresent(), "Retention config must be present for a test partitioned table");
434+
List<com.linkedin.openhouse.jobs.util.ReplicationConfig> replicationConfigs = new ArrayList<>();
435+
com.linkedin.openhouse.jobs.util.ReplicationConfig replicationConfig =
436+
com.linkedin.openhouse.jobs.util.ReplicationConfig.builder()
437+
.schedule("schedule")
438+
.cluster("cluster")
439+
.tableOwner("")
440+
.build();
441+
replicationConfigs.add(replicationConfig);
442+
Assertions.assertEquals(replicationConfigs, result.orElse(null));
443+
Mockito.verify(responseMock, Mockito.times(1)).block(any(Duration.class));
444+
Mockito.verify(apiMock, Mockito.times(1)).getTableV1(testDbName, testTableName);
445+
}
446+
418447
@Test
419448
void getDatabases() {
420449
GetAllDatabasesResponseBody allDatabasesResponseBodyMock =
@@ -535,6 +564,24 @@ private GetTableResponseBody createNonPartitionedTableWithPatternResponseBodyMoc
535564
return setUpResponseBodyMock(dbName, tableName, null, policies);
536565
}
537566

567+
private GetTableResponseBody createPrimaryTableWithReplicationPolicyResponseBodyMock(
568+
String dbName, String tableName, String schedule, String interval, String cluster) {
569+
Policies policies = Mockito.mock(Policies.class);
570+
Replication replication = Mockito.mock(Replication.class);
571+
List<ReplicationConfig> replicationConfigs = new ArrayList<>();
572+
ReplicationConfig replicationConfig = Mockito.mock(ReplicationConfig.class);
573+
replicationConfigs.add(replicationConfig);
574+
replication.setConfig(replicationConfigs);
575+
576+
policies.setReplication(replication);
577+
Mockito.when(replication.getConfig()).thenReturn(replicationConfigs);
578+
Mockito.when(policies.getReplication()).thenReturn(replication);
579+
Mockito.when(replicationConfig.getCronSchedule()).thenReturn(schedule);
580+
Mockito.when(replicationConfig.getDestination()).thenReturn(cluster);
581+
Mockito.when(replicationConfig.getInterval()).thenReturn(interval);
582+
return setUpResponseBodyMock(dbName, tableName, null, policies);
583+
}
584+
538585
private GetTableResponseBody createPartitionedTableNullPoliciesResponseBodyMock(
539586
String dbName, String tableName, String partitionColummName) {
540587
TimePartitionSpec partitionSpec = Mockito.mock(TimePartitionSpec.class);

services/jobs/src/main/java/com/linkedin/openhouse/jobs/model/JobConf.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public enum JobType {
3333
ORPHAN_DIRECTORY_DELETION,
3434
TABLE_STATS_COLLECTION,
3535
DATA_LAYOUT_STRATEGY_GENERATION,
36-
DATA_LAYOUT_STRATEGY_EXECUTION
36+
DATA_LAYOUT_STRATEGY_EXECUTION,
37+
REPLICATION
3738
}
3839
}

0 commit comments

Comments
 (0)