Skip to content

Commit

Permalink
fix(dual schema): use aspect table as SOT for optimistic locking in D…
Browse files Browse the repository at this point in the history
…UAL_SCHEMA mode (#483)

Co-authored-by: Justin Donn <[email protected]>
  • Loading branch information
jsdonn and Justin Donn authored Dec 9, 2024
1 parent 3644f12 commit ae21b69
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,6 @@ protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Non
// 2. write value of latest version (version = 0) as a new version
// 3. update the latest version (version = 0) with the new value. If the value of latest version has been
// changed during this process, then rollback by throwing OptimisticLockException

largestVersion = getNextVersion(urn, aspectClass);
// TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
if (log.isDebugEnabled()) {
Expand All @@ -618,7 +617,6 @@ protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Non
new Timestamp(oldAuditStamp.getTime()), trackingContext, isTestMode);
} else {
// When for fresh ingestion or with changeLog disabled

// TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
if (log.isDebugEnabled()) {
if ("AzkabanFlowInfo".equals(aspectClass.getSimpleName())) {
Expand Down Expand Up @@ -680,7 +678,7 @@ public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLo
@Nonnull Class<ASPECT> aspectClass, boolean isTestMode) {

EbeanMetadataAspect result;
if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {
if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY || _schemaConfig == SchemaConfig.DUAL_SCHEMA) {
final String aspectName = ModelUtils.getAspectName(aspectClass);
final PrimaryKey key = new PrimaryKey(urn.toString(), aspectName, LATEST_VERSION);
if (_findMethodology == FindMethodology.DIRECT_SQL) {
Expand All @@ -696,7 +694,7 @@ public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLo
result = _server.find(EbeanMetadataAspect.class, key);
}
} else {
// for new schema or dual-schema, get latest data from new schema. (Resolving the read de-coupling issue)
// for new schema, get latest data from the new schema entity table. (Resolving the read de-coupling issue)
final List<EbeanMetadataAspect> results =
_localAccess.batchGetUnion(Collections.singletonList(new AspectKey<>(aspectClass, urn, LATEST_VERSION)), 1, 0,
true, isTestMode);
Expand Down Expand Up @@ -801,9 +799,9 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn
// ensure atomicity by running old schema update + new schema update in a transaction

final SqlUpdate oldSchemaSqlUpdate;
if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY || _schemaConfig == SchemaConfig.DUAL_SCHEMA) {
// In NEW_SCHEMA or DUAL_SCHEMA, since entity table is the SOT and the getLatest (oldTimestamp) is from the entity
// table, therefore, we will apply compare-and-set with oldTimestamp on entity table (addWithOptimisticLocking)
if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) {
// In NEW_SCHEMA, the entity table is the SOT and getLatest (oldTimestamp) reads from the entity
// table. Therefore, we will apply compare-and-set with oldTimestamp on entity table (addWithOptimisticLocking)
// aspect table will apply regular update over (urn, aspect, version) primary key combination.
oldSchemaSqlUpdate = assembleOldSchemaSqlUpdate(aspect, null);
numOfUpdatedRows = runInTransactionWithRetry(() -> {
Expand All @@ -814,10 +812,17 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn
trackingContext, isTestMode);
}, 1);
} else {
// In OLD_SCHEMA mode since aspect table is the SOT and the getLatest (oldTimestamp) is from the aspect table
// therefore, we will apply compare-and-set with oldTimestamp on aspect table (assemblyOldSchemaSqlUpdate)
// In OLD_SCHEMA and DUAL_SCHEMA mode, the aspect table is the SOT and the getLatest (oldTimestamp) is from the aspect table.
// Therefore, we will apply compare-and-set with oldTimestamp on aspect table (assemblyOldSchemaSqlUpdate)
oldSchemaSqlUpdate = assembleOldSchemaSqlUpdate(aspect, oldTimestamp);
numOfUpdatedRows = _server.execute(oldSchemaSqlUpdate);
numOfUpdatedRows = runInTransactionWithRetry(() -> {
// Additionally, in DUAL_SCHEMA mode: apply a regular update (no optimistic locking) to the entity table
if (_schemaConfig == SchemaConfig.DUAL_SCHEMA) {
_localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, null,
trackingContext, isTestMode);
}
return _server.execute(oldSchemaSqlUpdate);
}, 1);
}
// If there is no single updated row, emit OptimisticLockException
if (numOfUpdatedRows != 1) {
Expand All @@ -830,8 +835,6 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn
protected <ASPECT extends RecordTemplate> void insert(@Nonnull URN urn, @Nullable RecordTemplate value,
@Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp auditStamp, long version,
@Nullable IngestionTrackingContext trackingContext, boolean isTestMode) {


final EbeanMetadataAspect aspect = buildMetadataAspectBean(urn, value, aspectClass, auditStamp, version);
if (_schemaConfig != SchemaConfig.OLD_SCHEMA_ONLY && version == LATEST_VERSION) {
// insert() could be called when updating log table (moving current versions into new history version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2905,7 +2905,6 @@ public void testOptimisticLockException() {
aspect.setCreatedBy("fooActor");

if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {

// add aspect to the db
_server.insert(aspect);

Expand All @@ -2918,32 +2917,64 @@ public void testOptimisticLockException() {
dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 100),
0, new Timestamp(_now - 100), null, false);

} else if (_enableChangeLog) {
// either NEW or DUAL schema, whereas entity table is the SOT and aspect table is the log table
} else if (_schemaConfig == SchemaConfig.DUAL_SCHEMA) {
// in DUAL SCHEMA, the aspect table is the SOT even though it also writes to the entity table
// Given:
// 1. in DUAL SCHEMA mode
// 2. (foo:1, lastmodified(_now + 1)) in entity table (discrepancy)
// 3. (foo:1, lastmodified(_now), version=0) in aspect table

dao.insert(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now), 0, null, false);

// make inconsistent timestamp only on the entity table
dao.setSchemaConfig(SchemaConfig.NEW_SCHEMA_ONLY);
dao.setChangeLogEnabled(false);
dao.insert(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 1), 0, null, false);
dao.setChangeLogEnabled(true);
dao.setSchemaConfig(_schemaConfig);

// When: update with old timestamp matches the lastmodifiedon time in entity table
try {
fooAspect.setValue("bar");
dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 1000L), 0,
new Timestamp(_now), null, false);
} catch (OptimisticLockException e) {
fail("Expect the update pass since the old timestamp matches the lastmodifiedon in aspect table");
}
// Expect: update succeed and the values are updated
BaseLocalDAO.AspectEntry<AspectFoo> result = dao.getLatest(fooUrn, AspectFoo.class, false);
assertEquals(result.getAspect().getValue(), "bar");
assertEquals(result.getExtraInfo().getAudit().getTime(), Long.valueOf(_now + 1000L)); // need to set by at least 1

// When: update with old timestamp does not match the lastmodifiedon in the aspect table
// Expect: OptimisticLockException.
dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 400), 0,
new Timestamp(_now + 100), null, false);
} else if (_enableChangeLog) {
// either NEW SCHEMA, the entity table is the SOT and the aspect table is the log table
// Given:
// 1. in NEW, DUAL schema mode
// 2. (foo:1, lastmodified(_now + 1), version=0) in aspect table (discrepancy)
// 3. (foo:1, lastmodified(_now)) in entity table
// 1. in NEW SCHEMA mode
// 2. (foo:1, lastmodifiedon(_now + 1), version=0) in aspect table (discrepancy)
// 3. (foo:1, lastmodifiedon(_now)) in entity table

dao.insert(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now), 0, null, false);
// make inconsistent timestamp on aspect table
aspect.setCreatedOn(new Timestamp(_now + 1));
_server.update(aspect);

// When: update with old timestamp matches the lastmodified time in entity table
// When: update with old timestamp matches the lastmodifiedon time in entity table
try {
fooAspect.setValue("bar");
dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 200), 0,
new Timestamp(_now), null, false);
} catch (OptimisticLockException e) {
fail("Expect the update pass since the old timestamp matches the lastmodified in entity table");
fail("Expect the update pass since the old timestamp matches the lastmodifiedon in entity table");
}
// Expect: update succeed and the values are updated
assertEquals(dao.getLatest(fooUrn, AspectFoo.class, false).getAspect().getValue(), "bar");
assertEquals(dao.getLatest(fooUrn, AspectFoo.class, false).getExtraInfo().getAudit().getTime(), Long.valueOf(_now + 200L));

// When: update with old timestamp does not match the lastmodified in the entity table
// When: update with old timestamp does not match the lastmodifiedon in the entity table
// Expect: OptimisticLockException.
dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 400), 0,
new Timestamp(_now + 100), null, false);
Expand Down

0 comments on commit ae21b69

Please sign in to comment.