Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ dmypy.json
MANIFEST
*.pyc
.python-version
Pipfile
Pipfile.lock

# Generated files
**/bin
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.datahub.upgrade.impl;

import com.google.common.base.Throwables;
import com.linkedin.datahub.upgrade.UpgradeReport;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -20,6 +21,8 @@ public void addLine(String line) {
public void addLine(String line, Exception e) {
log.error(line, e);
reportLines.add(line + String.format(": %s", e));
reportLines.add(
String.format("Exception stack trace: %s", Throwables.getStackTraceAsString(e)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
package com.linkedin.datahub.upgrade.restoreindices;

import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.ebean.Database;
import io.ebean.ExpressionList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -156,22 +152,8 @@ private RestoreIndicesArgs getArgs(UpgradeContext context) {
}

@VisibleForTesting
int getRowCount(RestoreIndicesArgs args) {
ExpressionList<EbeanAspectV2> countExp =
_server
.find(EbeanAspectV2.class)
.where()
.eq(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION);
if (args.aspectName != null) {
countExp = countExp.eq(EbeanAspectV2.ASPECT_COLUMN, args.aspectName);
}
if (args.urn != null) {
countExp = countExp.eq(EbeanAspectV2.URN_COLUMN, args.urn);
}
if (args.urnLike != null) {
countExp = countExp.like(EbeanAspectV2.URN_COLUMN, args.urnLike);
}
return countExp.findCount();
int getRowCount(UpgradeContext context, RestoreIndicesArgs args) {
return _entityService.countAspect(args, context.report()::addLine);
}

@Override
Expand All @@ -184,7 +166,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {

context.report().addLine("Sending MAE from local DB");
long startTime = System.currentTimeMillis();
final int rowCount = getRowCount(args);
final int rowCount = getRowCount(context, args);
context
.report()
.addLine(
Expand Down Expand Up @@ -224,7 +206,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
context.report().addLine("End of data.");
break;
} else {
log.error("Failure processing restore indices batch.", e);
context.report().addLine("Exception while processing batch", e);
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.linkedin.datahub.upgrade;

import static org.testng.Assert.*;

import com.linkedin.datahub.upgrade.impl.DefaultUpgradeReport;
import java.util.List;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class UpgradeReportTest {

private UpgradeReport upgradeReport;

@BeforeMethod
public void setup() {
upgradeReport = new DefaultUpgradeReport();
}

@Test
public void testAddLine() {
// Given
String line1 = "Starting upgrade";
String line2 = "Processing step 1";
String line3 = "Upgrade completed";

// When
upgradeReport.addLine(line1);
upgradeReport.addLine(line2);
upgradeReport.addLine(line3);

// Then
List<String> lines = upgradeReport.lines();
assertEquals(lines.size(), 3);
assertEquals(lines.get(0), line1);
assertEquals(lines.get(1), line2);
assertEquals(lines.get(2), line3);
}

@Test
public void testAddLineWithException() {
// Given
String errorMessage = "Error occurred during upgrade";
Exception testException = new RuntimeException("Test exception message");

// When
upgradeReport.addLine(errorMessage, testException);

// Then
List<String> lines = upgradeReport.lines();
assertEquals(lines.size(), 2);
assertTrue(lines.get(0).contains(errorMessage));
assertTrue(lines.get(0).contains("Test exception message"));
assertTrue(lines.get(1).contains("Exception stack trace:"));
assertTrue(lines.get(1).contains("RuntimeException"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ public void testExecutableWithDefaultArgs() {
// Insert a few test rows
insertTestRows(5, null);

// Mock countAspect to return 5 rows
when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(5);

// Execute
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);

Expand Down Expand Up @@ -175,6 +178,8 @@ public void testExecutableWithCustomArgs() {
// Insert some test data
insertTestRows(5, "testAspect");

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(5);

// Execute
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);

Expand Down Expand Up @@ -209,6 +214,8 @@ public void testExecutableWithUrnLike() {
// Insert data that matches the URN pattern
insertTestRows(3, null);

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(3);

// Execute
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);

Expand Down Expand Up @@ -242,6 +249,8 @@ public void testExecutableWithPitEpochMs() {
insertTestRow("urn:li:test:2", "testAspect", 0, oneHourAgo, "testUser"); // Edge of range
insertTestRow("urn:li:test:3", "testAspect", 0, twoHoursAgo, "testUser"); // Outside range

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(2);

// Execute
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);

Expand All @@ -267,14 +276,14 @@ public void testExecutableWithAspectNames() {
insertTestRow("urn:li:test:2", "aspect2", 0, now, "testUser");
insertTestRow("urn:li:test:3", "aspect3", 0, now, "testUser");

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(3);

// Execute
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);

// Verify aspect names parameter
ArgumentCaptor<RestoreIndicesArgs> argsCaptor =
ArgumentCaptor.forClass(RestoreIndicesArgs.class);
verify(mockEntityService).restoreIndices(eq(mockOpContext), argsCaptor.capture(), any());

RestoreIndicesArgs capturedArgs = argsCaptor.getValue();
assertEquals(capturedArgs.aspectNames.size(), 3);
assertTrue(capturedArgs.aspectNames.contains("aspect1"));
Expand All @@ -291,6 +300,8 @@ public void testExecutableWithUrnBasedPagination() {
// Insert enough rows to trigger pagination
insertTestRows(5, null);

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(5);

// Setup sequential results for pagination
RestoreIndicesResult firstResult = new RestoreIndicesResult();
firstResult.rowsMigrated = 2;
Expand Down Expand Up @@ -374,6 +385,8 @@ public void testExecutableWithError() {
// Insert rows so the query returns data
insertTestRows(10, null);

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(10);

// Force the service to throw an exception
when(mockEntityService.restoreIndices(eq(mockOpContext), any(RestoreIndicesArgs.class), any()))
.thenThrow(new RuntimeException("Test exception"));
Expand All @@ -387,6 +400,28 @@ public void testExecutableWithError() {
assertEquals(result.stepId(), sendMAEStep.id());
}

@Test
public void testUrnBasedPaginationExecutableWithError() {
parsedArgs.put(RestoreIndices.URN_BASED_PAGINATION_ARG_NAME, Optional.of("true"));

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(10);

// Force the service to throw an exception
when(mockEntityService.restoreIndices(eq(mockOpContext), any(RestoreIndicesArgs.class), any()))
.thenThrow(new RuntimeException("Test exception"));

// Execute
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);

// Verify failure
assertTrue(result instanceof DefaultUpgradeStepResult);
assertEquals(result.result(), DataHubUpgradeState.FAILED);
assertEquals(result.stepId(), sendMAEStep.id());

// verify exception reported
verify(mockReport, atLeastOnce()).addLine(anyString(), any(Exception.class));
}

@Test
public void testReportAddedLines() {
// Insert some test data
Expand All @@ -407,6 +442,8 @@ public void testExecutableWithCreateDefaultAspects() {
// Insert test data
insertTestRows(3, null);

when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(3);

// Execute
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ ListResult<String> listUrns(
@Nonnull
Integer countAspect(@Nonnull final String aspectName, @Nullable String urnLike);

@Nonnull
Integer countAspect(final RestoreIndicesArgs args);

@Nonnull
PartitionedStream<EbeanAspectV2> streamAspectBatches(final RestoreIndicesArgs args);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1781,6 +1781,12 @@ public Integer getCountAspect(
return aspectDao.countAspect(aspectName, urnLike);
}

@Override
public Integer countAspect(@Nonnull RestoreIndicesArgs args, @Nonnull Consumer<String> logger) {
logger.accept(String.format("Args are %s", args));
return aspectDao.countAspect(args);
}

@Nonnull
@Override
public List<RestoreIndicesResult> restoreIndices(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,13 @@ public Integer countAspect(@Nonnull String aspectName, @Nullable String urnLike)
return -1;
}

@Nonnull
@Override
public Integer countAspect(final RestoreIndicesArgs args) {
// Not implemented
return -1;
}

@Nonnull
public PartitionedStream<EbeanAspectV2> streamAspectBatches(final RestoreIndicesArgs args) {
// Not implemented
Expand Down
Loading
Loading