Skip to content

Commit ac653fb

Browse files
authored
[GOBBLIN-2074] add dag action store inside DagManagementStateStore (apache#3954)
* remove unused fields/APIs * add dag action store inside DMSS * address review comment
1 parent ea1233a commit ac653fb

32 files changed

+231
-150
lines changed

gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@
4141
import org.apache.gobblin.runtime.api.SpecNotFoundException;
4242
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
4343
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
44+
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
4445
import org.apache.gobblin.service.modules.orchestration.DagManager;
45-
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
4646
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
4747
import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
4848
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
@@ -85,13 +85,13 @@ class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {
8585

8686
public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads,
8787
boolean isMultiActiveSchedulerEnabled) {
88-
this(topic, config, numThreads, isMultiActiveSchedulerEnabled, mock(DagActionStore.class), mock(DagManager.class), mock(FlowCatalog.class), mock(Orchestrator.class));
88+
this(topic, config, numThreads, isMultiActiveSchedulerEnabled, mock(DagManagementStateStore.class), mock(DagManager.class), mock(FlowCatalog.class), mock(Orchestrator.class));
8989
}
9090

9191
public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads, boolean isMultiActiveSchedulerEnabled,
92-
DagActionStore dagActionStore, DagManager dagManager, FlowCatalog flowCatalog, Orchestrator orchestrator) {
92+
DagManagementStateStore dagManagementStateStore, DagManager dagManager, FlowCatalog flowCatalog, Orchestrator orchestrator) {
9393
super(topic, config, dagManager, numThreads, flowCatalog, orchestrator,
94-
dagActionStore, isMultiActiveSchedulerEnabled);
94+
dagManagementStateStore, isMultiActiveSchedulerEnabled);
9595
}
9696

9797
protected void processMessageForTest(DecodeableKafkaRecord record) {
@@ -238,8 +238,7 @@ public void testStartupSequenceHandlesFailures() throws Exception {
238238
String jobName = "testJobName";
239239
String flowExecutionId = "12345677";
240240

241-
MysqlDagActionStore mysqlDagActionStore = new MysqlDagActionStore(config);
242-
mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
241+
DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class);
243242

244243
Config monitorConfig = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
245244
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
@@ -251,7 +250,7 @@ public void testStartupSequenceHandlesFailures() throws Exception {
251250
// Throw an uncaught exception during startup sequence
252251
when(mockFlowCatalog.getSpecs(any(URI.class))).thenThrow(new RuntimeException("Uncaught exception"));
253252
mockDagActionStoreChangeMonitor = new MockDagActionStoreChangeMonitor("dummyTopic", monitorConfig, 5,
254-
true, mysqlDagActionStore, mockDagManager, mockFlowCatalog, mockOrchestrator);
253+
true, dagManagementStateStore, mockDagManager, mockFlowCatalog, mockOrchestrator);
255254
try {
256255
mockDagActionStoreChangeMonitor.setActive();
257256
} catch (Exception e) {

gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java

+4-7
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151

5252
import kafka.consumer.ConsumerIterator;
5353
import kafka.message.MessageAndMetadata;
54-
5554
import lombok.Getter;
5655

5756
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -75,8 +74,7 @@
7574
import org.apache.gobblin.service.ExecutionStatus;
7675
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
7776
import org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
78-
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
79-
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
77+
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
8078
import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer;
8179
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
8280
import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
@@ -103,7 +101,7 @@ public class KafkaAvroJobStatusMonitorTest {
103101
private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
104102
private MetricContext context;
105103
private KafkaAvroEventKeyValueReporter.Builder<?> builder;
106-
private MysqlDagActionStore mysqlDagActionStore;
104+
private DagManagementStateStore dagManagementStateStore;
107105
private final MockedStatic<GobblinServiceManager> mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
108106

109107
@BeforeClass
@@ -123,9 +121,8 @@ public void setUp() throws Exception {
123121
builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
124122
builder = builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
125123
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
126-
this.mysqlDagActionStore = mock(MysqlDagActionStore.class);
124+
this.dagManagementStateStore = mock(DagManagementStateStore.class);
127125
this.mockedGobblinServiceManager.when(() -> GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(mock(DagActionReminderScheduler.class));
128-
this.mockedGobblinServiceManager.when(() -> GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
129126
}
130127

131128
@Test
@@ -787,7 +784,7 @@ class MockKafkaAvroJobStatusMonitor extends KafkaAvroJobStatusMonitor {
787784
public MockKafkaAvroJobStatusMonitor(String topic, Config config, int numThreads,
788785
AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, GaaSJobObservabilityEventProducer producer)
789786
throws IOException, ReflectiveOperationException {
790-
super(topic, config, numThreads, mock(JobIssueEventHandler.class), producer, mysqlDagActionStore);
787+
super(topic, config, numThreads, mock(JobIssueEventHandler.class), producer, dagManagementStateStore);
791788
shouldThrowFakeExceptionInParseJobStatus = shouldThrowFakeExceptionInParseJobStatusToggle;
792789
}
793790

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java

+74-6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.IOException;
2121
import java.net.URI;
22+
import java.sql.SQLException;
2223
import java.util.Collection;
2324
import java.util.List;
2425
import java.util.Optional;
@@ -146,12 +147,6 @@ default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
146147
*/
147148
List<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId dagId) throws IOException;
148149

149-
/**
150-
* Returns the {@link Dag} the provided {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} belongs to
151-
* or Optional.absent if it is not found.
152-
*/
153-
Optional<Dag<JobExecutionPlan>> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode);
154-
155150
/**
156151
* Deletes the dag node state that was added through {@link DagManagementStateStore#addDagNodeState(Dag.DagNode, DagManager.DagId)}
157152
* No-op if the dag node is not found in the store.
@@ -201,4 +196,77 @@ default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
201196
* has any running job, false otherwise.
202197
*/
203198
public boolean hasRunningJobs(DagManager.DagId dagId);
199+
200+
/**
201+
* Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
202+
* @param flowGroup flow group for the dag action
203+
* @param flowName flow name for the dag action
204+
* @param flowExecutionId flow execution for the dag action
205+
* @param jobName job name for the dag action
206+
* @param dagActionType the value of the dag action
207+
* @throws IOException
208+
*/
209+
boolean existsJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName,
210+
DagActionStore.DagActionType dagActionType) throws IOException, SQLException;
211+
212+
/**
213+
* Check if an action exists in dagAction store by flow group, flow name, and flow execution id. This method assumes
214+
* that jobName is empty ("").
215+
* @param flowGroup flow group for the dag action
216+
* @param flowName flow name for the dag action
217+
* @param flowExecutionId flow execution for the dag action
218+
* @param dagActionType the value of the dag action
219+
* @throws IOException
220+
*/
221+
boolean existsFlowDagAction(String flowGroup, String flowName, String flowExecutionId,
222+
DagActionStore.DagActionType dagActionType) throws IOException, SQLException;
223+
224+
/** Persist the {@link DagActionStore.DagAction} in {@link DagActionStore} for durability */
225+
default void addDagAction(DagActionStore.DagAction dagAction) throws IOException {
226+
addJobDagAction(
227+
dagAction.getFlowGroup(),
228+
dagAction.getFlowName(),
229+
dagAction.getFlowExecutionId(),
230+
dagAction.getJobName(),
231+
dagAction.getDagActionType());
232+
}
233+
234+
/**
235+
* Persist the dag action in {@link DagActionStore} for durability
236+
* @param flowGroup flow group for the dag action
237+
* @param flowName flow name for the dag action
238+
* @param flowExecutionId flow execution for the dag action
239+
* @param jobName job name for the dag action
240+
* @param dagActionType the value of the dag action
241+
* @throws IOException
242+
*/
243+
void addJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName,
244+
DagActionStore.DagActionType dagActionType) throws IOException;
245+
246+
/**
247+
* Persist the dag action in {@link DagActionStore} for durability. This method assumes an empty jobName.
248+
* @param flowGroup flow group for the dag action
249+
* @param flowName flow name for the dag action
250+
* @param flowExecutionId flow execution for the dag action
251+
* @param dagActionType the value of the dag action
252+
* @throws IOException
253+
*/
254+
default void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId,
255+
DagActionStore.DagActionType dagActionType) throws IOException {
256+
addDagAction(DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId, dagActionType));
257+
}
258+
259+
/**
260+
* Delete the dag action from {@link DagActionStore}.
261+
* @param dagAction containing all information needed to identify dag and specific action value
262+
* @throws IOException
263+
* @return true if one record was successfully deleted, false if the record does not exist
264+
*/
265+
boolean deleteDagAction(DagActionStore.DagAction dagAction) throws IOException;
266+
267+
/**
268+
* Get all {@link DagActionStore.DagAction}s from the {@link DagActionStore}.
269+
* @throws IOException Exception in retrieving {@link DagActionStore.DagAction}s.
270+
*/
271+
Collection<DagActionStore.DagAction> getDagActions() throws IOException;
204272
}

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java

+6-10
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,6 @@
7676
public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream {
7777
private final Config config;
7878
@Getter private final EventSubmitter eventSubmitter;
79-
80-
@Inject(optional=true)
81-
protected Optional<DagActionStore> dagActionStore;
8279
protected MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter;
8380
protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
8481
private final boolean isMultiActiveExecutionEnabled;
@@ -102,7 +99,6 @@ public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagAc
10299
throw new RuntimeException(String.format("DagProcessingEngine requires %s to be instantiated.",
103100
DagActionReminderScheduler.class.getSimpleName()));
104101
}
105-
this.dagActionStore = dagActionStore;
106102
this.dagActionProcessingLeaseArbiter = dagActionProcessingLeaseArbiter;
107103
this.dagActionReminderScheduler = dagActionReminderScheduler;
108104
this.isMultiActiveExecutionEnabled = isMultiActiveExecutionEnabled;
@@ -214,17 +210,17 @@ private DagTask createDagTask(DagActionStore.DagAction dagAction, LeaseAttemptSt
214210

215211
switch (dagActionType) {
216212
case ENFORCE_FLOW_FINISH_DEADLINE:
217-
return new EnforceFlowFinishDeadlineDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
213+
return new EnforceFlowFinishDeadlineDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore);
218214
case ENFORCE_JOB_START_DEADLINE:
219-
return new EnforceJobStartDeadlineDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
215+
return new EnforceJobStartDeadlineDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore);
220216
case KILL:
221-
return new KillDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
217+
return new KillDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore);
222218
case LAUNCH:
223-
return new LaunchDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
219+
return new LaunchDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore);
224220
case REEVALUATE:
225-
return new ReevaluateDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
221+
return new ReevaluateDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore);
226222
case RESUME:
227-
return new ResumeDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
223+
return new ResumeDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore);
228224
default:
229225
throw new UnsupportedOperationException(dagActionType + " not yet implemented");
230226
}

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
@Slf4j
7070
public class FlowLaunchHandler {
7171
private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
72-
private DagActionStore dagActionStore;
72+
private DagManagementStateStore dagManagementStateStore;
7373
private final MetricContext metricContext;
7474
private final int schedulerMaxBackoffMillis;
7575
private static Random random = new Random();
@@ -81,13 +81,13 @@ public class FlowLaunchHandler {
8181
@Inject
8282
public FlowLaunchHandler(Config config,
8383
@Named(ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME) MultiActiveLeaseArbiter leaseArbiter,
84-
SchedulerService schedulerService, com.google.common.base.Optional<DagActionStore> optDagActionStore) {
84+
SchedulerService schedulerService, com.google.common.base.Optional<DagManagementStateStore> dagManagementStateStoreOpt) {
8585
this.multiActiveLeaseArbiter = leaseArbiter;
8686

87-
if (!optDagActionStore.isPresent()) {
87+
if (!dagManagementStateStoreOpt.isPresent()) {
8888
throw new RuntimeException("DagActionStore MUST be present for flow launch handling!");
8989
}
90-
this.dagActionStore = optDagActionStore.get();
90+
this.dagManagementStateStore = dagManagementStateStoreOpt.get();
9191

9292
this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
9393
ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
@@ -140,8 +140,8 @@ private Optional<LeaseAttemptStatus.LeasedToAnotherStatus> calcLeasedToAnotherSt
140140
private boolean persistLaunchDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseStatus) {
141141
DagActionStore.DagAction launchDagAction = leaseStatus.getConsensusDagAction();
142142
try {
143-
this.dagActionStore.addDagAction(launchDagAction);
144-
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(launchDagAction);
143+
this.dagManagementStateStore.addDagAction(launchDagAction);
144+
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, launchDagAction);
145145
this.numFlowsSubmitted.mark();
146146
// after successfully persisting, close the lease
147147
return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);

0 commit comments

Comments
 (0)