Skip to content

Commit

Permalink
Optimizied soft delete relationship query by batching (#468)
Browse files Browse the repository at this point in the history
* Fixed errors

* Added unit test

* UNUSED IMPORTS

* checkstyle

* Fixed unit test

* Reset batchCount

* Addedmlog

* Added sleep

* Added transaction

* Fixed logging and transaction

* Nit

* Addressed comments

* Revert retries

* Added retry

---------

Co-authored-by: Rakhi Agrawal <[email protected]>
  • Loading branch information
rakhiagr and Rakhi Agrawal authored Nov 25, 2024
1 parent 8ccc25e commit fd38c99
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.exception.RetryLimitReached;
import com.linkedin.metadata.dao.internal.BaseGraphWriterDAO;
import com.linkedin.metadata.dao.utils.GraphUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
Expand All @@ -11,17 +12,21 @@
import com.linkedin.metadata.validator.RelationshipValidator;
import io.ebean.EbeanServer;
import io.ebean.SqlUpdate;
import io.ebean.Transaction;
import io.ebean.annotation.Transactional;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.dao.utils.ModelUtils.*;

@Slf4j
public class EbeanLocalRelationshipWriterDAO extends BaseGraphWriterDAO {
private static final String DEFAULT_ACTOR = "urn:li:principal:UNKNOWN";
private final EbeanServer _server;
Expand All @@ -36,6 +41,11 @@ private static class CommonColumnName {
private static final String LAST_MODIFIED_ON = "lastmodifiedon";
private static final String LAST_MODIFIED_BY = "lastmodifiedby";
}
private static final int BATCH_SIZE = 10000; // Process rows in batches of 10,000
private static final int MAX_BATCHES = 1000; // Maximum number of batches to process
private static final String LIMIT = " LIMIT ";
@Getter
private int batchCount = 0;

public EbeanLocalRelationshipWriterDAO(EbeanServer server) {
_server = server;
Expand All @@ -61,20 +71,61 @@ public void processLocalRelationshipUpdates(@Nonnull Urn urn,

/**
* This method clears all the relationships from a source entity urn using REMOVE_ALL_EDGES_FROM_SOURCE.
* @param urn entity urn could be either source or destination, depends on the RemovalOption
* @param relationshipClass relationship that needs to be cleared
* @param isTestMode whether to use test schema
*
* @param urn entity urn could be either source or destination, depends on the RemovalOption
* @param relationshipClass relationship that needs to be cleared
* @param isTestMode whether to use test schema
*/
public void clearRelationshipsByEntity(@Nonnull Urn urn,
@Nonnull Class<? extends RecordTemplate> relationshipClass, boolean isTestMode) {
@Nonnull Class<? extends RecordTemplate> relationshipClass, boolean isTestMode) {
RelationshipValidator.validateRelationshipSchema(relationshipClass, isRelationshipInV2(relationshipClass));
SqlUpdate deletionSQL = _server.createSqlUpdate(SQLStatementUtils.deleteLocalRelationshipSQL(
String deletionQuery = SQLStatementUtils.deleteLocalRelationshipSQL(
isTestMode ? SQLSchemaUtils.getTestRelationshipTableName(relationshipClass)
: SQLSchemaUtils.getRelationshipTableName(relationshipClass), RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE));
: SQLSchemaUtils.getRelationshipTableName(relationshipClass), RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE) + LIMIT + BATCH_SIZE;
SqlUpdate deletionSQL = _server.createSqlUpdate(deletionQuery);
deletionSQL.setParameter(CommonColumnName.SOURCE, urn.toString());
deletionSQL.execute();
batchCount = 0;
while (batchCount < MAX_BATCHES) {
try {
// Use the runInTransactionWithRetry method to handle retries in case of transaction failures
int rowsAffected = runInTransactionWithRetry(deletionSQL::execute, 3); // Retry up to 3 times in case of transient failures
batchCount++;

if (log.isDebugEnabled()) {
log.debug("Deleted {} rows in batch {}", rowsAffected, batchCount);
}

if (rowsAffected < BATCH_SIZE) {
// Exit loop if fewer than BATCH_SIZE rows were affected, indicating all rows are processed
break;
}

// Sleep for 1 millisecond to reduce load
Thread.sleep(1);
} catch (RetryLimitReached e) {
log.error("Error while executing batch deletion after {} batches and retries", batchCount, e);
throw new RuntimeException("Batch deletion failed due to retry limit", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupted status
throw new RuntimeException("Batch deletion interrupted", e);
} catch (Exception e) {
log.error("Error while executing batch deletion after {} batches", batchCount, e);
throw new RuntimeException("Batch deletion failed", e);
}
}

if (batchCount >= MAX_BATCHES) {
log.warn(
"Reached maximum batch count of {}, consider increasing MAX_BATCH_COUNT or debugging the deletion logic.",
MAX_BATCHES);
}

if (log.isDebugEnabled()) {
log.info("Cleared relationships in {} batches", batchCount);
}
}


/**
* Persist the given list of relationships to the local relationship using REMOVE_ALL_EDGES_FROM_SOURCE.
* @param relationships the list of relationships to be persisted
Expand Down Expand Up @@ -186,4 +237,26 @@ private <RELATIONSHIP extends RecordTemplate> void removeRelationshipsBySource(@
deletionSQL.setParameter(CommonColumnName.SOURCE, source.toString());
deletionSQL.execute();
}

@Nonnull
protected <T> T runInTransactionWithRetry(@Nonnull Supplier<T> block, int maxTransactionRetry) {
int retryCount = 0;
RuntimeException lastException = null;
while (retryCount <= maxTransactionRetry) {
try (Transaction transaction = _server.beginTransaction()) {
T result = block.get();
transaction.commit();
return result; // Successful execution, return result
} catch (RuntimeException exception) {
lastException = exception;
retryCount++;
}
}
// If we exhausted retries, throw an exception.
if (lastException != null) {
throw new RetryLimitReached("Failed to execute after " + maxTransactionRetry + " retries", lastException);
} else {
throw new RetryLimitReached("Failed to execute after " + maxTransactionRetry + " retries due to unknown reasons");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ public void testClearRelationshipsByEntityUrn() throws URISyntaxException {
"bar", "urn:li:foo:456", "foo")));

BarUrn barUrn = BarUrn.createFromString("urn:li:bar:123");
FooUrn fooUrn = FooUrn.createFromString("urn:li:foo:123");

// Before processing
List<SqlRow> before = _server.createSqlQuery("select * from metadata_relationship_pairswith where deleted_ts is null").findList();
Expand All @@ -227,6 +226,29 @@ public void testClearRelationshipsByEntityUrn() throws URISyntaxException {
_server.execute(Ebean.createSqlUpdate("truncate metadata_relationship_pairswith"));
}

@Test
public void testClearRelationshipsByEntityUrnWithBatching() throws URISyntaxException {
// Insert a large number of relationships to trigger batch processing
for (int i = 0; i < 10001; i++) {
_server.execute(Ebean.createSqlUpdate(insertRelationships("metadata_relationship_pairswith", "urn:li:bar:123",
"bar", "urn:li:foo:" + i, "foo")));
}

BarUrn barUrn = BarUrn.createFromString("urn:li:bar:123");
// Before processing
List<SqlRow> before = _server.createSqlQuery("select * from metadata_relationship_pairswith where deleted_ts is null").findList();
assertEquals(before.size(), 10001);

_localRelationshipWriterDAO.clearRelationshipsByEntity(barUrn, PairsWith.class, false);

// After processing verification
List<SqlRow> all = _server.createSqlQuery("select * from metadata_relationship_pairswith where deleted_ts is null").findList();
assertEquals(all.size(), 0); // Total number of edges is 0
assertEquals(_localRelationshipWriterDAO.getBatchCount(), 2); //expect 2 batches
// Clean up
_server.execute(Ebean.createSqlUpdate("truncate metadata_relationship_pairswith"));
}

@Test
public void testRemoveRelationships() throws URISyntaxException {
BarUrn barUrn = BarUrn.createFromString("urn:li:bar:123");
Expand Down Expand Up @@ -254,7 +276,7 @@ public void testRemoveRelationships() throws URISyntaxException {

private String insertRelationships(String table, String sourceUrn, String sourceType, String destinationUrn, String destinationType) {
String insertTemplate = "INSERT INTO %s (metadata, source, source_type, destination, destination_type, lastmodifiedon, lastmodifiedby)"
+ " VALUES ('{\"metadata\": true}', '%s', '%s', '%s', '%s', '1970-01-01 00:00:01', 'unknown')";
+ " VALUES ('{\"metadata\": true}', '%s', '%s', '%s', '%s', CURRENT_TIMESTAMP, 'unknown')";
return String.format(insertTemplate, table, sourceUrn, sourceType, destinationUrn, destinationType);
}
}

0 comments on commit fd38c99

Please sign in to comment.