
Commit 4ceabdd

Replication job to trigger setup and carbon flow for replica tables

1 parent 3e8d387 · commit 4ceabdd

9 files changed: +234 -2 lines changed

apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java (+28)

@@ -4,6 +4,7 @@
 import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy;
 import com.linkedin.openhouse.jobs.util.DatabaseTableFilter;
 import com.linkedin.openhouse.jobs.util.DirectoryMetadata;
+import com.linkedin.openhouse.jobs.util.ReplicationConfig;
 import com.linkedin.openhouse.jobs.util.RetentionConfig;
 import com.linkedin.openhouse.jobs.util.RetryUtil;
 import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata;
@@ -15,6 +16,7 @@
 import com.linkedin.openhouse.tables.client.model.GetDatabaseResponseBody;
 import com.linkedin.openhouse.tables.client.model.GetTableResponseBody;
 import com.linkedin.openhouse.tables.client.model.Policies;
+import com.linkedin.openhouse.tables.client.model.Replication;
 import java.time.Duration;
 import java.util.AbstractMap;
 import java.util.ArrayList;
@@ -86,6 +88,31 @@ private Optional<RetentionConfig> getTableRetention(GetTableResponseBody respons
             .build());
   }

+  private Optional<List<ReplicationConfig>> getTableReplication(GetTableResponseBody response) {
+    // At least one replication config must be present
+    if (response == null
+        || response.getPolicies() == null
+        || response.getPolicies().getReplication() == null
+        || response.getPolicies().getReplication().getConfig().size() <= 0) {
+      return Optional.empty();
+    }
+    List<ReplicationConfig> replicationConfigList = new ArrayList<>();
+    Replication conf = response.getPolicies().getReplication();
+    List<com.linkedin.openhouse.tables.client.model.ReplicationConfig> replicationConfig =
+        conf.getConfig();
+
+    replicationConfig.forEach(
+        rc ->
+            replicationConfigList.add(
+                ReplicationConfig.builder()
+                    .cluster(rc.getDestination())
+                    .proxyUser(response.getTableCreator())
+                    .schedule(rc.getCronSchedule())
+                    .build()));
+    // since replicationConfigList is initialized, it cannot be null.
+    return Optional.of(replicationConfigList);
+  }
+
   protected GetTableResponseBody getTable(TableMetadata tableMetadata) {
     return getTable(tableMetadata.getDbName(), tableMetadata.getTableName());
   }
@@ -281,6 +308,7 @@ protected Optional<TableMetadata> mapTableResponseToTableMetadata(
             .isTimePartitioned(tableResponseBody.getTimePartitioning() != null)
             .isClustered(tableResponseBody.getClustering() != null)
             .retentionConfig(getTableRetention(tableResponseBody).orElse(null))
+            .replicationConfig(getTableReplication(tableResponseBody).orElse(null))
             .jobExecutionProperties(getJobExecutionProperties(tableResponseBody));
     builder.creationTimeMs(Objects.requireNonNull(tableResponseBody.getCreationTime()));
     return Optional.of(builder.build());
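
Note on getTableReplication above: an empty (but non-null) config list is treated the same as an absent replication policy, so the guard returns Optional.empty() and the table's TableMetadata ends up with a null replicationConfig. The forEach-with-accumulator loop could equally be written as a map/collect pipeline; a minimal sketch using the same accessors (identical behavior, purely a style alternative):

    // Sketch: same mapping as the forEach above, as a stream pipeline.
    List<ReplicationConfig> replicationConfigList =
        conf.getConfig().stream()
            .map(
                rc ->
                    ReplicationConfig.builder()
                        .cluster(rc.getDestination())
                        .proxyUser(response.getTableCreator())
                        .schedule(rc.getCronSchedule())
                        .build())
            .collect(Collectors.toList());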

apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java (+7)

@@ -291,6 +291,13 @@ protected static CommandLine parseArgs(String[] args) {
             .longOpt("tableFilter")
             .desc("Regexp for filtering tables, defaults to .*")
             .build());
+    options.addOption(
+        Option.builder(null)
+            .required(false)
+            .hasArg()
+            .longOpt("tableTypeFilter")
+            .desc("Option for filtering tables based on tableType. Defaults to None")
+            .build());
     options.addOption(
         Option.builder(null)
             .required(false)
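
The new tableTypeFilter flag is optional and single-valued, and would be read the same way as the existing filters; a hypothetical sketch (the value PRIMARY_TABLE and the surrounding flags are illustrative assumptions, not confirmed by this diff):

    // Hypothetical invocation: --tableFilter "db\..*" --tableTypeFilter PRIMARY_TABLE
    String tableTypeFilter = cmdLine.getOptionValue("tableTypeFilter"); // null when the flag is absent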

apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java (+15)

@@ -48,6 +48,19 @@ private List<OperationTask<?>> prepareTableOperationTaskList(JobConf.JobTypeEnum
     return processMetadataList(tableMetadataList, jobType);
   }

+  private List<OperationTask<?>> prepareReplicationOperationTaskList(JobConf.JobTypeEnum jobType) {
+    List<TableMetadata> replicationSetupTableMetadataList = tablesClient.getTableMetadataList();
+    // filters tables which are primary and have a replication config defined
+    replicationSetupTableMetadataList =
+        replicationSetupTableMetadataList.stream()
+            .filter(m -> m.isPrimary() && (m.getReplicationConfig() != null))
+            .collect(Collectors.toList());
+    log.info(
+        "Fetched metadata for {} tables for replication setup task",
+        replicationSetupTableMetadataList.size());
+    return processMetadataList(replicationSetupTableMetadataList, jobType);
+  }
+
   private List<OperationTask<?>> prepareTableDirectoryOperationTaskList(
       JobConf.JobTypeEnum jobType) {
     List<DirectoryMetadata> directoryMetadataList = tablesClient.getOrphanTableDirectories();
@@ -152,6 +165,8 @@ public List<OperationTask<?>> buildOperationTaskList(
       case STAGED_FILES_DELETION:
       case DATA_LAYOUT_STRATEGY_GENERATION:
         return prepareTableOperationTaskList(jobType);
+      case REPLICATION:
+        return prepareReplicationOperationTaskList(jobType);
       case DATA_LAYOUT_STRATEGY_EXECUTION:
         return prepareDataLayoutOperationTaskList(jobType, properties, meter);
       case ORPHAN_DIRECTORY_DELETION:
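
Because TablesClient maps an absent or empty replication policy to a null replicationConfig, the null check in prepareReplicationOperationTaskList is enough to drop tables with no usable config. A minimal sketch of the predicate against hypothetical metadata (the builder fields and values below are assumptions for illustration only):

    // Hypothetical: a primary table with one replication config passes the filter;
    // a replica, or a table with replicationConfig == null, is dropped.
    TableMetadata primary =
        TableMetadata.builder()
            .dbName("db")
            .tableName("testTable")
            .isPrimary(true)
            .replicationConfig(
                Collections.singletonList(
                    ReplicationConfig.builder().cluster("clusterA").schedule("0 0 * * *").build()))
            .build();
    boolean selected = primary.isPrimary() && primary.getReplicationConfig() != null; // true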
apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableReplicationTask.java (new, +97)

@@ -0,0 +1,97 @@
+package com.linkedin.openhouse.jobs.scheduler.tasks;
+
+import com.linkedin.openhouse.common.JobState;
+import com.linkedin.openhouse.jobs.client.JobsClient;
+import com.linkedin.openhouse.jobs.client.TablesClient;
+import com.linkedin.openhouse.jobs.client.model.JobConf;
+import com.linkedin.openhouse.jobs.util.AppConstants;
+import com.linkedin.openhouse.jobs.util.OtelConfig;
+import com.linkedin.openhouse.jobs.util.ReplicationConfig;
+import com.linkedin.openhouse.jobs.util.TableMetadata;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import java.util.List;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+
+/** A task to apply replication to a table. */
+@Slf4j
+public class TableReplicationTask extends TableOperationTask<TableMetadata> {
+  public static final JobConf.JobTypeEnum OPERATION_TYPE = JobConf.JobTypeEnum.REPLICATION;
+  private static final Meter METER = OtelConfig.getMeter(OperationTask.class.getName());
+
+  protected TableReplicationTask(
+      JobsClient jobsClient, TablesClient tablesClient, TableMetadata tableMetadata) {
+    super(jobsClient, tablesClient, tableMetadata);
+  }
+
+  @Override
+  public JobConf.JobTypeEnum getType() {
+    return OPERATION_TYPE;
+  }
+
+  @Override
+  protected List<String> getArgs() {
+    return null;
+  }
+
+  /* Returns empty value iff the callable was interrupted by future cancel. */
+  @Override
+  public Optional<JobState> call() {
+    if (!shouldRun()) {
+      log.info("Skipping job for {}, since the operation doesn't need to be run", metadata);
+      return Optional.empty();
+    }
+    List<ReplicationConfig> replicationConfigs = metadata.getReplicationConfig();
+    for (ReplicationConfig config : replicationConfigs) {
+      log.info("Launching job for {}", metadata);
+      Attributes typeAttributes =
+          Attributes.of(
+              AttributeKey.stringKey(AppConstants.TYPE),
+              getType().getValue(),
+              (metadata.getClass().equals(TableMetadata.class)
+                  ? AttributeKey.stringKey(AppConstants.TABLE_NAME)
+                  : AttributeKey.stringKey(AppConstants.DATABASE_NAME)),
+              metadata.getEntityName());
+      try {
+        OtelConfig.executeWithStats(
+            () -> {
+              // this is a wrapper to convert boolean false to an exception
+              if (!launchJob(config)) {
+                throw new Exception();
+              }
+              return null;
+            },
+            METER,
+            "submit",
+            typeAttributes);
+      } catch (Exception e) {
+        log.error(
+            "Could not launch job {} for {}. Exception {}", getType(), metadata, e.getMessage());
+        return Optional.empty();
+      }
+      log.info("Launched a job for {}", metadata);
+      // TODO: implement wait loop for job to finish and update metrics and job state
+      // TODO: update the jobState with returned value from Airflow client
+    }
+    return Optional.of(Enum.valueOf(JobState.class, JobState.FAILED.name()));
+  }
+
+  protected boolean launchJob(ReplicationConfig config) {
+    String jobName =
+        String.format(
+            "%s_%s_%s_%s",
+            getType(), config.getCluster(), metadata.getDbName(), metadata.getTableName());
+    // TODO: Trigger Airflow job using airflow job client. Config can be used to create airflow
+    // client params
+    // TODO: Poll for job ID
+    log.info("Triggering Replication job: {} via airflow client", jobName);
+    return false;
+  }
+
+  @Override
+  protected boolean shouldRun() {
+    return metadata.isPrimary() && metadata.getReplicationConfig() != null;
+  }
+}
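
Two behavioral notes on the stub above: launchJob always returns false until the Airflow client is wired in, so call() currently exits through the catch block with Optional.empty() on the first config; the trailing FAILED return is only reached once a launch succeeds for every config. For reference, the job name format yields names like the following (a sketch with placeholder values, assuming the enum's string form is its name):

    // Illustrative only: the name launchJob would build for db.testTable replicating to "clusterA".
    String jobName =
        String.format("%s_%s_%s_%s", JobConf.JobTypeEnum.REPLICATION, "clusterA", "db", "testTable");
    // -> "REPLICATION_clusterA_db_testTable"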
apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/ReplicationSparkApp.java (new, +65)

@@ -0,0 +1,65 @@
+package com.linkedin.openhouse.jobs.spark;
+
+import com.linkedin.openhouse.jobs.spark.state.StateManager;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+
+/**
+ * Class with main entry point to the replication job, which triggers an airflow run to set up
+ * replication for a table with a defined replication config
+ *
+ * <p>Example of invocation: com.linkedin.openhouse.jobs.spark.ReplicationSparkApp --tableName
+ * db.testTable
+ */
+@Slf4j
+public class ReplicationSparkApp extends BaseTableSparkApp {
+  private final String schedule;
+  private final String cluster;
+  private final String proxyUser;
+
+  public ReplicationSparkApp(
+      String jobId,
+      StateManager stateManager,
+      String fqtn,
+      String schedule,
+      String cluster,
+      String proxyUser) {
+    super(jobId, stateManager, fqtn);
+    this.schedule = schedule;
+    this.cluster = cluster;
+    this.proxyUser = proxyUser;
+  }
+
+  @Override
+  protected void runInner(Operations ops) {
+    log.info(
+        "Running ReplicationSparkApp for table {}, with parameters schedule: {}, cluster: {}, proxyUser: {}",
+        fqtn,
+        schedule,
+        cluster,
+        proxyUser);
+  }
+
+  public static void main(String[] args) {
+    List<Option> extraOptions = new ArrayList<>();
+    extraOptions.add(new Option("t", "tableName", true, "Fully-qualified table name"));
+    extraOptions.add(new Option("s", "schedule", true, "Replication job schedule in cron format"));
+    extraOptions.add(
+        new Option("p", "proxyUser", true, "Proxy user to run carbon replication job"));
+    extraOptions.add(new Option("c", "cluster", true, "Destination cluster for replication"));
+
+    CommandLine cmdLine = createCommandLine(args, extraOptions);
+    ReplicationSparkApp app =
+        new ReplicationSparkApp(
+            getJobId(cmdLine),
+            createStateManager(cmdLine),
+            cmdLine.getOptionValue("tableName"),
+            cmdLine.getOptionValue("schedule"),
+            cmdLine.getOptionValue("cluster"),
+            cmdLine.getOptionValue("proxyUser"));
+    app.run();
+  }
+}
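
A fuller invocation than the Javadoc example might look like this (schedule, cluster, and proxy user values are placeholders, not taken from this commit):

    // Hypothetical command line; all values are placeholders.
    // com.linkedin.openhouse.jobs.spark.ReplicationSparkApp \
    //   --tableName db.testTable \
    //   --schedule "0 0 */12 * *" \
    //   --cluster clusterA \
    //   --proxyUser openhouse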

apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/RetentionSparkApp.java (+1 -1)

@@ -8,7 +8,7 @@
 import org.apache.commons.cli.Option;

 /**
- * Class with main entry point to run as a table retention job. The table doesn't have to be time
+ * Class with main entry point to run as a table replication job. The table doesn't have to be time
  * partitioned, but the retention column must be a time column.
  *
  * <p>Example of invocation: com.linkedin.openhouse.jobs.spark.RetentionSparkApp --columnName
apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/ReplicationConfig.java (new, +17)

@@ -0,0 +1,17 @@
+package com.linkedin.openhouse.jobs.util;
+
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+/** Table replication config class. This is the app-side representation of /tables policies->replication */
+@Builder
+@Getter
+@EqualsAndHashCode
+@ToString
+public class ReplicationConfig {
+  private final String schedule;
+  private final String proxyUser;
+  private final String cluster;
+}
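
Since the class is a Lombok @Builder with @Getter and @EqualsAndHashCode, construction and field-wise comparison work as below (a minimal sketch; all values are placeholders):

    // Sketch: two configs built with identical placeholder values compare equal.
    ReplicationConfig a =
        ReplicationConfig.builder()
            .cluster("clusterA")
            .proxyUser("openhouse")
            .schedule("0 0 */12 * *")
            .build();
    ReplicationConfig b =
        ReplicationConfig.builder()
            .cluster("clusterA")
            .proxyUser("openhouse")
            .schedule("0 0 */12 * *")
            .build();
    assert a.equals(b); // field-wise equality from @EqualsAndHashCode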

apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadata.java (+2)

@@ -1,6 +1,7 @@
 package com.linkedin.openhouse.jobs.util;

 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import lombok.Builder;
@@ -25,6 +26,7 @@ public class TableMetadata extends Metadata {
   @Builder.Default protected @NonNull Map<String, String> jobExecutionProperties = new HashMap<>();
   protected @Nullable RetentionConfig retentionConfig;
   protected @Nullable HistoryConfig historyConfig;
+  protected @Nullable List<ReplicationConfig> replicationConfig;

   public String fqtn() {
     return String.format("%s.%s", dbName, tableName);

services/jobs/src/main/java/com/linkedin/openhouse/jobs/model/JobConf.java (+2 -1)

@@ -33,6 +33,7 @@ public enum JobType {
     ORPHAN_DIRECTORY_DELETION,
     TABLE_STATS_COLLECTION,
     DATA_LAYOUT_STRATEGY_GENERATION,
-    DATA_LAYOUT_STRATEGY_EXECUTION
+    DATA_LAYOUT_STRATEGY_EXECUTION,
+    REPLICATION
   }
 }
