Skip to content

Commit 8ee03b4

Browse files
fix(upgrade reindex): Ensure count uses all the restore job args to accurately estimate the… (#15094)
Co-authored-by: Harsh Verma <[email protected]>
1 parent 4e0b8b7 commit 8ee03b4

File tree

13 files changed

+290
-58
lines changed

13 files changed

+290
-58
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ dmypy.json
3333
MANIFEST
3434
*.pyc
3535
.python-version
36+
Pipfile
37+
Pipfile.lock
3638

3739
# Generated files
3840
**/bin

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/impl/DefaultUpgradeReport.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.datahub.upgrade.impl;
22

3+
import com.google.common.base.Throwables;
34
import com.linkedin.datahub.upgrade.UpgradeReport;
45
import java.util.ArrayList;
56
import java.util.List;
@@ -20,6 +21,8 @@ public void addLine(String line) {
2021
/**
 * Records a failure in the upgrade report together with the exception that caused it.
 *
 * <p>The exception is first logged at error level. Two entries are then appended to
 * {@code reportLines}: the caller's message followed by {@code ": "} and the exception's
 * string form, and a second entry prefixed with {@code "Exception stack trace: "} that holds
 * the stack trace text.
 *
 * @param line human-readable description of what failed
 * @param e the exception to log and attach to the report
 */
public void addLine(String line, Exception e) {
2122
log.error(line, e);
2223
reportLines.add(line + String.format(": %s", e));
24+
// The whole stack trace is rendered into one string, so it occupies a single report entry.
reportLines.add(
25+
String.format("Exception stack trace: %s", Throwables.getStackTraceAsString(e)));
2326
}
2427

2528
@Override

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,15 @@
11
package com.linkedin.datahub.upgrade.restoreindices;
22

3-
import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;
4-
53
import com.google.common.annotations.VisibleForTesting;
64
import com.linkedin.datahub.upgrade.UpgradeContext;
75
import com.linkedin.datahub.upgrade.UpgradeStep;
86
import com.linkedin.datahub.upgrade.UpgradeStepResult;
97
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
108
import com.linkedin.metadata.entity.EntityService;
11-
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
129
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
1310
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult;
1411
import com.linkedin.upgrade.DataHubUpgradeState;
1512
import io.ebean.Database;
16-
import io.ebean.ExpressionList;
1713
import java.util.ArrayList;
1814
import java.util.Arrays;
1915
import java.util.List;
@@ -156,22 +152,8 @@ private RestoreIndicesArgs getArgs(UpgradeContext context) {
156152
}
157153

158154
@VisibleForTesting
159-
int getRowCount(RestoreIndicesArgs args) {
160-
ExpressionList<EbeanAspectV2> countExp =
161-
_server
162-
.find(EbeanAspectV2.class)
163-
.where()
164-
.eq(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION);
165-
if (args.aspectName != null) {
166-
countExp = countExp.eq(EbeanAspectV2.ASPECT_COLUMN, args.aspectName);
167-
}
168-
if (args.urn != null) {
169-
countExp = countExp.eq(EbeanAspectV2.URN_COLUMN, args.urn);
170-
}
171-
if (args.urnLike != null) {
172-
countExp = countExp.like(EbeanAspectV2.URN_COLUMN, args.urnLike);
173-
}
174-
return countExp.findCount();
155+
int getRowCount(UpgradeContext context, RestoreIndicesArgs args) {
156+
return _entityService.countAspect(args, context.report()::addLine);
175157
}
176158

177159
@Override
@@ -184,7 +166,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
184166

185167
context.report().addLine("Sending MAE from local DB");
186168
long startTime = System.currentTimeMillis();
187-
final int rowCount = getRowCount(args);
169+
final int rowCount = getRowCount(context, args);
188170
context
189171
.report()
190172
.addLine(
@@ -224,7 +206,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
224206
context.report().addLine("End of data.");
225207
break;
226208
} else {
227-
log.error("Failure processing restore indices batch.", e);
209+
context.report().addLine("Exception while processing batch", e);
228210
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
229211
}
230212
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.linkedin.datahub.upgrade;
2+
3+
import static org.testng.Assert.*;
4+
5+
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeReport;
6+
import java.util.List;
7+
import org.testng.annotations.BeforeMethod;
8+
import org.testng.annotations.Test;
9+
10+
/**
 * Tests for {@link UpgradeReport} as implemented by {@link DefaultUpgradeReport}.
 *
 * <p>Covers both {@code addLine} overloads: plain lines are stored in call order, and a line
 * added with an exception produces two report entries (message plus stack trace).
 */
public class UpgradeReportTest {
11+
12+
// Report under test; replaced with a fresh instance before every test method.
private UpgradeReport upgradeReport;
13+
14+
/** Creates a new, empty {@link DefaultUpgradeReport} so tests do not share state. */
@BeforeMethod
15+
public void setup() {
16+
upgradeReport = new DefaultUpgradeReport();
17+
}
18+
19+
/** Adding three plain lines yields exactly those three lines, in insertion order. */
@Test
20+
public void testAddLine() {
21+
// Given
22+
String line1 = "Starting upgrade";
23+
String line2 = "Processing step 1";
24+
String line3 = "Upgrade completed";
25+
26+
// When
27+
upgradeReport.addLine(line1);
28+
upgradeReport.addLine(line2);
29+
upgradeReport.addLine(line3);
30+
31+
// Then
32+
List<String> lines = upgradeReport.lines();
33+
assertEquals(lines.size(), 3);
34+
assertEquals(lines.get(0), line1);
35+
assertEquals(lines.get(1), line2);
36+
assertEquals(lines.get(2), line3);
37+
}
38+
39+
/**
 * Adding a line with an exception yields two entries: the first contains the message and the
 * exception's message text, the second contains the stack-trace prefix and the exception type.
 */
@Test
40+
public void testAddLineWithException() {
41+
// Given
42+
String errorMessage = "Error occurred during upgrade";
43+
Exception testException = new RuntimeException("Test exception message");
44+
45+
// When
46+
upgradeReport.addLine(errorMessage, testException);
47+
48+
// Then
49+
List<String> lines = upgradeReport.lines();
50+
// One call is expected to append two report entries, not one.
assertEquals(lines.size(), 2);
51+
assertTrue(lines.get(0).contains(errorMessage));
52+
assertTrue(lines.get(0).contains("Test exception message"));
53+
assertTrue(lines.get(1).contains("Exception stack trace:"));
54+
assertTrue(lines.get(1).contains("RuntimeException"));
55+
}
56+
}

datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStepTest.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ public void testExecutableWithDefaultArgs() {
136136
// Insert a few test rows
137137
insertTestRows(5, null);
138138

139+
// Mock countAspect to return 5 rows
140+
when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(5);
141+
139142
// Execute
140143
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);
141144

@@ -175,6 +178,8 @@ public void testExecutableWithCustomArgs() {
175178
// Insert some test data
176179
insertTestRows(5, "testAspect");
177180

181+
when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(5);
182+
178183
// Execute
179184
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);
180185

@@ -209,6 +214,8 @@ public void testExecutableWithUrnLike() {
209214
// Insert data that matches the URN pattern
210215
insertTestRows(3, null);
211216

217+
when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(3);
218+
212219
// Execute
213220
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);
214221

@@ -242,6 +249,8 @@ public void testExecutableWithPitEpochMs() {
242249
insertTestRow("urn:li:test:2", "testAspect", 0, oneHourAgo, "testUser"); // Edge of range
243250
insertTestRow("urn:li:test:3", "testAspect", 0, twoHoursAgo, "testUser"); // Outside range
244251

252+
when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(2);
253+
245254
// Execute
246255
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);
247256

@@ -267,14 +276,14 @@ public void testExecutableWithAspectNames() {
267276
insertTestRow("urn:li:test:2", "aspect2", 0, now, "testUser");
268277
insertTestRow("urn:li:test:3", "aspect3", 0, now, "testUser");
269278

279+
when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(3);
280+
270281
// Execute
271282
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);
272283

273-
// Verify aspect names parameter
274284
ArgumentCaptor<RestoreIndicesArgs> argsCaptor =
275285
ArgumentCaptor.forClass(RestoreIndicesArgs.class);
276286
verify(mockEntityService).restoreIndices(eq(mockOpContext), argsCaptor.capture(), any());
277-
278287
RestoreIndicesArgs capturedArgs = argsCaptor.getValue();
279288
assertEquals(capturedArgs.aspectNames.size(), 3);
280289
assertTrue(capturedArgs.aspectNames.contains("aspect1"));
@@ -291,6 +300,8 @@ public void testExecutableWithUrnBasedPagination() {
291300
// Insert enough rows to trigger pagination
292301
insertTestRows(5, null);
293302

303+
when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(5);
304+
294305
// Setup sequential results for pagination
295306
RestoreIndicesResult firstResult = new RestoreIndicesResult();
296307
firstResult.rowsMigrated = 2;
@@ -374,6 +385,8 @@ public void testExecutableWithError() {
374385
// Insert rows so the query returns data
375386
insertTestRows(10, null);
376387

388+
when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(10);
389+
377390
// Force the service to throw an exception
378391
when(mockEntityService.restoreIndices(eq(mockOpContext), any(RestoreIndicesArgs.class), any()))
379392
.thenThrow(new RuntimeException("Test exception"));
@@ -387,6 +400,28 @@ public void testExecutableWithError() {
387400
assertEquals(result.stepId(), sendMAEStep.id());
388401
}
389402

403+
/**
 * With the URN-based pagination argument set to {@code "true"}, a {@code restoreIndices} call
 * that throws must make the step return a {@code FAILED} {@link DefaultUpgradeStepResult} and
 * must report the exception through {@code addLine(String, Exception)} on the mocked report.
 */
@Test
404+
public void testUrnBasedPaginationExecutableWithError() {
405+
parsedArgs.put(RestoreIndices.URN_BASED_PAGINATION_ARG_NAME, Optional.of("true"));
406+
407+
// Stub the row count so the step gets a value from the mocked entity service.
when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(10);
408+
409+
// Force the service to throw an exception
410+
when(mockEntityService.restoreIndices(eq(mockOpContext), any(RestoreIndicesArgs.class), any()))
411+
.thenThrow(new RuntimeException("Test exception"));
412+
413+
// Execute
414+
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);
415+
416+
// Verify failure
417+
assertTrue(result instanceof DefaultUpgradeStepResult);
418+
assertEquals(result.result(), DataHubUpgradeState.FAILED);
419+
assertEquals(result.stepId(), sendMAEStep.id());
420+
421+
// verify exception reported
422+
verify(mockReport, atLeastOnce()).addLine(anyString(), any(Exception.class));
423+
}
424+
390425
@Test
391426
public void testReportAddedLines() {
392427
// Insert some test data
@@ -407,6 +442,8 @@ public void testExecutableWithCreateDefaultAspects() {
407442
// Insert test data
408443
insertTestRows(3, null);
409444

445+
when(mockEntityService.countAspect(any(RestoreIndicesArgs.class), any())).thenReturn(3);
446+
410447
// Execute
411448
UpgradeStepResult result = sendMAEStep.executable().apply(mockContext);
412449

metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,9 @@ ListResult<String> listUrns(
210210
@Nonnull
211211
Integer countAspect(@Nonnull final String aspectName, @Nullable String urnLike);
212212

213+
/**
 * Overload of {@code countAspect} that takes a full {@link RestoreIndicesArgs} rather than an
 * aspect name and URN pattern.
 *
 * <p>NOTE(review): presumably returns the number of aspect rows selected by {@code args};
 * which fields of {@code args} act as filters is up to each implementation. Confirm against
 * the implementing DAOs.
 *
 * @param args restore-indices job arguments to count against
 * @return the count, never {@code null}
 */
@Nonnull
214+
Integer countAspect(final RestoreIndicesArgs args);
215+
213216
@Nonnull
214217
PartitionedStream<EbeanAspectV2> streamAspectBatches(final RestoreIndicesArgs args);
215218

metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1781,6 +1781,12 @@ public Integer getCountAspect(
17811781
return aspectDao.countAspect(aspectName, urnLike);
17821782
}
17831783

1784+
/**
 * Reports the supplied arguments through {@code logger}, then returns whatever
 * {@code aspectDao.countAspect(args)} returns.
 *
 * @param args restore-indices job arguments passed straight through to the DAO
 * @param logger sink that receives one message of the form {@code "Args are <args>"}
 * @return the value returned by the DAO for {@code args}
 */
@Override
1785+
public Integer countAspect(@Nonnull RestoreIndicesArgs args, @Nonnull Consumer<String> logger) {
1786+
logger.accept(String.format("Args are %s", args));
1787+
return aspectDao.countAspect(args);
1788+
}
1789+
17841790
@Nonnull
17851791
@Override
17861792
public List<RestoreIndicesResult> restoreIndices(

metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,13 @@ public Integer countAspect(@Nonnull String aspectName, @Nullable String urnLike)
574574
return -1;
575575
}
576576

577+
/**
 * Placeholder for the {@link RestoreIndicesArgs}-based count: no query is issued and
 * {@code args} is ignored.
 *
 * @param args restore-indices job arguments (unused)
 * @return always {@code -1}
 */
@Nonnull
578+
@Override
579+
public Integer countAspect(final RestoreIndicesArgs args) {
580+
// Not implemented
581+
return -1;
582+
}
583+
577584
@Nonnull
578585
public PartitionedStream<EbeanAspectV2> streamAspectBatches(final RestoreIndicesArgs args) {
579586
// Not implemented

0 commit comments

Comments
 (0)