Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added RPC invocation for PreUpdate #443

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@
import com.linkedin.metadata.dao.exception.ModelValidationException;
import com.linkedin.metadata.dao.ingestion.BaseLambdaFunction;
import com.linkedin.metadata.dao.ingestion.LambdaFunctionRegistry;
import com.linkedin.metadata.dao.ingestion.RestliPreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.preupdate.GrpcPreUpdateRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateClient;
import com.linkedin.metadata.dao.ingestion.preupdate.RestliPreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingInfo;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.IndefiniteRetention;
Expand Down Expand Up @@ -215,6 +219,8 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {

private Clock _clock = Clock.systemUTC();

private GrpcPreUpdateRegistry _grpcPreUpdateRegistry = null;

/**
* Constructor for BaseLocalDAO.
*
Expand Down Expand Up @@ -400,7 +406,7 @@ public void setLambdaFunctionRegistry(@Nullable LambdaFunctionRegistry lambdaFun
}

/**
* Set pre ingestion aspect registry.
* Set pre ingestion aspect registry for restli implementation.
*/
public void setRestliPreUpdateAspectRegistry(
@Nullable RestliPreUpdateAspectRegistry restliPreUpdateAspectRegistry) {
Expand All @@ -414,6 +420,12 @@ public RestliPreUpdateAspectRegistry getRestliPreUpdateAspectRegistry() {
return _restliPreUpdateAspectRegistry;
}

/**
* Set pre ingestion aspect registry for grpc implementation.
*/
public void setGrpcPreUpdateRegistry(@Nullable GrpcPreUpdateRegistry grpcPreUpdateRegistry) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do we call this method? Should we do it through inject?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we will inject it in the AssetClass where we call the ingestion method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For RestliRegistry, I was setting it in DatasetLocalDao.

_grpcPreUpdateRegistry = grpcPreUpdateRegistry;
}

/**
* Enables or disables atomic updates of multiple aspects.
Expand Down Expand Up @@ -1657,12 +1669,13 @@ protected <ASPECT extends RecordTemplate> ASPECT updatePreIngestionLambdas(@Nonn
}

/**
* This method routes the update request to the appropriate custom API for pre-ingestion processing.
* This method routes the update request to the appropriate API for pre-ingestion processing.
* @param urn the urn of the asset
* @param newValue the new aspect value
* @return the updated aspect
*/
protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(URN urn, ASPECT newValue) {
protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(@Nonnull URN urn, @Nonnull ASPECT newValue) {

if (_restliPreUpdateAspectRegistry != null && _restliPreUpdateAspectRegistry.isRegistered(
newValue.getClass())) {
RestliCompliantPreUpdateRoutingClient client =
Expand All @@ -1673,6 +1686,24 @@ protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(URN urn, ASPEC
log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, convertedAspect);
return (ASPECT) convertedAspect;
}

if (_grpcPreUpdateRegistry != null && _grpcPreUpdateRegistry.isRegistered(newValue.getClass())) {
// Fetch routing data (PreUpdateClient instance) for the given aspect
PreRoutingInfo routingMap = _grpcPreUpdateRegistry.getPreUpdateRoutingClient(newValue);
PreUpdateClient preUpdateClient = routingMap.getPreUpdateClient();
try {
// Invoke the grpc service pre update method
PreUpdateResponse preUpdateResponse = preUpdateClient.preUpdate(urn, newValue);
ASPECT updatedAspect = (ASPECT) preUpdateResponse.getUpdatedAspect();
log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, previous aspect: {}, updated aspect: {}", urn,
newValue, updatedAspect);
return updatedAspect;
} catch (Exception e) {
log.error("Exception during gRPC pre-update routing for URN: {}, Aspect: {}. Error: {}", urn, newValue,
e.getMessage(), e);
throw new RuntimeException("Error during gRPC preUpdateRouting", e);
}
}
return newValue;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.data.template.RecordTemplate;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


/**
* A registry which maintains mapping of aspects and their GrpcPreUpdateRoutingClient.
*/
@Slf4j
public class GrpcPreUpdateRegistry<ASPECT extends RecordTemplate> {

private Map<Class<? extends RecordTemplate>, PreRoutingInfo> _preUpdateLambdaMap = new ConcurrentHashMap<>();

/**
* Get GrpcPreUpdateRoutingClient for an aspect.
*/
public PreRoutingInfo getPreUpdateRoutingClient(@Nonnull final ASPECT aspect) {
return _preUpdateLambdaMap.get(aspect.getClass());
}

/**
* Check if GrpcPreUpdateRoutingClient is registered for an aspect.
*/
public boolean isRegistered(@Nonnull final Class<ASPECT> aspectClass) {
return _preUpdateLambdaMap.containsKey(aspectClass);
}

/**
* Register a pre update lambda for the given aspect.
* @param aspectClass aspect class
* @param preUpdateRoutingInfo pre update routing map
*/
public void registerPreUpdateLambda(@Nonnull Class<? extends RecordTemplate> aspectClass,
@Nonnull PreRoutingInfo preUpdateRoutingInfo) {
log.info("Registering pre update lambda: {}, {}", aspectClass.getCanonicalName(), preUpdateRoutingInfo);
_preUpdateLambdaMap.put(aspectClass, preUpdateRoutingInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.protobuf.Message;
import lombok.Data;


@Data
public class PreRoutingInfo {

public PreUpdateClient<? extends Message> preUpdateClient;

public enum RoutingAction {
PROCEED, SKIP
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.protobuf.Message;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;


public interface PreUpdateClient<ASPECT extends RecordTemplate> {

/**
* Executes the gRPC pre-update logic, including building the request,
* invoking the service, and handling the response.
*
* @param urn The URN of the entity to be updated.
* @param aspect The aspect to be updated.
* @return The updated aspect.
*/
PreUpdateResponse<ASPECT> preUpdate(Urn urn, ASPECT aspect);

/**
* Converts a RecordTemplate URN to a gRPC-compatible Message.
*
* @param urn The RecordTemplate URN.
* @return The gRPC-compatible Message URN.
*/
Message convertUrnToMessage(Urn urn);

/**
* Converts a RecordTemplate Aspect to a gRPC-compatible Message Aspect.
*
* @param aspect The RecordTemplate aspect.
* @return The gRPC-compatible Message aspect.
*/
Message convertAspectToMessage(RecordTemplate aspect);

/**
* Converts a gRPC-compatible Message Aspect back to a RecordTemplate Aspect.
*
* @param messageAspect The Message aspect.
* @return The RecordTemplate aspect.
*/
RecordTemplate convertAspectToRecordTemplate(Message messageAspect);
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.data.template.RecordTemplate;
import lombok.Data;


@Data
public class PreUpdateResponse<ASPECT extends RecordTemplate> {
private final ASPECT updatedAspect;
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.metadata.dao.ingestion;
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.protobuf.Message;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.linkedin.metadata.dao.ingestion;
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.protobuf.Message;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;


/**
* A restli client to route update request to the appropriate to custom APIs.
* <p>This interface extends {@link PreUpdateRoutingClient} and provides additional methods for converting
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.metadata.dao.ingestion;
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.data.template.RecordTemplate;
import javax.annotation.Nonnull;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.ingestion.SampleLambdaFunctionRegistryImpl;
import com.linkedin.metadata.dao.ingestion.SamplePreUpdateAspectRegistryImpl;
import com.linkedin.metadata.dao.ingestion.preupdate.GrpcPreUpdateRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreRoutingInfo;
import com.linkedin.metadata.dao.ingestion.preupdate.SamplePreUpdateAspectRegistryImpl;
import com.linkedin.metadata.dao.ingestion.preupdate.SamplePreUpdateGrpcClient;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.TimeBasedRetention;
Expand Down Expand Up @@ -695,4 +698,44 @@ public void testPreUpdateRoutingWithUnregisteredAspect() throws URISyntaxExcepti
// Verify that the result is the same as the input aspect since it's not registered
assertEquals(result, foo);
}

@Test
public void testPreUpdateRoutingGrpcFromFooToBar() throws URISyntaxException {
// Setup test data
FooUrn urn = new FooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");
AspectFoo bar = new AspectFoo().setValue("bar");

GrpcPreUpdateRegistry grpcPreUpdateRegistry = new GrpcPreUpdateRegistry();
PreRoutingInfo preRoutingInfo = new PreRoutingInfo();
preRoutingInfo.setPreUpdateClient(new SamplePreUpdateGrpcClient());


grpcPreUpdateRegistry.registerPreUpdateLambda(AspectFoo.class, preRoutingInfo);
_dummyLocalDAO.setGrpcPreUpdateRegistry(grpcPreUpdateRegistry);

AspectFoo result = _dummyLocalDAO.preUpdateRouting(urn, foo);
assertEquals(result, bar);
}

@Test
public void testPreUpdateRoutingGrpcWithUnregisteredAspect() throws URISyntaxException {
// Setup test data
FooUrn urn = new FooUrn(1);
AspectBar foo = new AspectBar().setValue("foo");

GrpcPreUpdateRegistry grpcPreUpdateRegistry = new GrpcPreUpdateRegistry();
PreRoutingInfo preRoutingInfo = new PreRoutingInfo();
preRoutingInfo.setPreUpdateClient(new SamplePreUpdateGrpcClient());


grpcPreUpdateRegistry.registerPreUpdateLambda(AspectFoo.class, preRoutingInfo);
_dummyLocalDAO.setGrpcPreUpdateRegistry(grpcPreUpdateRegistry);

// Call the add method
AspectBar result = _dummyLocalDAO.preUpdateRouting(urn, foo);

// Verify that the result is the same as the input aspect since it's not registered
assertEquals(result, foo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.data.template.RecordTemplate;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.Mockito.*;
import static org.testng.AssertJUnit.*;


public class GrpcPreUpdateRegistryTest {

private GrpcPreUpdateRegistry<RecordTemplate> registry;
private PreRoutingInfo mockRoutingInfo;
private RecordTemplate mockAspectInstance;

@BeforeMethod
public void setup() {
registry = new GrpcPreUpdateRegistry<>();
mockRoutingInfo = mock(PreRoutingInfo.class);
mockAspectInstance = mock(RecordTemplate.class);
}

@Test
public void testRegisterPreUpdateLambda() throws InstantiationException, IllegalAccessException {
registry.registerPreUpdateLambda(mockAspectInstance.getClass(), mockRoutingInfo);
System.out.println(registry.getPreUpdateRoutingClient(mockAspectInstance));
assertTrue(registry.isRegistered((Class<RecordTemplate>) mockAspectInstance.getClass()));
}

@Test
public void testGetPreUpdateRoutingClient() {
registry.registerPreUpdateLambda(mockAspectInstance.getClass(), mockRoutingInfo);
PreRoutingInfo retrievedRoutingMap = registry.getPreUpdateRoutingClient(mockAspectInstance);
assertEquals(mockRoutingInfo, retrievedRoutingMap);
}

@Test
public void testIsRegistered() {
assertFalse(registry.isRegistered((Class<RecordTemplate>) mockAspectInstance.getClass()));
registry.registerPreUpdateLambda(mockAspectInstance.getClass(), mockRoutingInfo);
assertTrue(registry.isRegistered((Class<RecordTemplate>) mockAspectInstance.getClass()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.protobuf.Message;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.testng.AssertJUnit.*;
import static org.mockito.Mockito.*;


public class PreRoutingInfoTest {
private PreRoutingInfo routingInfo;
private PreUpdateClient<? extends Message> mockPreUpdateClient;

@BeforeMethod
public void setUp() {
routingInfo = new PreRoutingInfo();
mockPreUpdateClient = mock(PreUpdateClient.class);
}

@Test
public void testPreUpdateClientSetterAndGetter() {
routingInfo.setPreUpdateClient(mockPreUpdateClient);
assertEquals(mockPreUpdateClient, routingInfo.getPreUpdateClient());
}

@Test
public void testRoutingActionEnum() {
assertEquals("PROCEED", PreRoutingInfo.RoutingAction.PROCEED.name());
assertEquals("SKIP", PreRoutingInfo.RoutingAction.SKIP.name());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.data.template.RecordTemplate;
import org.testng.annotations.Test;

import static org.mockito.Mockito.*;
import static org.testng.AssertJUnit.*;


public class PreUpdateResponseTest {
@Test
public void testConstructorAndGetter() {
// Create a mock instance of RecordTemplate
RecordTemplate mockAspect = mock(RecordTemplate.class);

// Create an instance of PreUpdateResponse with the mock aspect
PreUpdateResponse<RecordTemplate> response = new PreUpdateResponse<>(mockAspect);

// Verify that the getter returns the correct value
assertEquals(mockAspect, response.getUpdatedAspect());
}
}

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.metadata.dao.ingestion;
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.common.collect.ImmutableMap;
import com.linkedin.data.template.RecordTemplate;
Expand Down
Loading
Loading