Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added RPC invocation for PreUpdate #443

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@
import com.linkedin.metadata.dao.exception.ModelValidationException;
import com.linkedin.metadata.dao.ingestion.BaseLambdaFunction;
import com.linkedin.metadata.dao.ingestion.LambdaFunctionRegistry;
import com.linkedin.metadata.dao.ingestion.RestliPreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.preupdate.GrpcPreUpdateRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateClient;
import com.linkedin.metadata.dao.ingestion.preupdate.RestliPreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.preupdate.RoutingMap;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.IndefiniteRetention;
Expand Down Expand Up @@ -215,6 +219,8 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {

private Clock _clock = Clock.systemUTC();

private GrpcPreUpdateRegistry _grpcPreUpdateRegistry = null;

/**
* Constructor for BaseLocalDAO.
*
Expand Down Expand Up @@ -400,7 +406,7 @@ public void setLambdaFunctionRegistry(@Nullable LambdaFunctionRegistry lambdaFun
}

/**
* Set pre ingestion aspect registry.
* Set pre ingestion aspect registry for restli implementation.
*/
public void setRestliPreUpdateAspectRegistry(
@Nullable RestliPreUpdateAspectRegistry restliPreUpdateAspectRegistry) {
Expand All @@ -414,6 +420,12 @@ public RestliPreUpdateAspectRegistry getRestliPreUpdateAspectRegistry() {
return _restliPreUpdateAspectRegistry;
}

/**
* Set pre ingestion aspect registry for grpc implementation.
*/
public void setGrpcPreUpdateRegistry(@Nullable GrpcPreUpdateRegistry grpcPreUpdateRegistry) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do we call this method? Should we do it through inject?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we will inject it in the AssetClass where we call the ingestion method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For RestliRegistry, I was setting it in DatasetLocalDao.

_grpcPreUpdateRegistry = grpcPreUpdateRegistry;
}

/**
* Enables or disables atomic updates of multiple aspects.
Expand Down Expand Up @@ -1662,7 +1674,8 @@ protected <ASPECT extends RecordTemplate> ASPECT updatePreIngestionLambdas(@Nonn
* @param newValue the new aspect value
* @return the updated aspect
*/
protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(URN urn, ASPECT newValue) {
protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(@Nonnull URN urn, @Nonnull ASPECT newValue) {

if (_restliPreUpdateAspectRegistry != null && _restliPreUpdateAspectRegistry.isRegistered(
newValue.getClass())) {
RestliCompliantPreUpdateRoutingClient client =
Expand All @@ -1673,6 +1686,29 @@ protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(URN urn, ASPEC
log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, convertedAspect);
return (ASPECT) convertedAspect;
}

if (_grpcPreUpdateRegistry != null && _grpcPreUpdateRegistry.isRegistered(newValue.getClass())) {
// Fetch routing data (PreUpdateClient instance) for the given aspect
RoutingMap routingMap = _grpcPreUpdateRegistry.getPreUpdateRoutingClient(newValue);
PreUpdateClient preUpdateClient = routingMap.getPreUpdateClient();
try {
// Convert the URN and aspect to gRPC-compatible Message objects
Message grpcUrn = preUpdateClient.convertUrnToMessage(urn);
Message grpcAspect = preUpdateClient.convertAspectToMessage(newValue);

// Invoke the grpc service pre update method
PreUpdateResponse preUpdateResponse = preUpdateClient.preUpdate(grpcUrn, grpcAspect);

// Convert the updated gRPC Message aspect back to RecordTemplate
ASPECT updatedAspect = (ASPECT) preUpdateClient.convertAspectToRecordTemplate(preUpdateResponse.getUpdatedAspect());

log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, previous aspect: {}, updated aspect: {}", urn, newValue, updatedAspect);
return updatedAspect;
} catch (Exception e) {
log.error("Exception during gRPC pre-update routing for URN: {}, Aspect: {}. Error: {}", urn, newValue, e.getMessage(), e);
throw new RuntimeException("Error during gRPC preUpdateRouting", e);
}
}
return newValue;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.data.template.RecordTemplate;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


/**
* A registry which maintains mapping of aspects and their GrpcPreUpdateRoutingClient.
*/
@Slf4j
public class GrpcPreUpdateRegistry<ASPECT extends RecordTemplate> {

private Map<Class<? extends RecordTemplate>, RoutingMap> _preUpdateLambdaMap =
new ConcurrentHashMap<>();

/**
* Get GrpcPreUpdateRoutingClient for an aspect.
*/
public RoutingMap getPreUpdateRoutingClient(@Nonnull final ASPECT aspect){
return _preUpdateLambdaMap.get(aspect.getClass());
}

/**
* Check if GrpcPreUpdateRoutingClient is registered for an aspect.
*/
public boolean isRegistered(@Nonnull final Class<ASPECT> aspectClass){
return _preUpdateLambdaMap.containsKey(aspectClass);
}

/**
* Register a pre update lambda for the given aspect
* @param aspectClass aspect class
* @param preUpdateRoutingMap pre update routing map
*/
public void registerPreUpdateLambda(@Nonnull Class<? extends RecordTemplate> aspectClass,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, who should register this lambda?

Copy link
Contributor Author

@rakhiagr rakhiagr Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can create a Factory PreUpdateAspectRegistryFactory

PreUpdateAspectRegistryFactory extends SimpleSingletonFactory<GrpcPreUpdateRegistry> {

protected GrpcPreUpdateRegistry createInstance(ConfigView view) {    
return GrpcPreUpdateRegistry.registerPreUpdateLambda(DatasetAccountableOwnership.class, getBean(DatasetAccountableOwnershipPreUpdateClient
       .class));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's include all those detail in your doc and trying to see which part can be covered by SMO in the future

@Nonnull RoutingMap preUpdateRoutingMap) {
log.info("Registering pre update lambda: {}, {}", aspectClass.getCanonicalName(), preUpdateRoutingMap);
_preUpdateLambdaMap.put(aspectClass, preUpdateRoutingMap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.protobuf.Message;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;


public interface PreUpdateClient<ASPECT extends Message> {

/**
* Executes the gRPC pre-update logic, including building the request,
* invoking the service, and handling the response.
*
* @param urn The URN of the entity to be updated.
* @param aspect The aspect to be updated.
* @return The updated aspect.
*/
PreUpdateResponse<ASPECT> preUpdate(Message urn, ASPECT aspect);

/**
* Converts a RecordTemplate URN to a gRPC-compatible Message.
*
* @param urn The RecordTemplate URN.
* @return The gRPC-compatible Message URN.
*/
Message convertUrnToMessage(Urn urn);

/**
* Converts a RecordTemplate Aspect to a gRPC-compatible Message Aspect.
*
* @param aspect The RecordTemplate aspect.
* @return The gRPC-compatible Message aspect.
*/
ASPECT convertAspectToMessage(RecordTemplate aspect);

/**
* Converts a gRPC-compatible Message Aspect back to a RecordTemplate Aspect.
*
* @param messageAspect The Message aspect.
* @return The RecordTemplate aspect.
*/
RecordTemplate convertAspectToRecordTemplate(ASPECT messageAspect);
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.protobuf.Message;
import lombok.Data;

@Data
public class PreUpdateResponse<ASPECT extends Message> {
private final ASPECT updatedAspect;
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.metadata.dao.ingestion;
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.protobuf.Message;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.linkedin.metadata.dao.ingestion;
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.protobuf.Message;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;


/**
* A restli client to route update request to the appropriate to custom APIs.
* <p>This interface extends {@link PreUpdateRoutingClient} and provides additional methods for converting
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.metadata.dao.ingestion;
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.linkedin.data.template.RecordTemplate;
import javax.annotation.Nonnull;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.protobuf.Message;
import lombok.Data;


@Data
public class RoutingMap {

public enum RoutingAction {
SKIP,
PROCEED
}
public PreUpdateClient<? extends Message> preUpdateClient;
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.ingestion.SampleLambdaFunctionRegistryImpl;
import com.linkedin.metadata.dao.ingestion.SamplePreUpdateAspectRegistryImpl;
import com.linkedin.metadata.dao.ingestion.preupdate.SamplePreUpdateAspectRegistryImpl;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.TimeBasedRetention;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.metadata.dao.ingestion;
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.common.collect.ImmutableMap;
import com.linkedin.data.template.RecordTemplate;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.metadata.dao.ingestion;
package com.linkedin.metadata.dao.ingestion.preupdate;

import com.google.protobuf.Any;
import com.google.protobuf.Message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import com.linkedin.data.template.StringArray;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.AspectKey;
import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.RestliPreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.preupdate.RestliPreUpdateAspectRegistry;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.internal.IngestionParams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.google.common.collect.ImmutableMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.RestliPreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.preupdate.RestliPreUpdateAspectRegistry;
import com.linkedin.testing.AspectBar;
import com.linkedin.testing.AspectFoo;
import javax.annotation.Nonnull;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.google.protobuf.StringValue;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.preupdate.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.testing.AspectFoo;


Expand Down
Loading