Skip to content

Commit 8284bb7

Browse files
committed
[GOBBLIN-264] Add a SharedResourceFactory for creating shared DataPub…
Closes apache#2116 from htran1/shareable_publishers
1 parent ae0ba28 commit 8284bb7

File tree

9 files changed

+501
-32
lines changed

9 files changed

+501
-32
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.gobblin.capability;
18+
19+
import org.apache.gobblin.annotation.Alpha;
20+
21+
import lombok.Data;
22+
23+
/**
24+
* Represents a set of functionality a job-creator can ask for. Examples could include
25+
* encryption, compression, partitioning...
26+
*
27+
* Each Capability has a name and then a set of associated configuration properties. An example is
28+
* the encryption algorithm to use.
29+
*/
30+
@Alpha
31+
@Data
32+
public class Capability {
33+
/**
34+
* Threadsafe capability.
35+
*/
36+
public static final Capability THREADSAFE = new Capability("THREADSAFE", false);
37+
38+
private final String name;
39+
private final boolean critical;
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.gobblin.capability;
18+
19+
import java.util.Map;
20+
21+
import org.apache.gobblin.annotation.Alpha;
22+
23+
/**
24+
* Describes an object that is aware of the capabilities it supports.
25+
*/
26+
@Alpha
27+
public interface CapabilityAware {
28+
/**
29+
* Checks if this object supports the given Capability with the given properties.
30+
*
31+
* Implementers of this should always check if their super-class may happen to support a capability
32+
* before returning false!
33+
* @param c Capability being queried
34+
* @param properties Properties specific to the capability. Properties are capability specific.
35+
* @return True if this object supports the given capability + property settings, false if not
36+
*/
37+
boolean supportsCapability(Capability c, Map<String, Object> properties);
38+
}

gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import java.io.IOException;
2222
import java.lang.reflect.Constructor;
2323
import java.util.Collection;
24+
import java.util.Map;
2425

26+
import org.apache.gobblin.capability.Capability;
27+
import org.apache.gobblin.capability.CapabilityAware;
2528
import org.apache.gobblin.configuration.ConfigurationKeys;
2629
import org.apache.gobblin.configuration.State;
2730
import org.apache.gobblin.configuration.WorkUnitState;
@@ -30,7 +33,11 @@
3033
/**
3134
* Defines how to publish data and its corresponding metadata. Can be used for either task level or job level publishing.
3235
*/
33-
public abstract class DataPublisher implements Closeable {
36+
public abstract class DataPublisher implements Closeable, CapabilityAware {
37+
/**
38+
* Reusable capability.
39+
*/
40+
public static final Capability REUSABLE = new Capability("REUSABLE", false);
3441

3542
protected final State state;
3643

@@ -125,4 +132,9 @@ public boolean canBeSkipped() {
125132
protected boolean shouldPublishMetadataFirst() {
126133
return true;
127134
}
135+
136+
@Override
137+
public boolean supportsCapability(Capability c, Map<String, Object> properties) {
138+
return false;
139+
}
128140
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.publisher;
19+
20+
import java.io.IOException;
21+
import java.util.Collections;
22+
23+
import org.apache.gobblin.broker.ImmediatelyInvalidResourceEntry;
24+
import org.apache.gobblin.broker.ResourceInstance;
25+
import org.apache.gobblin.broker.iface.ConfigView;
26+
import org.apache.gobblin.broker.iface.NotConfiguredException;
27+
import org.apache.gobblin.broker.iface.ScopeType;
28+
import org.apache.gobblin.broker.iface.ScopedConfigView;
29+
import org.apache.gobblin.broker.iface.SharedResourceFactory;
30+
import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
31+
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
32+
import org.apache.gobblin.capability.Capability;
33+
import org.apache.gobblin.configuration.State;
34+
35+
import lombok.extern.slf4j.Slf4j;
36+
37+
/**
38+
* A {@link SharedResourceFactory} for creating {@link DataPublisher}s.
39+
*
40+
* The factory creates a {@link DataPublisher} with the publisher class name and state.
41+
*/
42+
@Slf4j
43+
public class DataPublisherFactory<S extends ScopeType<S>>
44+
implements SharedResourceFactory<DataPublisher, DataPublisherKey, S> {
45+
46+
public static final String FACTORY_NAME = "dataPublisher";
47+
48+
public static <S extends ScopeType<S>> DataPublisher get(String publisherClassName, State state,
49+
SharedResourcesBroker<S> broker) throws IOException {
50+
try {
51+
return broker.getSharedResource(new DataPublisherFactory<S>(), new DataPublisherKey(publisherClassName, state));
52+
} catch (NotConfiguredException nce) {
53+
throw new IOException(nce);
54+
}
55+
}
56+
57+
@Override
58+
public String getName() {
59+
return FACTORY_NAME;
60+
}
61+
62+
@Override
63+
public SharedResourceFactoryResponse<DataPublisher> createResource(SharedResourcesBroker<S> broker,
64+
ScopedConfigView<S, DataPublisherKey> config) throws NotConfiguredException {
65+
try {
66+
DataPublisherKey key = config.getKey();
67+
String publisherClassName = key.getPublisherClassName();
68+
State state = key.getState();
69+
Class<? extends DataPublisher> dataPublisherClass = (Class<? extends DataPublisher>) Class
70+
.forName(publisherClassName);
71+
72+
DataPublisher publisher = DataPublisher.getInstance(dataPublisherClass, state);
73+
74+
// If the publisher is threadsafe then it is shareable, so return it as a resource instance that may be cached
75+
// by the broker.
76+
// Otherwise, it is not shareable, so return it as an immediately invalidated resource that will only be returned
77+
// once from the broker.
78+
if (publisher.supportsCapability(Capability.THREADSAFE, Collections.EMPTY_MAP)) {
79+
return new ResourceInstance<>(publisher);
80+
} else {
81+
return new ImmediatelyInvalidResourceEntry<>(publisher);
82+
}
83+
} catch (ReflectiveOperationException e) {
84+
throw new RuntimeException(e);
85+
}
86+
}
87+
88+
@Override
89+
public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, DataPublisherKey> config) {
90+
return broker.selfScope().getType().rootScope();
91+
}
92+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.publisher;
19+
20+
import org.apache.gobblin.broker.iface.SharedResourceKey;
21+
import org.apache.gobblin.configuration.State;
22+
23+
import lombok.Getter;
24+
25+
26+
/**
27+
* {@link SharedResourceKey} for requesting {@link DataPublisher}s from a
28+
* {@link org.apache.gobblin.broker.iface.SharedResourceFactory
29+
*/
30+
@Getter
31+
public class DataPublisherKey implements SharedResourceKey {
32+
private final String publisherClassName;
33+
private final State state;
34+
35+
public DataPublisherKey(String publisherClassName, State state) {
36+
this.publisherClassName = publisherClassName;
37+
this.state = state;
38+
}
39+
40+
@Override
41+
public String toConfigurationKey() {
42+
return this.publisherClassName;
43+
}
44+
45+
@Override
46+
public boolean equals(Object o) {
47+
if (this == o) {
48+
return true;
49+
}
50+
if (o == null || getClass() != o.getClass()) {
51+
return false;
52+
}
53+
54+
DataPublisherKey that = (DataPublisherKey) o;
55+
56+
return publisherClassName == null ?
57+
that.publisherClassName == null : publisherClassName.equals(that.publisherClassName);
58+
}
59+
60+
@Override
61+
public int hashCode() {
62+
return publisherClassName != null ? publisherClassName.hashCode() : 0;
63+
}
64+
}

0 commit comments

Comments
 (0)