Skip to content

Commit 7352cad

Browse files
Will-Losuvasude
authored and
suvasude
committed
[GOBBLIN-1304] Adds group ownership service
Closes apache#3142 from Will-Lo/add-group-ownership- flows
1 parent 64c8ad9 commit 7352cad

File tree

19 files changed

+497
-33
lines changed

19 files changed

+497
-33
lines changed

gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java

+1
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ public class ConfigurationKeys {
140140
public static final String FLOW_ALLOW_CONCURRENT_EXECUTION = "flow.allowConcurrentExecution";
141141
public static final String FLOW_EXPLAIN_KEY = "flow.explain";
142142
public static final String FLOW_UNSCHEDULE_KEY = "flow.unschedule";
143+
public static final String FLOW_OWNING_GROUP_KEY = "flow.owningGroup";
143144

144145
/**
145146
* Common topology configuration properties.

gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java

+3
Original file line numberDiff line numberDiff line change
@@ -129,4 +129,7 @@ public class ServiceConfigKeys {
129129

130130
public static final String FORCE_LEADER = GOBBLIN_SERVICE_PREFIX + "forceLeader";
131131
public static final boolean DEFAULT_FORCE_LEADER = false;
132+
// Group Membership authentication service
133+
public static final String GROUP_OWNERSHIP_SERVICE_CLASS = GOBBLIN_SERVICE_PREFIX + "groupOwnershipService.class";
134+
public static final String DEFAULT_GROUP_OWNERSHIP_SERVICE = "org.apache.gobblin.service.NoopGroupOwnershipService";
132135
}

gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdl

+5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ record FlowConfig {
2626
*/
2727
explain: boolean = false
2828

29+
/**
30+
* Optional string name of group that the requester belongs to for group ownership of flows.
31+
*/
32+
owningGroup: optional string
33+
2934
/**
3035
* Properties for the flow. These properties are passed to the compiled Gobblin jobs.
3136
*/

gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json

+5
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@
7676
"type" : "boolean",
7777
"doc" : "Return the compiled flow as a string. If enabled, the flow is not added.",
7878
"default" : false
79+
}, {
80+
"name" : "owningGroup",
81+
"type" : "string",
82+
"doc" : "Optional string name of group that the requester belongs to for group ownership of flows.",
83+
"optional" : true
7984
}, {
8085
"name" : "properties",
8186
"type" : {

gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json

+5
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@
6767
"type" : "boolean",
6868
"doc" : "Return the compiled flow as a string. If enabled, the flow is not added.",
6969
"default" : false
70+
}, {
71+
"name" : "owningGroup",
72+
"type" : "string",
73+
"doc" : "Optional string name of group that the requester belongs to for group ownership of flows.",
74+
"optional" : true
7075
}, {
7176
"name" : "properties",
7277
"type" : {

gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java

+119-8
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
package org.apache.gobblin.service;
1919

2020
import java.io.File;
21+
import java.nio.file.Path;
2122
import java.util.List;
2223
import java.util.Map;
2324

2425
import org.apache.commons.io.FileUtils;
2526
import org.apache.commons.io.IOUtils;
27+
import org.mortbay.jetty.HttpStatus;
2628
import org.testng.Assert;
2729
import org.testng.annotations.AfterClass;
2830
import org.testng.annotations.BeforeClass;
@@ -62,12 +64,18 @@ public class FlowConfigV2Test {
6264
private EmbeddedRestliServer _server;
6365
private File _testDirectory;
6466
private TestRequesterService _requesterService;
67+
private GroupOwnershipService groupOwnershipService;
68+
private File groupConfigFile;
6569

6670
private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigV2Test/";
6771
private static final String TEST_GROUP_NAME = "testGroup1";
6872
private static final String TEST_FLOW_NAME = "testFlow1";
6973
private static final String TEST_FLOW_NAME_2 = "testFlow2";
7074
private static final String TEST_FLOW_NAME_3 = "testFlow3";
75+
private static final String TEST_FLOW_NAME_4 = "testFlow4";
76+
private static final String TEST_FLOW_NAME_5 = "testFlow5";
77+
private static final String TEST_FLOW_NAME_6 = "testFlow6";
78+
private static final String TEST_FLOW_NAME_7 = "testFlow7";
7179
private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
7280
private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
7381

@@ -90,6 +98,15 @@ public void setUp() throws Exception {
9098

9199
_requesterService = new TestRequesterService(ConfigFactory.empty());
92100

101+
this.groupConfigFile = new File(_testDirectory + "/TestGroups.json");
102+
String groups ="{\"testGroup\": \"testName,testName2\"}";
103+
Files.write(groups.getBytes(), this.groupConfigFile);
104+
Config groupServiceConfig = ConfigBuilder.create()
105+
.addPrimitive(LocalGroupOwnershipService.GROUP_MEMBER_LIST, this.groupConfigFile.getAbsolutePath())
106+
.build();
107+
108+
groupOwnershipService = new LocalGroupOwnershipService(groupServiceConfig);
109+
93110
Injector injector = Guice.createInjector(new Module() {
94111
@Override
95112
public void configure(Binder binder) {
@@ -98,6 +115,7 @@ public void configure(Binder binder) {
98115
// been made
99116
binder.bindConstant().annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
100117
binder.bind(RequesterService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE)).toInstance(_requesterService);
118+
binder.bind(GroupOwnershipService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_GROUP_OWNERSHIP_SERVICE)).toInstance(groupOwnershipService);
101119
}
102120
});
103121

@@ -185,20 +203,102 @@ public void testBadPartialUpdate() throws Exception {
185203
_client.partialUpdateFlowConfig(flowId, flowConfigPatch);
186204
}
187205

188-
@Test (expectedExceptions = RestLiResponseException.class)
206+
@Test
189207
public void testDisallowedRequester() throws Exception {
190-
ServiceRequester testRequester = new ServiceRequester("testName", "testType", "testFrom");
191-
_requesterService.setRequester(testRequester);
208+
try {
209+
ServiceRequester testRequester = new ServiceRequester("testName", "testType", "testFrom");
210+
_requesterService.setRequester(testRequester);
211+
212+
Map<String, String> flowProperties = Maps.newHashMap();
213+
flowProperties.put("param1", "value1");
214+
215+
FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_4))
216+
.setTemplateUris(TEST_TEMPLATE_URI)
217+
.setProperties(new StringMap(flowProperties));
218+
_client.createFlowConfig(flowConfig);
219+
220+
testRequester.setName("testName2");
221+
_client.deleteFlowConfig(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_4));
222+
} catch (RestLiResponseException e) {
223+
Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_401_Unauthorized);
224+
}
225+
}
192226

227+
@Test
228+
public void testGroupRequesterAllowed() throws Exception {
229+
ServiceRequester testRequester = new ServiceRequester("testName", "USER_PRINCIPAL", "testFrom");
230+
_requesterService.setRequester(testRequester);
193231
Map<String, String> flowProperties = Maps.newHashMap();
194-
flowProperties.put("param1", "value1");
195232

196-
FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
197-
.setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties));
198-
_client.createFlowConfig(flowConfig);
233+
FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_5))
234+
.setTemplateUris(TEST_TEMPLATE_URI)
235+
.setProperties(new StringMap(flowProperties))
236+
.setOwningGroup("testGroup");
237+
238+
_client.createFlowConfig(flowConfig);
199239

200240
testRequester.setName("testName2");
201-
_client.deleteFlowConfig(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME));
241+
_client.deleteFlowConfig(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_5));
242+
}
243+
244+
@Test
245+
public void testGroupRequesterRejected() throws Exception {
246+
try {
247+
ServiceRequester testRequester = new ServiceRequester("testName", "USER_PRINCIPAL", "testFrom");
248+
_requesterService.setRequester(testRequester);
249+
Map<String, String> flowProperties = Maps.newHashMap();
250+
251+
FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_6))
252+
.setTemplateUris(TEST_TEMPLATE_URI)
253+
.setProperties(new StringMap(flowProperties))
254+
.setOwningGroup("testGroup");
255+
256+
_client.createFlowConfig(flowConfig);
257+
258+
testRequester.setName("testName3");
259+
_client.deleteFlowConfig(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_6));
260+
} catch (RestLiResponseException e) {
261+
Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_401_Unauthorized);
262+
}
263+
}
264+
265+
@Test
266+
public void testLocalGroupOwnershipUpdates() throws Exception {
267+
try {
268+
ServiceRequester testRequester = new ServiceRequester("testName", "USER_PRINCIPAL", "testFrom");
269+
_requesterService.setRequester(testRequester);
270+
Map<String, String> flowProperties = Maps.newHashMap();
271+
272+
FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_7))
273+
.setTemplateUris(TEST_TEMPLATE_URI)
274+
.setProperties(new StringMap(flowProperties))
275+
.setOwningGroup("testGroup2");
276+
277+
_client.createFlowConfig(flowConfig);
278+
279+
} catch (RestLiResponseException e) {
280+
Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_401_Unauthorized);
281+
}
282+
283+
String filePath = this.groupConfigFile.getAbsolutePath();
284+
this.groupConfigFile.delete();
285+
this.groupConfigFile = new File(filePath);
286+
String groups ="{\"testGroup2\": \"testName,testName3\"}";
287+
Files.write(groups.getBytes(), this.groupConfigFile);
288+
289+
ServiceRequester testRequester = new ServiceRequester("testName", "USER_PRINCIPAL", "testFrom");
290+
_requesterService.setRequester(testRequester);
291+
Map<String, String> flowProperties = Maps.newHashMap();
292+
293+
FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_7))
294+
.setTemplateUris(TEST_TEMPLATE_URI)
295+
.setProperties(new StringMap(flowProperties))
296+
.setOwningGroup("testGroup2");
297+
298+
// this should no longer fail as the localGroupOwnership service should have updated as the file changed
299+
_client.createFlowConfig(flowConfig);
300+
testRequester.setName("testName3");
301+
_client.deleteFlowConfig(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_7));
202302
}
203303

204304
@AfterClass(alwaysRun = true)
@@ -226,5 +326,16 @@ public TestRequesterService(Config config) {
226326
public List<ServiceRequester> findRequesters(BaseResource resource) {
227327
return requester == null ? Lists.newArrayList() : Lists.newArrayList(requester);
228328
}
329+
330+
@Override
331+
public boolean isRequesterAllowed(
332+
List<ServiceRequester> originalRequesterList, List<ServiceRequester> currentRequesterList) {
333+
for (ServiceRequester s: currentRequesterList) {
334+
if (originalRequesterList.contains(s)) {
335+
return true;
336+
}
337+
}
338+
return false;
339+
}
229340
}
230341
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"testGroup": "testName,testName2"
3+
}

gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java

+4
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,10 @@ public static FlowSpec createFlowSpecForConfig(FlowConfig flowConfig) {
248248
configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXPLAIN_KEY, flowConfig.isExplain());
249249
}
250250

251+
if (flowConfig.hasOwningGroup()) {
252+
configBuilder.addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY, flowConfig.getOwningGroup());
253+
}
254+
251255
Config config = configBuilder.build();
252256

253257
Config configWithFallback;

gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java

+46-6
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
5656
public static final String FLOW_CONFIG_GENERATOR_INJECT_NAME = "flowConfigsV2ResourceHandler";
5757
public static final String INJECT_REQUESTER_SERVICE = "v2RequesterService";
5858
public static final String INJECT_READY_TO_USE = "v2ReadyToUse";
59-
59+
public static final String INJECT_GROUP_OWNERSHIP_SERVICE = "v2GroupOwnershipService";
6060
private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store");
6161

6262

@@ -77,6 +77,10 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
7777
@Named(INJECT_READY_TO_USE)
7878
private Boolean readyToUse;
7979

80+
@Inject
81+
@Named(INJECT_GROUP_OWNERSHIP_SERVICE)
82+
private GroupOwnershipService groupOwnershipService;
83+
8084
public FlowConfigsV2Resource() {
8185
}
8286

@@ -128,11 +132,14 @@ public List<FlowConfig> getFilteredFlows(@Context PagingContext context,
128132
@ReturnEntity
129133
@Override
130134
public CreateKVResponse create(FlowConfig flowConfig) {
131-
List<ServiceRequester> requestorList = this.requesterService.findRequesters(this);
135+
List<ServiceRequester> requesterList = this.requesterService.findRequesters(this);
132136
try {
133-
String serialized = RequesterService.serialize(requestorList);
137+
String serialized = RequesterService.serialize(requesterList);
134138
flowConfig.getProperties().put(RequesterService.REQUESTER_LIST, serialized);
135139
LOG.info("Rest requester list is " + serialized);
140+
if (flowConfig.hasOwningGroup() && !this.groupOwnershipService.isMemberOfGroup(requesterList, flowConfig.getOwningGroup())) {
141+
throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, "Requester not part of owning group specified");
142+
}
136143
} catch (IOException e) {
137144
throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, "cannot get who is the requester", e);
138145
}
@@ -148,7 +155,7 @@ public CreateKVResponse create(FlowConfig flowConfig) {
148155
*/
149156
@Override
150157
public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, FlowConfig flowConfig) {
151-
FlowConfigsResource.checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
158+
checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
152159
String flowGroup = key.getKey().getFlowGroup();
153160
String flowName = key.getKey().getFlowName();
154161
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -163,7 +170,7 @@ public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, FlowC
163170
*/
164171
@Override
165172
public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, PatchRequest<FlowConfig> flowConfigPatch) {
166-
FlowConfigsResource.checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
173+
checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
167174
String flowGroup = key.getKey().getFlowGroup();
168175
String flowName = key.getKey().getFlowName();
169176
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -177,7 +184,7 @@ public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, Patch
177184
*/
178185
@Override
179186
public UpdateResponse delete(ComplexResourceKey<FlowId, FlowStatusId> key) {
180-
FlowConfigsResource.checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
187+
checkRequester(this.requesterService, get(key), this.requesterService.findRequesters(this));
181188
String flowGroup = key.getKey().getFlowGroup();
182189
String flowName = key.getKey().getFlowName();
183190
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -200,4 +207,37 @@ private Properties getHeaders() {
200207
}
201208
return headerProperties;
202209
}
210+
211+
/**
212+
* Check that all {@link ServiceRequester}s in this request are contained within the original service requester list
213+
* or is part of the original requester's owning group when the flow was submitted. If they are not, throw a {@link FlowConfigLoggedException} with {@link HttpStatus#S_401_UNAUTHORIZED}.
214+
* If there is a failure when deserializing the original requester list, throw a {@link FlowConfigLoggedException} with
215+
* {@link HttpStatus#S_400_BAD_REQUEST}.
216+
* @param requesterService the {@link RequesterService} used to verify the requester
217+
* @param originalFlowConfig original flow config to find original requester
218+
* @param requesterList list of requesters for this request
219+
*/
220+
public void checkRequester(
221+
RequesterService requesterService, FlowConfig originalFlowConfig, List<ServiceRequester> requesterList) {
222+
if (requesterList == null) {
223+
return;
224+
}
225+
226+
try {
227+
String serializedOriginalRequesterList = originalFlowConfig.getProperties().get(RequesterService.REQUESTER_LIST);
228+
if (serializedOriginalRequesterList != null) {
229+
List<ServiceRequester> originalRequesterList = RequesterService.deserialize(serializedOriginalRequesterList);
230+
if (!requesterService.isRequesterAllowed(originalRequesterList, requesterList)) {
231+
// if the requester is not whitelisted or the original requester, reject the requester if it is not part of the owning group
232+
// of the original requester
233+
if (!(originalFlowConfig.hasOwningGroup() && this.groupOwnershipService.isMemberOfGroup(
234+
requesterList, originalFlowConfig.getOwningGroup()))) {
235+
throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, "Requester not allowed to make this request");
236+
}
237+
}
238+
}
239+
} catch (IOException e) {
240+
throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "Failed to get original requester list", e);
241+
}
242+
}
203243
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.service;
19+
20+
import java.util.List;
21+
import java.util.stream.Collectors;
22+
23+
/**
24+
* Service for handling group ownership of flows
25+
*/
26+
public abstract class GroupOwnershipService {
27+
28+
/**
29+
* @return true if any of the serviceRequesters belong in the group
30+
*/
31+
public abstract boolean isMemberOfGroup(List<ServiceRequester> serviceRequesters, String group);
32+
33+
/**
34+
* Extracts ServiceRequester names
35+
* @param requesterList
36+
* @return a list of service requester names
37+
*/
38+
protected static List<String> extractRequesterNames(List<ServiceRequester> requesterList) {
39+
return requesterList.stream()
40+
.map(requester -> requester.getName())
41+
.collect(Collectors.toList());
42+
}
43+
}

0 commit comments

Comments
 (0)