
Commit 3a045cd

[GOBBLIN-1831] Use flowexecutionid in kafka monitor and jobnames (apache#3694)

* Use the flow execution ID as the job ID, instead of a generated timestamp, when the job properties contain a flow execution ID property
* Add the flow execution ID to the Kafka job monitor for cancelled flows, where applicable
* Address review comments
* Add enhanced logs

1 parent 7bbf676 commit 3a045cd

File tree

4 files changed: +143 −49 lines changed

gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java (+6 −3)

@@ -36,6 +36,7 @@
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.PropertiesUtils;

@@ -96,12 +97,14 @@ public HelixJobsMapping(Config sysConfig, URI fsUri, String rootDir) {

   public static String createPlanningJobId (Properties jobPlanningProps) {
     return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
-        + JobState.getJobNameFromProps(jobPlanningProps));
+        + JobState.getJobNameFromProps(jobPlanningProps),
+        PropertiesUtils.getPropAsLong(jobPlanningProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
   }

   public static String createActualJobId (Properties jobProps) {
-    return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
-        + JobState.getJobNameFromProps(jobProps));
+    return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
+        + JobState.getJobNameFromProps(jobProps),
+        PropertiesUtils.getPropAsLong(jobProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
   }

   @Nullable
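
The upshot of the HelixJobsMapping change: when the job properties carry a flow execution ID, the planning and actual job IDs become deterministic for that flow execution instead of embedding a fresh timestamp on every launch. A minimal, self-contained sketch of the naming rule; the newJobId helper and the literal "flow.executionId" key below are simplified stand-ins for Gobblin's actual utilities and ConfigurationKeys constant, not the real implementation:

import java.util.Properties;

public class JobIdNamingSketch {
  // Stand-in for JobLauncherUtils.newJobId(name, sequence); Gobblin job IDs
  // follow a "job_<jobName>_<sequence>" pattern.
  static String newJobId(String jobName, long sequence) {
    return "job_" + jobName + "_" + sequence;
  }

  // Mirrors createPlanningJobId/createActualJobId above: prefer the flow
  // execution ID from the properties, falling back to the current time.
  static String jobId(String prefix, String jobName, Properties props) {
    String flowExecutionId = props.getProperty("flow.executionId"); // assumed key literal
    long sequence = (flowExecutionId != null)
        ? Long.parseLong(flowExecutionId)
        : System.currentTimeMillis();
    return newJobId(prefix + jobName, sequence);
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("flow.executionId", "12345");
    // Deterministic: re-planning the same flow execution yields the same job ID.
    System.out.println(jobId("PlanningJob", "myFlow", props)); // job_PlanningJobmyFlow_12345
    // Without the property, the ID falls back to a timestamp and differs per call.
    System.out.println(jobId("ActualJob", "myFlow", new Properties()));
  }
}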

gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java (+98 −9)

@@ -29,25 +29,26 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.testng.Assert;
 import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;

 import com.google.common.io.Closer;
 import com.typesafe.config.Config;

+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.KafkaTestBase;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
 import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.WriteResponse;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-
-import lombok.extern.slf4j.Slf4j;

 @Slf4j
@@ -63,9 +64,12 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
   private String _kafkaBrokers;
   private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorTest";
   private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
+  String flowSpecUriString = "/flowgroup/flowname/spec";
+  Spec flowSpec = initJobSpecWithFlowExecutionId(flowSpecUriString, "12345");
   String specUriString = "/foo/bar/spec";
   Spec spec = initJobSpec(specUriString);
+
   @BeforeSuite
   public void beforeSuite() {
     log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
@@ -92,9 +96,8 @@ private void cleanupTestDir() {
       }
     }
   }
-
-  @Test
-  public void testAddSpec() throws Exception {
+  @BeforeClass
+  public void setup() throws Exception {
     _closer = Closer.create();
     _properties = new Properties();
@@ -116,16 +119,20 @@ public void testAddSpec() throws Exception {
     // SEI Producer
     _seip = _closer.register(new SimpleKafkaSpecProducer(config));

-    WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
-    log.info("WriteResponse: " + writeResponse);
-
     _jobCatalog = new NonObservingFSJobCatalog(config.getConfig("gobblin.cluster"));
     _jobCatalog.startAsync().awaitRunning();

     // SEI Consumer
     _seic = _closer.register(new StreamingKafkaSpecConsumer(config, _jobCatalog));
     _seic.startAsync().awaitRunning();
+  }
+
+  @Test
+  public void testAddSpec() throws Exception {
+    WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
+    log.info("WriteResponse: " + writeResponse);
+
     List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
     Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
@@ -165,6 +172,78 @@ public void testDeleteSpec() throws Exception {
     Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
   }

+  @Test(dependsOnMethods = "testDeleteSpec")
+  public void testCancelSpec() throws Exception {
+    // Cancel an existing spec that was added
+    _seip.addSpec(spec).get();
+    WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(specUriString), new Properties()).get();
+    log.info("WriteResponse: " + writeResponse);
+
+    // Wait for the cancellation to be processed
+    Thread.sleep(5000);
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 3, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(2);
+    log.info(consumedSpecAction.getKey().toString());
+    Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+    Assert.assertTrue(consumedEvent.get(1).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(specUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+  @Test (dependsOnMethods = "testCancelSpec")
+  public void testCancelSpecNoopDefault() throws Exception {
+    _seip.addSpec(flowSpec).get();
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "54321"); // Does not match the added JobSpec, so it should not cancel the job
+    WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(flowSpecUriString), props).get();
+    log.info("WriteResponse: " + writeResponse);
+    // Wait for the cancellation to be processed; it should ignore the spec because the flow execution IDs do not match
+    Thread.sleep(5000);
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+
+    _seip.cancelJob(new URI(flowSpecUriString), new Properties()).get();
+    Thread.sleep(5000);
+    consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 2, "Should emit cancellation event if no flow ID provided");
+    consumedSpecAction = consumedEvent.get(1);
+    Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+  @Test(dependsOnMethods = "testCancelSpecNoopDefault")
+  public void testCancelSpecWithFlowExecutionId() throws Exception {
+    _seip.addSpec(flowSpec).get();
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "12345");
+    WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(flowSpecUriString), props).get();
+    log.info("WriteResponse: " + writeResponse);
+
+    // Wait for the cancellation to be processed
+    Thread.sleep(5000);
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 3, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(2);
+    log.info(consumedSpecAction.getKey().toString());
+    Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+    Assert.assertTrue(consumedEvent.get(1).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
   private static JobSpec initJobSpec(String specUri) {
     Properties properties = new Properties();
     return JobSpec.builder(specUri)
@@ -174,6 +253,16 @@ private static JobSpec initJobSpec(String specUri) {
         .build();
   }

+  private static JobSpec initJobSpecWithFlowExecutionId(String specUri, String flowExecutionId) {
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
+    return JobSpec.builder(specUri)
+        .withConfig(ConfigUtils.propertiesToConfig(properties))
+        .withVersion("1")
+        .withDescription("Spec Description")
+        .build();
+  }
+
   @AfterSuite
   public void after() {
     try {

gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java (+14 −32)

@@ -17,27 +17,30 @@

 package org.apache.gobblin.service;

-import com.google.common.base.Joiner;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
-import java.util.concurrent.Future;
 import java.util.Properties;
+import java.util.concurrent.Future;

 import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.slf4j.Logger;

 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;

+import javax.annotation.concurrent.NotThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
@@ -54,9 +57,6 @@
 import org.apache.gobblin.writer.AsyncDataWriter;
 import org.apache.gobblin.writer.WriteCallback;

-import javax.annotation.concurrent.NotThreadSafe;
-import lombok.extern.slf4j.Slf4j;
-
 @Slf4j
 @NotThreadSafe
 public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
@@ -105,19 +105,6 @@ private Meter createMeter(String suffix) {
     return this.metricContext.meter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, getClass().getSimpleName(), suffix));
   }

-  private Spec addExecutionIdToJobSpecUri(Spec spec) {
-    JobSpec newSpec = (JobSpec)spec;
-    if (newSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
-      try {
-        newSpec.setUri(new URI(Joiner.on("/").
-            join(spec.getUri().toString(), newSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))));
-      } catch (URISyntaxException e) {
-        log.error("Cannot create job uri to cancel job", e);
-      }
-    }
-    return newSpec;
-  }
-
   private URI getURIWithExecutionId(URI originalURI, Properties props) {
     URI result = originalURI;
     if (props.containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
@@ -133,48 +120,43 @@ private URI getURIWithExecutionId(URI originalURI, Properties props) {

   @Override
   public Future<?> addSpec(Spec addedSpec) {
-    Spec spec = addExecutionIdToJobSpecUri(addedSpec);
-    AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, SpecExecutor.Verb.ADD);
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD);

-    log.info("Adding Spec: " + spec + " using Kafka.");
+    log.info("Adding Spec: " + addedSpec + " using Kafka.");
     this.addSpecMeter.mark();

     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
   }

   @Override
   public Future<?> updateSpec(Spec updatedSpec) {
-    Spec spec = addExecutionIdToJobSpecUri(updatedSpec);
-    AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, SpecExecutor.Verb.UPDATE);
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE);

-    log.info("Updating Spec: " + spec + " using Kafka.");
+    log.info("Updating Spec: " + updatedSpec + " using Kafka.");
     this.updateSpecMeter.mark();

     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
   }

   @Override
   public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
-    URI finalDeletedSpecURI = getURIWithExecutionId(deletedSpecURI, headers);
-
-    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(finalDeletedSpecURI.toString())
+    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
         .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.DELETE.name()))
         .setProperties(Maps.fromProperties(headers)).build();

-    log.info("Deleting Spec: " + finalDeletedSpecURI + " using Kafka.");
+    log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
     this.deleteSpecMeter.mark();

     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
   }

   @Override
   public Future<?> cancelJob(URI deletedSpecURI, Properties properties) {
-    URI finalDeletedSpecURI = getURIWithExecutionId(deletedSpecURI, properties);
-    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(finalDeletedSpecURI.toString())
+    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
        .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.CANCEL.name()))
        .setProperties(Maps.fromProperties(properties)).build();

-    log.info("Cancelling job: " + finalDeletedSpecURI + " using Kafka.");
+    log.info("Cancelling job: " + deletedSpecURI + " using Kafka.");
     this.cancelSpecMeter.mark();

     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
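
With addExecutionIdToJobSpecUri removed and the URI rewriting gone from deleteSpec/cancelJob, a cancel request now targets the spec's stable URI and carries the flow execution ID in the record's properties instead. A rough sketch of the resulting CANCEL record, using plain maps in place of the AvroJobSpec builder; the "Verb" and "flow.executionId" literals are assumed values for SpecExecutor.VERB_KEY and ConfigurationKeys.FLOW_EXECUTION_ID_KEY, not confirmed constants:

import java.util.HashMap;
import java.util.Map;

public class CancelRecordSketch {
  public static void main(String[] args) {
    // Previously the execution ID was appended to the URI itself
    // (e.g. "/flowgroup/flowname/spec/12345"); now the URI stays untouched.
    String uri = "/flowgroup/flowname/spec";

    Map<String, String> metadata = new HashMap<>();
    metadata.put("Verb", "CANCEL"); // assumed SpecExecutor.VERB_KEY -> Verb.CANCEL.name()

    Map<String, String> properties = new HashMap<>();
    properties.put("flow.executionId", "12345"); // assumed FLOW_EXECUTION_ID_KEY literal

    // The consumer-side KafkaJobMonitor (next file) compares this execution ID
    // against the running job spec before deciding whether to cancel.
    System.out.println("uri=" + uri + ", metadata=" + metadata + ", properties=" + properties);
  }
}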

gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java (+25 −5)

@@ -28,11 +28,13 @@
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;

+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecMonitor;
+import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
@@ -136,14 +138,32 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
       break;
     case DELETE:
       this.removedSpecs.mark();
-      URI jobSpecUri = parsedMessage.getUri();
-      this.jobCatalog.remove(jobSpecUri);
+      this.jobCatalog.remove(parsedMessage.getUri());
       // Delete the job state if it is a delete spec request
-      deleteStateStore(jobSpecUri);
+      deleteStateStore(parsedMessage.getUri());
       break;
     case CANCEL:
-      this.cancelledSpecs.mark();
-      this.jobCatalog.remove(parsedMessage.getUri(), true);
+      URI specUri = parsedMessage.getUri();
+      try {
+        JobSpec spec = this.jobCatalog.getJobSpec(specUri);
+        // If incoming job or existing job does not have an associated flow execution ID, default to cancelling the job
+        if (!spec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) || !parsedMessage.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+          this.cancelledSpecs.mark();
+          this.jobCatalog.remove(specUri, true);
+        } else {
+          // Validate that the flow execution ID of the running flow matches the one in the incoming job spec
+          String flowIdToCancel = parsedMessage.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+          if (spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY).equals(flowIdToCancel)) {
+            this.cancelledSpecs.mark();
+            this.jobCatalog.remove(specUri, true);
+          } else {
+            log.warn("Job spec {} that has flow execution ID {} could not be cancelled, incoming request expects to cancel flow execution ID {}", specUri,
+                spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowIdToCancel);
+          }
+        }
+      } catch (JobSpecNotFoundException e) {
+        log.warn("Could not find job spec {} to cancel in job catalog", specUri);
+      }
       break;
     default:
       log.error("Cannot process spec {} with verb {}", parsedMessage.getUri(), verb);
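
The CANCEL branch above boils down to a small predicate: cancel when either side lacks a flow execution ID, otherwise cancel only when the IDs match. A self-contained sketch of that rule; the Optional-based signature is an illustration, not the monitor's actual API:

import java.util.Optional;

public final class CancelGuardSketch {
  // Mirrors the CANCEL branch of processMessage: default to cancelling when
  // either the running spec or the incoming request has no flow execution ID.
  static boolean shouldCancel(Optional<String> runningFlowId, Optional<String> requestedFlowId) {
    if (!runningFlowId.isPresent() || !requestedFlowId.isPresent()) {
      return true;
    }
    return runningFlowId.get().equals(requestedFlowId.get());
  }

  public static void main(String[] args) {
    System.out.println(shouldCancel(Optional.of("12345"), Optional.of("12345"))); // true: IDs match
    System.out.println(shouldCancel(Optional.of("12345"), Optional.of("54321"))); // false: stale request is ignored with a warning
    System.out.println(shouldCancel(Optional.of("12345"), Optional.empty()));     // true: no ID supplied, cancel by URI alone
  }
}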
