diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java index 1694c08bd..afb7f1751 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java @@ -24,7 +24,7 @@ import com.linkedin.metadata.dao.exception.ModelValidationException; import com.linkedin.metadata.dao.ingestion.BaseLambdaFunction; import com.linkedin.metadata.dao.ingestion.LambdaFunctionRegistry; -import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingInfo; +import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingAccessor; import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry; import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse; import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient; @@ -1666,9 +1666,9 @@ protected ASPECT updatePreIngestionLambdas(@Nonn protected ASPECT preUpdateRouting(URN urn, ASPECT newAspect) { if (_preUpdateAspectRegistry != null && _preUpdateAspectRegistry.isRegistered( newAspect.getClass())) { - PreRoutingInfo routingMap = _preUpdateAspectRegistry.getPreUpdateRoutingClient(newAspect); + PreRoutingAccessor preRoutingAccessor = _preUpdateAspectRegistry.getPreUpdateRoutingClient(newAspect); PreUpdateRoutingClient client = - routingMap.getPreUpdateClient(); + preRoutingAccessor.getPreUpdateClient(); PreUpdateResponse preUpdateResponse = client.preUpdate(urn, newAspect); ASPECT updatedAspect = (ASPECT) preUpdateResponse.getUpdatedAspect(); log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect); diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreRoutingInfo.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreRoutingAccessor.java similarity index 52% rename from dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreRoutingInfo.java rename to dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreRoutingAccessor.java index d11a5b509..33068d5ac 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreRoutingInfo.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreRoutingAccessor.java @@ -1,13 +1,14 @@ package com.linkedin.metadata.dao.ingestion.preupdate; import com.google.protobuf.Message; +import com.linkedin.data.template.RecordTemplate; import lombok.Data; @Data -public class PreRoutingInfo { +public class PreRoutingAccessor { - public PreUpdateRoutingClient preUpdateClient; + public PreUpdateRoutingClient preUpdateClient; public enum RoutingAction { PROCEED, SKIP diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateAspectRegistry.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateAspectRegistry.java index 64762f3bc..b71703617 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateAspectRegistry.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateAspectRegistry.java @@ -1,8 +1,8 @@ package com.linkedin.metadata.dao.ingestion.preupdate; import com.linkedin.data.template.RecordTemplate; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; @@ -11,21 +11,21 @@ * A registry which maintains mapping of aspects and their PreUpdateRoutingClient. */ @Slf4j -public class PreUpdateAspectRegistry { +public class PreUpdateAspectRegistry { - private Map, PreRoutingInfo> _preUpdateLambdaMap = new ConcurrentHashMap<>(); + private Map, PreRoutingAccessor> _preUpdateLambdaMap = new HashMap<>(); /** * Get GrpcPreUpdateRoutingClient for an aspect. */ - public PreRoutingInfo getPreUpdateRoutingClient(@Nonnull final ASPECT aspect) { + public PreRoutingAccessor getPreUpdateRoutingClient(@Nonnull final ASPECT aspect) { return _preUpdateLambdaMap.get(aspect.getClass()); } /** * Check if GrpcPreUpdateRoutingClient is registered for an aspect. */ - public boolean isRegistered(@Nonnull final Class aspectClass) { + public boolean isRegistered(@Nonnull final Class aspectClass) { return _preUpdateLambdaMap.containsKey(aspectClass); } @@ -35,7 +35,7 @@ public boolean isRegistered(@Nonnull final Class aspectClass) { * @param preUpdateRoutingInfo pre update routing map */ public void registerPreUpdateLambda(@Nonnull Class aspectClass, - @Nonnull PreRoutingInfo preUpdateRoutingInfo) { + @Nonnull PreRoutingAccessor preUpdateRoutingInfo) { log.info("Registering pre update lambda: {}, {}", aspectClass.getCanonicalName(), preUpdateRoutingInfo); _preUpdateLambdaMap.put(aspectClass, preUpdateRoutingInfo); } diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponse.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponse.java index 39734dcf4..b77d12a45 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponse.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponse.java @@ -3,7 +3,9 @@ import com.linkedin.data.template.RecordTemplate; import lombok.Data; - +/** + * Response of pre-update process that includes the updated aspect. + */ @Data public class PreUpdateResponse { private final ASPECT updatedAspect; diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java index 81a0992bd..7a5a07979 100644 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java @@ -7,7 +7,7 @@ import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates; import com.linkedin.metadata.dao.ingestion.SampleLambdaFunctionRegistryImpl; import com.linkedin.metadata.dao.ingestion.SamplePreUpdateRoutingClient; -import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingInfo; +import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingAccessor; import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry; import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer; import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer; @@ -661,11 +661,11 @@ public void testPreUpdateRoutingFromFooToBar() throws URISyntaxException { AspectFoo foo = new AspectFoo().setValue("foo"); AspectFoo bar = new AspectFoo().setValue("bar"); - PreRoutingInfo preRoutingInfo = new PreRoutingInfo(); - preRoutingInfo.setPreUpdateClient(new SamplePreUpdateRoutingClient()); + PreRoutingAccessor preRoutingAccessor = new PreRoutingAccessor(); + preRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry(); - preUpdateAspectRegistry.registerPreUpdateLambda(AspectFoo.class, preRoutingInfo); + preUpdateAspectRegistry.registerPreUpdateLambda(AspectFoo.class, preRoutingAccessor); _dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry); AspectFoo result = _dummyLocalDAO.preUpdateRouting(urn, foo); @@ -678,10 +678,10 @@ public void testMAEEmissionForPreUpdateRouting() throws URISyntaxException { AspectFoo foo = new AspectFoo().setValue("foo"); AspectFoo bar = new AspectFoo().setValue("bar"); _dummyLocalDAO.setAlwaysEmitAuditEvent(true); - PreRoutingInfo preRoutingInfo = new PreRoutingInfo(); - preRoutingInfo.setPreUpdateClient(new SamplePreUpdateRoutingClient()); + PreRoutingAccessor preRoutingAccessor = new PreRoutingAccessor(); + preRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry(); - preUpdateAspectRegistry.registerPreUpdateLambda(AspectFoo.class, preRoutingInfo); + preUpdateAspectRegistry.registerPreUpdateLambda(AspectFoo.class, preRoutingAccessor); _dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry); expectGetLatest(urn, AspectFoo.class, @@ -701,10 +701,10 @@ public void testPreUpdateRoutingWithUnregisteredAspect() throws URISyntaxExcepti AspectBar foo = new AspectBar().setValue("foo"); // Inject RestliPreIngestionAspectRegistry with no registered aspect - PreRoutingInfo preRoutingInfo = new PreRoutingInfo(); - preRoutingInfo.setPreUpdateClient(new SamplePreUpdateRoutingClient()); + PreRoutingAccessor preRoutingAccessor = new PreRoutingAccessor(); + preRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry(); - preUpdateAspectRegistry.registerPreUpdateLambda(AspectFoo.class, preRoutingInfo); + preUpdateAspectRegistry.registerPreUpdateLambda(AspectFoo.class, preRoutingAccessor); _dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry); // Call the add method diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreRoutingInfoTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreRoutingAccessorTest.java similarity index 71% rename from dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreRoutingInfoTest.java rename to dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreRoutingAccessorTest.java index 85b62d819..3d046fa62 100644 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreRoutingInfoTest.java +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreRoutingAccessorTest.java @@ -8,13 +8,13 @@ import static org.mockito.Mockito.*; -public class PreRoutingInfoTest { - private PreRoutingInfo routingInfo; +public class PreRoutingAccessorTest { + private PreRoutingAccessor routingInfo; private PreUpdateRoutingClient mockPreUpdateClient; @BeforeMethod public void setUp() { - routingInfo = new PreRoutingInfo(); + routingInfo = new PreRoutingAccessor(); mockPreUpdateClient = mock(PreUpdateRoutingClient.class); } @@ -26,7 +26,7 @@ public void testPreUpdateClientSetterAndGetter() { @Test public void testRoutingActionEnum() { - assertEquals("PROCEED", PreRoutingInfo.RoutingAction.PROCEED.name()); - assertEquals("SKIP", PreRoutingInfo.RoutingAction.SKIP.name()); + assertEquals("PROCEED", PreRoutingAccessor.RoutingAction.PROCEED.name()); + assertEquals("SKIP", PreRoutingAccessor.RoutingAction.SKIP.name()); } } diff --git a/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java b/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java index 199cd6df6..dd03e7ab5 100644 --- a/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java +++ b/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java @@ -7,7 +7,7 @@ import com.linkedin.data.template.StringArray; import com.linkedin.data.template.UnionTemplate; import com.linkedin.metadata.dao.AspectKey; -import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingInfo; +import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingAccessor; import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry; import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse; import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient; @@ -687,8 +687,8 @@ private List getValueFromRoutingGms(@Nonnull URN urn, * @return the updated aspect */ private RecordTemplate preUpdateRouting(URN urn, RecordTemplate aspect, PreUpdateAspectRegistry registry) { - PreRoutingInfo routingMap = registry.getPreUpdateRoutingClient(aspect); - PreUpdateRoutingClient preUpdateClient = routingMap.getPreUpdateClient(); + PreRoutingAccessor preRoutingAccessor = registry.getPreUpdateRoutingClient(aspect); + PreUpdateRoutingClient preUpdateClient = preRoutingAccessor.getPreUpdateClient(); PreUpdateResponse preUpdateResponse = preUpdateClient.preUpdate(urn, aspect); return preUpdateResponse.getUpdatedAspect(); } diff --git a/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java b/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java index 76e825c06..65fbb959c 100644 --- a/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java +++ b/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java @@ -8,7 +8,7 @@ import com.linkedin.metadata.dao.BaseBrowseDAO; import com.linkedin.metadata.dao.BaseLocalDAO; import com.linkedin.metadata.dao.BaseSearchDAO; -import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingInfo; +import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingAccessor; import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry; import com.linkedin.metadata.dao.utils.ModelUtils; import com.linkedin.metadata.dao.utils.RecordUtils; @@ -562,9 +562,9 @@ public void testPreUpdateRoutingWithRegisteredAspect() { EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects); PreUpdateAspectRegistry registry = new PreUpdateAspectRegistry(); - PreRoutingInfo preRoutingInfo = new PreRoutingInfo(); - preRoutingInfo.setPreUpdateClient(new SamplePreUpdateRoutingClient()); - registry.registerPreUpdateLambda(AspectFoo.class, preRoutingInfo); + PreRoutingAccessor preRoutingAccessor = new PreRoutingAccessor(); + preRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); + registry.registerPreUpdateLambda(AspectFoo.class, preRoutingAccessor); when(_mockLocalDAO.getPreUpdateAspectRegistry()).thenReturn(registry); // given: ingest a snapshot containing a routed aspect which has a registered pre-update lambda. @@ -607,9 +607,9 @@ public void testPreUpdateRoutingWithNonRoutedAspectAndRegisteredPreUpdate() { EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects); PreUpdateAspectRegistry registry = new PreUpdateAspectRegistry(); - PreRoutingInfo preRoutingInfo = new PreRoutingInfo(); - preRoutingInfo.setPreUpdateClient(new SamplePreUpdateRoutingClient()); - registry.registerPreUpdateLambda(AspectFoo.class, preRoutingInfo); + PreRoutingAccessor preRoutingAccessor = new PreRoutingAccessor(); + preRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); + registry.registerPreUpdateLambda(AspectFoo.class, preRoutingAccessor); when(_mockLocalDAO.getPreUpdateAspectRegistry()).thenReturn(registry); @@ -658,9 +658,9 @@ public void testPreUpdateRoutingWithSkipIngestion() throws NoSuchFieldException, List aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo)); EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects); PreUpdateAspectRegistry registry = new PreUpdateAspectRegistry(); - PreRoutingInfo preRoutingInfo = new PreRoutingInfo(); - preRoutingInfo.setPreUpdateClient(new SamplePreUpdateRoutingClient()); - registry.registerPreUpdateLambda(AspectFoo.class, preRoutingInfo); + PreRoutingAccessor preRoutingAccessor = new PreRoutingAccessor(); + preRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); + registry.registerPreUpdateLambda(AspectFoo.class, preRoutingAccessor); when(_mockLocalDAO.getPreUpdateAspectRegistry()).thenReturn(registry); runAndWait(_resource.ingest(snapshot));