Skip to content

Commit

Permalink
Merge branch 'master' into rakhi/optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
rakhiagr authored Nov 12, 2024
2 parents f7c35e9 + 017f037 commit b93dba4
Show file tree
Hide file tree
Showing 20 changed files with 399 additions and 92 deletions.
21 changes: 5 additions & 16 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,6 @@ protected static class AspectUpdateResult<ASPECT extends RecordTemplate> {
private final Map<Class<? extends RecordTemplate>, EqualityTester<? extends RecordTemplate>>
_aspectEqualityTesterMap = new ConcurrentHashMap<>();

private boolean _modelValidationOnWrite = true;

// Always emit MAE on every update regardless if there's any actual change in value
private boolean _alwaysEmitAuditEvent = false;

Expand Down Expand Up @@ -430,13 +428,6 @@ public void enableAtomicMultipleUpdate(boolean enabled) {
_enableAtomicMultipleUpdate = enabled;
}

/**
* Enables or disables model validation before persisting.
*/
public void enableModelValidationOnWrite(boolean enabled) {
_modelValidationOnWrite = enabled;
}

/**
* Sets if MAE should be always emitted after each update even if there's no actual value change.
*/
Expand Down Expand Up @@ -653,9 +644,7 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN

checkValidAspect(newValue.getClass());

if (_modelValidationOnWrite) {
validateAgainstSchema(newValue);
}
validateAgainstSchemaAndFillinDefault(newValue);

// Invoke pre-update hooks, if any
if (_aspectPreUpdateHooksMap.containsKey(updateTuple.getAspectClass())) {
Expand Down Expand Up @@ -818,8 +807,8 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdate
maxTransactionRetry);

// skip MAE producing and post update hook in test mode or if the result is null (no actual update with addCommon)
return updateLambda.getIngestionParams().isTestMode() ? result.newValue
: result == null ? null : unwrapAddResult(urn, result, auditStamp, trackingContext);
return result == null ? null : (updateLambda.getIngestionParams().isTestMode() ? result.newValue
: unwrapAddResult(urn, result, auditStamp, trackingContext));
}

/**
Expand Down Expand Up @@ -1550,9 +1539,9 @@ public long newNumericId() {
/**
* Validates a model against its schema.
*/
protected void validateAgainstSchema(@Nonnull RecordTemplate model) {
public static void validateAgainstSchemaAndFillinDefault(@Nonnull RecordTemplate model) {
ValidationResult result = ValidateDataAgainstSchema.validate(model,
new ValidationOptions(RequiredMode.CAN_BE_ABSENT_IF_HAS_DEFAULT, CoercionMode.NORMAL,
new ValidationOptions(RequiredMode.FIXUP_ABSENT_WITH_DEFAULT, CoercionMode.NORMAL,
UnrecognizedFieldMode.DISALLOW));

if (!result.isValid()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,34 @@
import com.linkedin.metadata.dao.internal.BaseGraphWriterDAO;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static com.linkedin.metadata.dao.utils.ModelUtils.*;


public class GraphUtils {
private static final String SOURCE = "source";
private GraphUtils() {
// Util class
}

/**
* Check if a group relationship shares the same source urn, destination urn or both based on the remove option.
* @param relationships list of relationships
* @param removalOption removal option to specify which relationships to be removed
* @param sourceField name of the source field
* @param destinationField name of the destination field
* @param urn source urn to compare. Optional for V1. Needed for V2.
*/
public static void checkSameUrn(@Nonnull final List<? extends RecordTemplate> relationships,
@Nonnull final BaseGraphWriterDAO.RemovalOption removalOption, final String sourceField, final String destinationField) {
@Nonnull final BaseGraphWriterDAO.RemovalOption removalOption, @Nonnull final String sourceField,
@Nonnull final String destinationField, @Nullable Urn urn) {

if (relationships.isEmpty()) {
return;
}

// ToDo: how to handle this for Relationship V2?
final Urn sourceUrn = getSourceUrnFromRelationship(relationships.get(0));
final Urn sourceUrn = getSourceUrnBasedOnRelationshipVersion(relationships.get(0), urn);
final Urn destinationUrn = getDestinationUrnFromRelationship(relationships.get(0));

if (removalOption == BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE) {
Expand All @@ -38,9 +45,43 @@ public static void checkSameUrn(@Nonnull final List<? extends RecordTemplate> re
}
}

/**
* Get the source asset's urn for a given relationship.
* @param relationship Relationship. The relationship can be in model V1 or V2.
* @param urn The source asset urn. Optional for V1. Must for V2. Exception will be thrown if urn is not provided for V2.
* @return The source asset urn.
*/
public static <RELATIONSHIP extends RecordTemplate> Urn getSourceUrnBasedOnRelationshipVersion(
@Nonnull RELATIONSHIP relationship, @Nullable Urn urn) {
Urn sourceUrn;
boolean isRelationshipInV2 = ModelUtils.isRelationshipInV2(relationship.schema());
if (isRelationshipInV2 && urn != null) {
// if relationship model in V2 and urn is not null, get the sourceUrn from the input urn
sourceUrn = urn;
} else if (!isRelationshipInV2) {
// if relationship model in V1, get the sourceUrn from relationship
sourceUrn = getSourceUrnFromRelationship(relationship);
} else {
// throw exception if relationship in V2 but source urn not provided
throw new IllegalArgumentException("Source urn is needed for Relationship V2");
}
return sourceUrn;
}

public static void checkSameUrn(@Nonnull final List<? extends RecordTemplate> relationships,
@Nonnull final BaseGraphWriterDAO.RemovalOption removalOption, @Nonnull final String sourceField,
@Nonnull final String destinationField) {
checkSameUrn(relationships, removalOption, sourceField, destinationField, null);
}

private static void checkSameUrn(@Nonnull List<? extends RecordTemplate> records, @Nonnull String field,
@Nonnull Urn compare) {
for (RecordTemplate relation : records) {
if (ModelUtils.isRelationshipInV2(relation.schema()) && field.equals(SOURCE)) {
// Skip source urn check for V2 relationships since they don't have source field
// ToDo: enhance the source check for V2 relationships
return;
}
if (!compare.equals(ModelUtils.getUrnFromRelationship(relation, field))) {
throw new IllegalArgumentException("Records have different " + field + " urn");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ public static boolean isCommonAspect(@Nonnull Class<? extends RecordTemplate> cl
* @param relationship must be a valid relationship model defined in com.linkedin.metadata.relationship
* @return boolean. True if the relationship is in MG model V2.
*/
static <RELATIONSHIP extends RecordTemplate> boolean isRelationshipInV2(Class<? extends RecordTemplate> relationship) {
public static <RELATIONSHIP extends RecordTemplate> boolean isRelationshipInV2(Class<? extends RecordTemplate> relationship) {
final RecordDataSchema schema = ValidationUtils.getRecordSchema(relationship);
return isRelationshipInV2(schema);
}
Expand All @@ -887,7 +887,7 @@ static <RELATIONSHIP extends RecordTemplate> boolean isRelationshipInV2(Class<?
* @param schema schema of a valid relationship model defined in com.linkedin.metadata.relationship
* @return boolean. True if the relationship is in MG model V2.
*/
static boolean isRelationshipInV2(@Nonnull RecordDataSchema schema) {
public static boolean isRelationshipInV2(@Nonnull RecordDataSchema schema) {
// check the data type of the destination fields in schema and see if it's a union type
return schema.getFields().stream().noneMatch(
field -> field.getName().equals(SOURCE_FIELD))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import com.linkedin.metadata.dao.exception.ModelConversionException;
import com.linkedin.metadata.validator.InvalidSchemaException;
import com.linkedin.metadata.validator.ValidationUtils;
import com.linkedin.restli.internal.server.response.ResponseUtils;
import com.linkedin.restli.server.RestLiServiceException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -76,28 +74,7 @@ public static String toJsonString(@Nonnull RecordTemplate recordTemplate) {
try {
return DATA_TEMPLATE_CODEC.mapToString(recordTemplate.data());
} catch (IOException e) {
throw new ModelConversionException("Failed to serialize RecordTemplate: " + recordTemplate.toString());
}
}

/**
* Serializes a {@link RecordTemplate} to JSON string.
* Also take test mode as input to control the default value fill in strategy
* @param recordTemplate the record template to serialize
* @return the JSON string serialized using {@link JacksonDataTemplateCodec}.
*/
//Todo: we will remove this method once we verify it works and does not bring too much degrade in test mode.
@Nonnull
public static String toJsonString(@Nonnull RecordTemplate recordTemplate, boolean isTestMode) {
if (isTestMode) {
try {
DataMap dataWithDefaultValue = (DataMap) ResponseUtils.fillInDataDefault(recordTemplate.schema(), recordTemplate.data());
return DATA_TEMPLATE_CODEC.mapToString(dataWithDefaultValue);
} catch (RestLiServiceException | IOException e) {
throw new ModelConversionException("Failed to serialize RecordTemplate: " + recordTemplate.toString(), e);
}
} else {
return toJsonString(recordTemplate);
throw new ModelConversionException("Failed to serialize RecordTemplate: " + recordTemplate.toString(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ public class GraphUtilsTest {
public void testCheckSameUrnWithEmptyRelationships() {
List<RecordTemplate> relationships = Collections.emptyList();
GraphUtils.checkSameUrn(relationships, BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE, "source", "destination");
GraphUtils.checkSameUrn(relationships, BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION, "source", "destination", new BarUrn(1));
// No exception should be thrown
}

@Test
public void testCheckSameUrnWithSameSourceUrn() {
// ToDo: Add test cases for relationship V2

// Test cases for relationship V1
RelationshipFoo relationship;
try {
relationship = mockRelationshipFoo(new FooUrn(1), new BarUrn(2));
Expand All @@ -42,10 +42,36 @@ public void testCheckSameUrnWithSameSourceUrn() {
} catch (IllegalArgumentException e) {
fail("Expected no IllegalArgumentException to be thrown, but got: " + e.getMessage());
}

// when urn is provided for relationship V1, the provided urn should be ignored
try {
GraphUtils.checkSameUrn(relationships, BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE, "source", "destination", new BarUrn(10));
} catch (IllegalArgumentException e) {
fail("Expected no IllegalArgumentException to be thrown, but got: " + e.getMessage());
}

// Test cases for relationship V2
RelationshipV2Bar relationshipV2 = mockRelationshipV2Bar(new BarUrn(2));
List<RecordTemplate> relationshipsV2 = Lists.newArrayList(relationshipV2, relationshipV2);
// when urn is provided for relationship V2, the check should pass
try {
GraphUtils.checkSameUrn(relationshipsV2, BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE, "source", "destination", new BarUrn(2));
} catch (IllegalArgumentException e) {
fail("Expected no IllegalArgumentException to be thrown, but got: " + e.getMessage());
}
// when urn is not provided for relationship V2, it should throw IllegalArgumentException
assertThrows(IllegalArgumentException.class,
() -> GraphUtils.checkSameUrn(
relationshipsV2,
BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE,
"source",
"destination")
);
}

@Test
public void testCheckSameUrnWithDifferentSourceUrn() {
// Test cases for relationship V1
RecordTemplate relationship1;
RecordTemplate relationship2;
try {
Expand All @@ -63,49 +89,116 @@ public void testCheckSameUrnWithDifferentSourceUrn() {
"source",
"destination")
);
// when urn is provided for relationship V1, the provided urn should be ignored
assertThrows(IllegalArgumentException.class,
() -> GraphUtils.checkSameUrn(
relationships,
BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE,
"source",
"destination",
new BarUrn(10))
);

// ToDo: add test cases for V2. Right now it check if a list of relationships have the same source urn.
}

@Test
public void testCheckSameUrnWithSameDestinationUrn() {
// Test cases for relationship V1
RelationshipFoo relationship1;
RelationshipV2Bar relationship2;
try {
relationship1 = mockRelationshipFoo(new FooUrn(1), new BarUrn(2));
relationship2 = mockRelationshipV2Bar(new BarUrn(2));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
List<RecordTemplate> relationships = Lists.newArrayList(relationship1, relationship2);
List<RecordTemplate> relationships1 = Lists.newArrayList(relationship1, relationship1);

try {
GraphUtils.checkSameUrn(relationships, BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION, "source",
GraphUtils.checkSameUrn(relationships1, BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION, "source",
"destination");
} catch (IllegalArgumentException e) {
fail("Expected no IllegalArgumentException to be thrown, but got: " + e.getMessage());
}

// when urn is provided for relationship V1, the provided urn should be ignored
try {
GraphUtils.checkSameUrn(relationships1, BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION, "source",
"destination", new BarUrn(10));
} catch (IllegalArgumentException e) {
fail("Expected no IllegalArgumentException to be thrown, but got: " + e.getMessage());
}

// Test cases for relationship V2
RelationshipV2Bar relationship2 = mockRelationshipV2Bar(new BarUrn(2));
List<RecordTemplate> relationships2 = Lists.newArrayList(relationship2, relationship2);

// throws exception if V2 relationships without source urn provided
assertThrows(IllegalArgumentException.class,
() -> GraphUtils.checkSameUrn(
relationships2,
BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION,
"source",
"destination")
);

try {
GraphUtils.checkSameUrn(relationships2, BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION, "source",
"destination", new BarUrn(10));
} catch (IllegalArgumentException e) {
fail("Expected no IllegalArgumentException to be thrown, but got: " + e.getMessage());
}
}

@Test
public void testCheckSameUrnWithDifferentDestinationUrn() {
// Test cases for relationship V1
RelationshipFoo relationship1;
RelationshipBar relationship2;
RelationshipV2Bar relationship3;
try {
relationship1 = mockRelationshipFoo(new FooUrn(1), new BarUrn(2));
relationship2 = new RelationshipBar().setSource(new FooUrn(4)).setDestination(new BazUrn(2));
relationship3 = mockRelationshipV2Bar(new BarUrn(3));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}

List<RecordTemplate> relationships = Lists.newArrayList(relationship1, relationship2, relationship3);
List<RecordTemplate> relationships1 = Lists.newArrayList(relationship1, relationship2);
assertThrows(IllegalArgumentException.class,
() -> GraphUtils.checkSameUrn(
relationships,
relationships1,
BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION,
"source",
"destination")
);

assertThrows(IllegalArgumentException.class,
() -> GraphUtils.checkSameUrn(
relationships1,
BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION,
"source",
"destination",
new BarUrn(10))
);

// Test cases for relationship V2
RelationshipV2Bar relationship3 = mockRelationshipV2Bar(new BarUrn(3));
RelationshipV2Bar relationship4 = mockRelationshipV2Bar(new BarUrn(4));
List<RecordTemplate> relationships2 = Lists.newArrayList(relationship3, relationship4);
assertThrows(IllegalArgumentException.class,
() -> GraphUtils.checkSameUrn(
relationships2,
BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION,
"source",
"destination")
);

assertThrows(IllegalArgumentException.class,
() -> GraphUtils.checkSameUrn(
relationships2,
BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION,
"source",
"destination",
new BarUrn(10))
);
}

private RelationshipFoo mockRelationshipFoo(FooUrn expectedSource, BarUrn expectedDestination) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.StringArray;
import com.linkedin.metadata.dao.BaseLocalDAO;
import com.linkedin.metadata.dao.exception.ModelConversionException;
import com.linkedin.metadata.validator.InvalidSchemaException;
import com.linkedin.metadata.validator.ValidationUtils;
Expand Down Expand Up @@ -62,8 +63,8 @@ public void testToJsonStringWithDefault() throws IOException {
AspectWithDefaultValue defaultValueAspect = new AspectWithDefaultValue().setNestedValueWithDefault(new MapValueRecord());
String expected =
loadJsonFromResource("defaultValueAspect.json").replaceAll("\\s+", "").replaceAll("\\n", "").replaceAll("\\r", "");

String actual = RecordUtils.toJsonString(defaultValueAspect, true);
BaseLocalDAO.validateAgainstSchemaAndFillinDefault(defaultValueAspect);
String actual = RecordUtils.toJsonString(defaultValueAspect);

assertEquals(actual, expected);
}
Expand Down
Loading

0 comments on commit b93dba4

Please sign in to comment.