Skip to content

Commit c98ef9d

Browse files
homatthewy242yang
andauthored
[GOBBLIN-1915] Gobblin on Temporal proof of concept implementation (apache#3785)
* temporal poc add temporal classes remove dependency point to the right application master gobblin-temporal module * Remove some probably unintended changes * Address Comments * inconsistent use of task runners vs containers * Configurable namespace and connection string * helloworld impl * Add shut down hook and remove author comments --------- Co-authored-by: y242yang <[email protected]>
1 parent 2a1fc46 commit c98ef9d

26 files changed

+3733
-3
lines changed

gobblin-temporal/build.gradle

+145
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
plugins {
19+
// Bump up shadow version to support Gradle 5.x https://github.com/johnrengelman/shadow
20+
id 'com.github.johnrengelman.shadow' version '5.2.0'
21+
}
22+
23+
apply plugin: 'java'
24+
25+
dependencies {
26+
implementation project(path: ':gobblin-yarn')
27+
compile project(":gobblin-api")
28+
compile project(":gobblin-cluster")
29+
compile project(":gobblin-core")
30+
compile project(":gobblin-metrics-libs:gobblin-metrics")
31+
compile project(":gobblin-metastore")
32+
compile project(":gobblin-runtime")
33+
compile project(":gobblin-utility")
34+
compile project(":gobblin-yarn")
35+
compile project(path: ':gobblin-rest-service:gobblin-rest-api', configuration: 'restClient')
36+
compile project(":gobblin-rest-service:gobblin-rest-server")
37+
38+
compile externalDependency.avro
39+
compile externalDependency.commonsConfiguration
40+
compile externalDependency.quartz
41+
compile externalDependency.guava
42+
compile externalDependency.commonsLang
43+
compile externalDependency.slf4j
44+
45+
compile externalDependency.commonsCli
46+
compile externalDependency.gson
47+
compile externalDependency.hiveCommon
48+
compile externalDependency.metricsCore
49+
compile externalDependency.metricsJvm
50+
compile externalDependency.commonsIo
51+
compile externalDependency.commonsEmail
52+
compile externalDependency.pegasus.data
53+
compile externalDependency.typesafeConfig
54+
compile externalDependency.hadoopClientCommon
55+
compile externalDependency.hadoopCommon
56+
compile externalDependency.hadoopYarnApi
57+
compile externalDependency.hadoopYarnCommon
58+
compile externalDependency.hadoopYarnClient
59+
compile externalDependency.avroMapredH2
60+
compile externalDependency.findBugsAnnotations
61+
compile (externalDependency.helix) {
62+
exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
63+
}
64+
compile externalDependency."temporal-sdk"
65+
testCompile project(path: ':gobblin-cluster', configuration: 'tests')
66+
testCompile project(":gobblin-example")
67+
68+
testCompile externalDependency.testng
69+
testCompile externalDependency.mockito
70+
testCompile externalDependency.hadoopYarnMiniCluster
71+
testCompile externalDependency.curatorFramework
72+
testCompile externalDependency.curatorTest
73+
74+
testCompile ('com.google.inject:guice:3.0') {
75+
force = true
76+
}
77+
}
78+
79+
task testJar(type: Jar, dependsOn: testClasses) {
80+
baseName = "test-${project.archivesBaseName}"
81+
from sourceSets.test.output
82+
}
83+
84+
// This line is added as a work around to fix a bug. Without this line, the build
85+
// might fail intermittently with a 'Could not find property mavenDeployer' error.
86+
// More details: check TOOLS-123257
87+
tasks.remove(tasks.uploadShadow)
88+
89+
// create a single Jar with all dependencies
90+
shadowJar {
91+
zip64 true
92+
dependencies {
93+
exclude dependency('org.eclipse.jetty:.*')
94+
exclude dependency('org.mortbay.jetty:.*')
95+
exclude dependency('org.projectlombok:.*')
96+
exclude dependency('org.codehaus.groovy:.*')
97+
exclude dependency('org.apache.kafka:.*')
98+
exclude dependency('mysql:.*')
99+
exclude dependency('com.linkedin.pegasus:.*')
100+
exclude dependency('org.bouncycastle:.*')
101+
exclude dependency('org.testng:.*')
102+
exclude dependency('org.mockito:.*')
103+
exclude dependency('org.datanucleus:.*')
104+
exclude dependency('org.apache.hive:.*')
105+
exclude dependency('com.linkedin.hive:.*')
106+
exclude dependency('org.scala-lang:scala-library:.*')
107+
exclude dependency('org.apache.derby:.*')
108+
}
109+
mergeServiceFiles()
110+
}
111+
112+
configurations {
113+
tests
114+
}
115+
116+
configurations {
117+
compile {
118+
transitive = false
119+
}
120+
121+
testRuntime {
122+
resolutionStrategy {
123+
force 'com.google.inject:guice:3.0'
124+
force 'org.apache.hadoop:hadoop-yarn-server-resourcemanager:'+hadoopVersion
125+
}
126+
}
127+
}
128+
129+
artifacts {
130+
tests testJar
131+
}
132+
133+
test {
134+
dependsOn shadowJar
135+
workingDir rootProject.rootDir
136+
maxParallelForks = 1
137+
forkEvery = 1
138+
}
139+
140+
clean {
141+
delete "../gobblin-test/locks"
142+
delete "../gobblin-test/basicTest"
143+
}
144+
145+
ext.classification="library"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.temporal;
19+
20+
import org.apache.gobblin.annotation.Alpha;
21+
import org.apache.gobblin.temporal.workflows.helloworld.HelloWorldJobLauncher;
22+
import org.apache.gobblin.temporal.workflows.helloworld.HelloWorldWorker;
23+
24+
25+
/**
26+
* A central place for configuration related constants of a Gobblin Temporal.
27+
*/
28+
@Alpha
29+
public interface GobblinTemporalConfigurationKeys {
30+
31+
String PREFIX = "gobblin.temporal.";
32+
33+
String WORKER_CLASS = PREFIX + "worker";
34+
String DEFAULT_WORKER_CLASS = HelloWorldWorker.class.getName();
35+
String GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace";
36+
String DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace";
37+
38+
String GOBBLIN_TEMPORAL_TASK_QUEUE = PREFIX + "task.queue.name";
39+
String DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE = "GobblinTemporalTaskQueue";
40+
String GOBBLIN_TEMPORAL_JOB_LAUNCHER = PREFIX + "job.launcher";
41+
String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER = HelloWorldJobLauncher.class.getName();
42+
43+
/**
44+
* Number of worker processes to spin up per task runner
45+
* NOTE: If this size is too large, your container can OOM and halt execution unexpectedly. It's recommended not to touch
46+
* this parameter
47+
*/
48+
String TEMPORAL_NUM_WORKERS_PER_CONTAINER = PREFIX + "num.workers.per.container";
49+
int DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS = 1;
50+
String TEMPORAL_CONNECTION_STRING = PREFIX + "connection.string";
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.temporal.cluster;
19+
20+
import com.typesafe.config.Config;
21+
22+
import io.temporal.client.WorkflowClient;
23+
import io.temporal.worker.Worker;
24+
import io.temporal.worker.WorkerFactory;
25+
26+
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
27+
import org.apache.gobblin.util.ConfigUtils;
28+
29+
30+
public abstract class AbstractTemporalWorker {
31+
private final WorkflowClient workflowClient;
32+
private final String queueName;
33+
private final WorkerFactory workerFactory;
34+
private final Config config;
35+
36+
public AbstractTemporalWorker(Config cfg, WorkflowClient client) {
37+
config = cfg;
38+
workflowClient = client;
39+
queueName = ConfigUtils.getString(cfg,
40+
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE,
41+
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);
42+
43+
// Create a Worker factory that can be used to create Workers that poll specific Task Queues.
44+
workerFactory = WorkerFactory.newInstance(workflowClient);
45+
}
46+
47+
public void start() {
48+
Worker worker = workerFactory.newWorker(queueName);
49+
// This Worker hosts both Workflow and Activity implementations.
50+
// Workflows are stateful, so you need to supply a type to create instances.
51+
worker.registerWorkflowImplementationTypes(getWorkflowImplClasses());
52+
// Activities are stateless and thread safe, so a shared instance is used.
53+
worker.registerActivitiesImplementations(getActivityImplInstances());
54+
// Start polling the Task Queue.
55+
workerFactory.start();
56+
}
57+
58+
/**
59+
* Shuts down the worker.
60+
*/
61+
public void shutdown() {
62+
workerFactory.shutdown();
63+
}
64+
65+
/** @return workflow types for *implementation* classes (not interface) */
66+
protected abstract Class<?>[] getWorkflowImplClasses();
67+
68+
/** @return activity instances; NOTE: activities must be stateless and thread-safe, so a shared instance is used. */
69+
protected abstract Object[] getActivityImplInstances();
70+
}

0 commit comments

Comments
 (0)