Skip to content

Commit 94b9da0

Browse files
authored
feat(entityVersioning): initial implementation (datahub-project#12166)
1 parent 90fe5b6 commit 94b9da0

File tree

68 files changed

+4063
-121
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+4063
-121
lines changed

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java

+15
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@
174174
import com.linkedin.datahub.graphql.resolvers.embed.UpdateEmbedResolver;
175175
import com.linkedin.datahub.graphql.resolvers.entity.EntityExistsResolver;
176176
import com.linkedin.datahub.graphql.resolvers.entity.EntityPrivilegesResolver;
177+
import com.linkedin.datahub.graphql.resolvers.entity.versioning.LinkAssetVersionResolver;
178+
import com.linkedin.datahub.graphql.resolvers.entity.versioning.UnlinkAssetVersionResolver;
177179
import com.linkedin.datahub.graphql.resolvers.form.BatchAssignFormResolver;
178180
import com.linkedin.datahub.graphql.resolvers.form.BatchRemoveFormResolver;
179181
import com.linkedin.datahub.graphql.resolvers.form.CreateDynamicFormAssignmentResolver;
@@ -391,6 +393,7 @@
391393
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
392394
import com.linkedin.metadata.connection.ConnectionService;
393395
import com.linkedin.metadata.entity.EntityService;
396+
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
394397
import com.linkedin.metadata.graph.GraphClient;
395398
import com.linkedin.metadata.graph.SiblingGraphService;
396399
import com.linkedin.metadata.models.registry.EntityRegistry;
@@ -476,6 +479,7 @@ public class GmsGraphQLEngine {
476479
private final RestrictedService restrictedService;
477480
private ConnectionService connectionService;
478481
private AssertionService assertionService;
482+
private final EntityVersioningService entityVersioningService;
479483

480484
private final BusinessAttributeService businessAttributeService;
481485
private final FeatureFlags featureFlags;
@@ -599,6 +603,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
599603
this.restrictedService = args.restrictedService;
600604
this.connectionService = args.connectionService;
601605
this.assertionService = args.assertionService;
606+
this.entityVersioningService = args.entityVersioningService;
602607

603608
this.businessAttributeService = args.businessAttributeService;
604609
this.ingestionConfiguration = Objects.requireNonNull(args.ingestionConfiguration);
@@ -1392,6 +1397,16 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
13921397
"removeBusinessAttribute",
13931398
new RemoveBusinessAttributeResolver(this.entityService));
13941399
}
1400+
if (featureFlags.isEntityVersioning()) {
1401+
typeWiring
1402+
.dataFetcher(
1403+
"linkAssetVersion",
1404+
new LinkAssetVersionResolver(this.entityVersioningService, this.featureFlags))
1405+
.dataFetcher(
1406+
"unlinkAssetVersion",
1407+
new UnlinkAssetVersionResolver(
1408+
this.entityVersioningService, this.featureFlags));
1409+
}
13951410
return typeWiring;
13961411
});
13971412
}

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngineArgs.java

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
2222
import com.linkedin.metadata.connection.ConnectionService;
2323
import com.linkedin.metadata.entity.EntityService;
24+
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
2425
import com.linkedin.metadata.graph.GraphClient;
2526
import com.linkedin.metadata.graph.SiblingGraphService;
2627
import com.linkedin.metadata.models.registry.EntityRegistry;
@@ -88,6 +89,7 @@ public class GmsGraphQLEngineArgs {
8889
BusinessAttributeService businessAttributeService;
8990
ConnectionService connectionService;
9091
AssertionService assertionService;
92+
EntityVersioningService entityVersioningService;
9193

9294
// any fork specific args should go below this line
9395
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package com.linkedin.datahub.graphql.resolvers.entity.versioning;
2+
3+
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
4+
import static com.linkedin.metadata.Constants.VERSION_SET_ENTITY_NAME;
5+
import static com.linkedin.metadata.authorization.ApiOperation.UPDATE;
6+
7+
import com.datahub.authorization.AuthUtil;
8+
import com.google.common.collect.ImmutableSet;
9+
import com.linkedin.common.urn.Urn;
10+
import com.linkedin.common.urn.UrnUtils;
11+
import com.linkedin.datahub.graphql.QueryContext;
12+
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
13+
import com.linkedin.datahub.graphql.exception.AuthorizationException;
14+
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
15+
import com.linkedin.datahub.graphql.generated.LinkVersionInput;
16+
import com.linkedin.metadata.entity.IngestResult;
17+
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
18+
import com.linkedin.metadata.entity.versioning.VersionPropertiesInput;
19+
import graphql.schema.DataFetcher;
20+
import graphql.schema.DataFetchingEnvironment;
21+
import io.datahubproject.metadata.context.OperationContext;
22+
import java.util.List;
23+
import java.util.concurrent.CompletableFuture;
24+
import org.apache.commons.lang.StringUtils;
25+
26+
/**
27+
* Currently only supports linking the latest version, but may be modified later to support inserts
28+
*/
29+
public class LinkAssetVersionResolver implements DataFetcher<CompletableFuture<String>> {
30+
31+
private final EntityVersioningService entityVersioningService;
32+
private final FeatureFlags featureFlags;
33+
34+
public LinkAssetVersionResolver(
35+
EntityVersioningService entityVersioningService, FeatureFlags featureFlags) {
36+
this.entityVersioningService = entityVersioningService;
37+
this.featureFlags = featureFlags;
38+
}
39+
40+
@Override
41+
public CompletableFuture<String> get(DataFetchingEnvironment environment) throws Exception {
42+
final QueryContext context = environment.getContext();
43+
final LinkVersionInput input =
44+
bindArgument(environment.getArgument("input"), LinkVersionInput.class);
45+
if (!featureFlags.isEntityVersioning()) {
46+
throw new IllegalAccessError(
47+
"Entity Versioning is not configured, please enable before attempting to use this feature.");
48+
}
49+
Urn versionSetUrn = UrnUtils.getUrn(input.getVersionSet());
50+
if (!VERSION_SET_ENTITY_NAME.equals(versionSetUrn.getEntityType())) {
51+
throw new IllegalArgumentException(
52+
String.format("Version Set urn %s must be of type Version Set.", input.getVersionSet()));
53+
}
54+
Urn entityUrn = UrnUtils.getUrn(input.getLinkedEntity());
55+
OperationContext opContext = context.getOperationContext();
56+
if (!AuthUtil.isAPIAuthorizedEntityUrns(
57+
opContext, UPDATE, ImmutableSet.of(versionSetUrn, entityUrn))) {
58+
throw new AuthorizationException(
59+
String.format(
60+
"%s is unauthorized to %s entities %s and %s",
61+
opContext.getAuthentication().getActor().toUrnStr(),
62+
UPDATE,
63+
input.getVersionSet(),
64+
input.getLinkedEntity()));
65+
}
66+
VersionPropertiesInput versionPropertiesInput =
67+
new VersionPropertiesInput(
68+
input.getComment(),
69+
input.getVersion(),
70+
input.getSourceTimestamp(),
71+
input.getSourceCreator());
72+
return GraphQLConcurrencyUtils.supplyAsync(
73+
() -> {
74+
List<IngestResult> linkResults =
75+
entityVersioningService.linkLatestVersion(
76+
opContext, versionSetUrn, entityUrn, versionPropertiesInput);
77+
78+
return linkResults.stream()
79+
.filter(
80+
ingestResult -> input.getLinkedEntity().equals(ingestResult.getUrn().toString()))
81+
.map(ingestResult -> ingestResult.getUrn().toString())
82+
.findAny()
83+
.orElse(StringUtils.EMPTY);
84+
},
85+
this.getClass().getSimpleName(),
86+
"get");
87+
}
88+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package com.linkedin.datahub.graphql.resolvers.entity.versioning;
2+
3+
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
4+
import static com.linkedin.metadata.Constants.VERSION_SET_ENTITY_NAME;
5+
import static com.linkedin.metadata.authorization.ApiOperation.UPDATE;
6+
7+
import com.datahub.authorization.AuthUtil;
8+
import com.google.common.collect.ImmutableSet;
9+
import com.linkedin.common.urn.Urn;
10+
import com.linkedin.common.urn.UrnUtils;
11+
import com.linkedin.datahub.graphql.QueryContext;
12+
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
13+
import com.linkedin.datahub.graphql.exception.AuthorizationException;
14+
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
15+
import com.linkedin.datahub.graphql.generated.UnlinkVersionInput;
16+
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
17+
import graphql.schema.DataFetcher;
18+
import graphql.schema.DataFetchingEnvironment;
19+
import io.datahubproject.metadata.context.OperationContext;
20+
import java.util.concurrent.CompletableFuture;
21+
22+
public class UnlinkAssetVersionResolver implements DataFetcher<CompletableFuture<Boolean>> {
23+
24+
private final EntityVersioningService entityVersioningService;
25+
private final FeatureFlags featureFlags;
26+
27+
public UnlinkAssetVersionResolver(
28+
EntityVersioningService entityVersioningService, FeatureFlags featureFlags) {
29+
this.entityVersioningService = entityVersioningService;
30+
this.featureFlags = featureFlags;
31+
}
32+
33+
@Override
34+
public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throws Exception {
35+
if (!featureFlags.isEntityVersioning()) {
36+
throw new IllegalAccessError(
37+
"Entity Versioning is not configured, please enable before attempting to use this feature.");
38+
}
39+
final QueryContext context = environment.getContext();
40+
final UnlinkVersionInput input =
41+
bindArgument(environment.getArgument("input"), UnlinkVersionInput.class);
42+
Urn versionSetUrn = UrnUtils.getUrn(input.getVersionSet());
43+
if (!VERSION_SET_ENTITY_NAME.equals(versionSetUrn.getEntityType())) {
44+
throw new IllegalArgumentException(
45+
String.format("Version Set urn %s must be of type Version Set.", input.getVersionSet()));
46+
}
47+
Urn entityUrn = UrnUtils.getUrn(input.getUnlinkedEntity());
48+
OperationContext opContext = context.getOperationContext();
49+
if (!AuthUtil.isAPIAuthorizedEntityUrns(
50+
opContext, UPDATE, ImmutableSet.of(versionSetUrn, entityUrn))) {
51+
throw new AuthorizationException(
52+
String.format(
53+
"%s is unauthorized to %s entities %s and %s",
54+
opContext.getAuthentication().getActor(),
55+
UPDATE,
56+
input.getVersionSet(),
57+
input.getUnlinkedEntity()));
58+
}
59+
return GraphQLConcurrencyUtils.supplyAsync(
60+
() -> {
61+
entityVersioningService.unlinkVersion(opContext, versionSetUrn, entityUrn);
62+
return true;
63+
},
64+
this.getClass().getSimpleName(),
65+
"get");
66+
}
67+
}

datahub-graphql-core/src/main/resources/entity.graphql

+60
Original file line numberDiff line numberDiff line change
@@ -956,6 +956,16 @@ type Mutation {
956956
Remove Business Attribute
957957
"""
958958
removeBusinessAttribute(input: AddBusinessAttributeInput!): Boolean
959+
960+
"""
961+
Link the latest versioned entity to a Version Set
962+
"""
963+
linkAssetVersion(input: LinkVersionInput!): String
964+
965+
"""
966+
Unlink a versioned entity from a Version Set
967+
"""
968+
unlinkAssetVersion(input: UnlinkVersionInput!): Boolean
959969
}
960970

961971
"""
@@ -12911,6 +12921,56 @@ input ListBusinessAttributesInput {
1291112921
query: String
1291212922
}
1291312923

12924+
"""
12925+
Input for linking a versioned entity to a Version Set
12926+
"""
12927+
input LinkVersionInput {
12928+
"""
12929+
The target version set
12930+
"""
12931+
versionSet: String!
12932+
12933+
"""
12934+
The target versioned entity to link
12935+
"""
12936+
linkedEntity: String!
12937+
12938+
"""
12939+
Version Tag label for the version, should be unique within a Version Set
12940+
"""
12941+
version: String!
12942+
12943+
"""
12944+
Optional timestamp from the source system
12945+
"""
12946+
sourceTimestamp: Long
12947+
12948+
"""
12949+
Optional creator from the source system, will be converted to an Urn
12950+
"""
12951+
sourceCreator: String
12952+
12953+
"""
12954+
Optional comment about the version
12955+
"""
12956+
comment: String
12957+
}
12958+
12959+
"""
12960+
Input for unlinking a versioned entity from a Version Set
12961+
"""
12962+
input UnlinkVersionInput {
12963+
"""
12964+
The target version set
12965+
"""
12966+
versionSet: String
12967+
12968+
"""
12969+
The target versioned entity to unlink
12970+
"""
12971+
unlinkedEntity: String
12972+
}
12973+
1291412974
"""
1291512975
The result obtained when listing Business Attribute
1291612976
"""
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package com.linkedin.datahub.graphql.resolvers.entity.versioning;
2+
3+
import static com.linkedin.datahub.graphql.TestUtils.*;
4+
import static org.mockito.ArgumentMatchers.any;
5+
import static org.mockito.ArgumentMatchers.eq;
6+
import static org.testng.Assert.*;
7+
8+
import com.google.common.collect.ImmutableList;
9+
import com.linkedin.common.urn.Urn;
10+
import com.linkedin.common.urn.UrnUtils;
11+
import com.linkedin.datahub.graphql.QueryContext;
12+
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
13+
import com.linkedin.datahub.graphql.generated.LinkVersionInput;
14+
import com.linkedin.metadata.entity.IngestResult;
15+
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
16+
import com.linkedin.metadata.entity.versioning.VersionPropertiesInput;
17+
import graphql.schema.DataFetchingEnvironment;
18+
import org.mockito.Mockito;
19+
import org.testng.annotations.Test;
20+
21+
public class LinkAssetVersionResolverTest {
22+
23+
private static final String TEST_VERSION_SET_URN = "urn:li:versionSet:test-version-set";
24+
private static final String TEST_ENTITY_URN =
25+
"urn:li:dataset:(urn:li:dataPlatform:mysql,my-test,PROD)";
26+
27+
@Test
28+
public void testGetSuccessful() throws Exception {
29+
EntityVersioningService mockService = Mockito.mock(EntityVersioningService.class);
30+
FeatureFlags mockFlags = Mockito.mock(FeatureFlags.class);
31+
32+
Mockito.when(mockFlags.isEntityVersioning()).thenReturn(true);
33+
34+
IngestResult mockResult =
35+
IngestResult.builder().urn(Urn.createFromString(TEST_ENTITY_URN)).build();
36+
37+
Mockito.when(
38+
mockService.linkLatestVersion(
39+
any(),
40+
eq(UrnUtils.getUrn(TEST_VERSION_SET_URN)),
41+
eq(UrnUtils.getUrn(TEST_ENTITY_URN)),
42+
any(VersionPropertiesInput.class)))
43+
.thenReturn(ImmutableList.of(mockResult));
44+
45+
LinkAssetVersionResolver resolver = new LinkAssetVersionResolver(mockService, mockFlags);
46+
47+
// Execute resolver
48+
QueryContext mockContext = getMockAllowContext();
49+
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
50+
LinkVersionInput input = new LinkVersionInput();
51+
input.setVersionSet(TEST_VERSION_SET_URN);
52+
input.setLinkedEntity(TEST_ENTITY_URN);
53+
input.setComment("Test comment");
54+
input.setVersion("v1");
55+
56+
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
57+
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
58+
59+
String result = resolver.get(mockEnv).get();
60+
assertEquals(result, TEST_ENTITY_URN);
61+
}
62+
63+
@Test
64+
public void testGetFeatureFlagDisabled() throws Exception {
65+
EntityVersioningService mockService = Mockito.mock(EntityVersioningService.class);
66+
FeatureFlags mockFlags = Mockito.mock(FeatureFlags.class);
67+
68+
Mockito.when(mockFlags.isEntityVersioning()).thenReturn(false);
69+
70+
LinkAssetVersionResolver resolver = new LinkAssetVersionResolver(mockService, mockFlags);
71+
72+
// Execute resolver
73+
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
74+
LinkVersionInput input = new LinkVersionInput();
75+
input.setVersionSet(TEST_VERSION_SET_URN);
76+
input.setLinkedEntity(TEST_ENTITY_URN);
77+
78+
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
79+
80+
assertThrows(IllegalAccessError.class, () -> resolver.get(mockEnv));
81+
}
82+
83+
@Test
84+
public void testGetInvalidVersionSetUrn() throws Exception {
85+
EntityVersioningService mockService = Mockito.mock(EntityVersioningService.class);
86+
FeatureFlags mockFlags = Mockito.mock(FeatureFlags.class);
87+
88+
Mockito.when(mockFlags.isEntityVersioning()).thenReturn(true);
89+
90+
LinkAssetVersionResolver resolver = new LinkAssetVersionResolver(mockService, mockFlags);
91+
92+
// Execute resolver
93+
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
94+
LinkVersionInput input = new LinkVersionInput();
95+
input.setVersionSet("urn:li:dataset:invalid-version-set"); // Invalid URN type
96+
input.setLinkedEntity(TEST_ENTITY_URN);
97+
98+
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
99+
100+
assertThrows(IllegalArgumentException.class, () -> resolver.get(mockEnv));
101+
}
102+
}

0 commit comments

Comments
 (0)