diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index 40d279f25..6da573579 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -601,7 +601,6 @@ protected long saveLatest(@Nonnull URN urn, @Non // 2. write value of latest version (version = 0) as a new version // 3. update the latest version (version = 0) with the new value. If the value of latest version has been // changed during this process, then rollback by throwing OptimisticLockException - largestVersion = getNextVersion(urn, aspectClass); // TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards if (log.isDebugEnabled()) { @@ -618,7 +617,6 @@ protected long saveLatest(@Nonnull URN urn, @Non new Timestamp(oldAuditStamp.getTime()), trackingContext, isTestMode); } else { // When for fresh ingestion or with changeLog disabled - // TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards if (log.isDebugEnabled()) { if ("AzkabanFlowInfo".equals(aspectClass.getSimpleName())) { @@ -680,7 +678,7 @@ public List backfillLo @Nonnull Class aspectClass, boolean isTestMode) { EbeanMetadataAspect result; - if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) { + if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY || _schemaConfig == SchemaConfig.DUAL_SCHEMA) { final String aspectName = ModelUtils.getAspectName(aspectClass); final PrimaryKey key = new PrimaryKey(urn.toString(), aspectName, LATEST_VERSION); if (_findMethodology == FindMethodology.DIRECT_SQL) { @@ -696,7 +694,7 @@ public List backfillLo result = _server.find(EbeanMetadataAspect.class, key); } } else { - // for new schema or dual-schema, get latest data from new schema. (Resolving the read de-coupling issue) + // for new schema, get latest data from the new schema entity table. (Resolving the read de-coupling issue) final List results = _localAccess.batchGetUnion(Collections.singletonList(new AspectKey<>(aspectClass, urn, LATEST_VERSION)), 1, 0, true, isTestMode); @@ -801,9 +799,9 @@ protected void updateWithOptimisticLocking(@Nonn // ensure atomicity by running old schema update + new schema update in a transaction final SqlUpdate oldSchemaSqlUpdate; - if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY || _schemaConfig == SchemaConfig.DUAL_SCHEMA) { - // In NEW_SCHEMA or DUAL_SCHEMA, since entity table is the SOT and the getLatest (oldTimestamp) is from the entity - // table, therefore, we will apply compare-and-set with oldTimestamp on entity table (addWithOptimisticLocking) + if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) { + // In NEW_SCHEMA, the entity table is the SOT and getLatest (oldTimestamp) reads from the entity + // table. Therefore, we will apply compare-and-set with oldTimestamp on entity table (addWithOptimisticLocking) // aspect table will apply regular update over (urn, aspect, version) primary key combination. oldSchemaSqlUpdate = assembleOldSchemaSqlUpdate(aspect, null); numOfUpdatedRows = runInTransactionWithRetry(() -> { @@ -814,10 +812,17 @@ protected void updateWithOptimisticLocking(@Nonn trackingContext, isTestMode); }, 1); } else { - // In OLD_SCHEMA mode since aspect table is the SOT and the getLatest (oldTimestamp) is from the aspect table - // therefore, we will apply compare-and-set with oldTimestamp on aspect table (assemblyOldSchemaSqlUpdate) + // In OLD_SCHEMA and DUAL_SCHEMA mode, the aspect table is the SOT and the getLatest (oldTimestamp) is from the aspect table. + // Therefore, we will apply compare-and-set with oldTimestamp on aspect table (assemblyOldSchemaSqlUpdate) oldSchemaSqlUpdate = assembleOldSchemaSqlUpdate(aspect, oldTimestamp); - numOfUpdatedRows = _server.execute(oldSchemaSqlUpdate); + numOfUpdatedRows = runInTransactionWithRetry(() -> { + // Additionally, in DUAL_SCHEMA mode: apply a regular update (no optimistic locking) to the entity table + if (_schemaConfig == SchemaConfig.DUAL_SCHEMA) { + _localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, null, + trackingContext, isTestMode); + } + return _server.execute(oldSchemaSqlUpdate); + }, 1); } // If there is no single updated row, emit OptimisticLockException if (numOfUpdatedRows != 1) { @@ -830,8 +835,6 @@ protected void updateWithOptimisticLocking(@Nonn protected void insert(@Nonnull URN urn, @Nullable RecordTemplate value, @Nonnull Class aspectClass, @Nonnull AuditStamp auditStamp, long version, @Nullable IngestionTrackingContext trackingContext, boolean isTestMode) { - - final EbeanMetadataAspect aspect = buildMetadataAspectBean(urn, value, aspectClass, auditStamp, version); if (_schemaConfig != SchemaConfig.OLD_SCHEMA_ONLY && version == LATEST_VERSION) { // insert() could be called when updating log table (moving current versions into new history version) diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java index c1eefc0cc..5b64b17be 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java @@ -2905,7 +2905,6 @@ public void testOptimisticLockException() { aspect.setCreatedBy("fooActor"); if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) { - // add aspect to the db _server.insert(aspect); @@ -2918,32 +2917,64 @@ public void testOptimisticLockException() { dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 100), 0, new Timestamp(_now - 100), null, false); - } else if (_enableChangeLog) { - // either NEW or DUAL schema, whereas entity table is the SOT and aspect table is the log table + } else if (_schemaConfig == SchemaConfig.DUAL_SCHEMA) { + // in DUAL SCHEMA, the aspect table is the SOT even though it also writes to the entity table + // Given: + // 1. in DUAL SCHEMA mode + // 2. (foo:1, lastmodified(_now + 1)) in entity table (discrepancy) + // 3. (foo:1, lastmodified(_now), version=0) in aspect table + + dao.insert(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now), 0, null, false); + // make inconsistent timestamp only on the entity table + dao.setSchemaConfig(SchemaConfig.NEW_SCHEMA_ONLY); + dao.setChangeLogEnabled(false); + dao.insert(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 1), 0, null, false); + dao.setChangeLogEnabled(true); + dao.setSchemaConfig(_schemaConfig); + + // When: update with old timestamp matches the lastmodifiedon time in entity table + try { + fooAspect.setValue("bar"); + dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 1000L), 0, + new Timestamp(_now), null, false); + } catch (OptimisticLockException e) { + fail("Expect the update pass since the old timestamp matches the lastmodifiedon in aspect table"); + } + // Expect: update succeed and the values are updated + BaseLocalDAO.AspectEntry result = dao.getLatest(fooUrn, AspectFoo.class, false); + assertEquals(result.getAspect().getValue(), "bar"); + assertEquals(result.getExtraInfo().getAudit().getTime(), Long.valueOf(_now + 1000L)); // need to set by at least 1 + + // When: update with old timestamp does not match the lastmodifiedon in the aspect table + // Expect: OptimisticLockException. + dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 400), 0, + new Timestamp(_now + 100), null, false); + } else if (_enableChangeLog) { + // either NEW SCHEMA, the entity table is the SOT and the aspect table is the log table // Given: - // 1. in NEW, DUAL schema mode - // 2. (foo:1, lastmodified(_now + 1), version=0) in aspect table (discrepancy) - // 3. (foo:1, lastmodified(_now)) in entity table + // 1. in NEW SCHEMA mode + // 2. (foo:1, lastmodifiedon(_now + 1), version=0) in aspect table (discrepancy) + // 3. (foo:1, lastmodifiedon(_now)) in entity table dao.insert(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now), 0, null, false); // make inconsistent timestamp on aspect table aspect.setCreatedOn(new Timestamp(_now + 1)); _server.update(aspect); - // When: update with old timestamp matches the lastmodified time in entity table + // When: update with old timestamp matches the lastmodifiedon time in entity table try { fooAspect.setValue("bar"); dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 200), 0, new Timestamp(_now), null, false); } catch (OptimisticLockException e) { - fail("Expect the update pass since the old timestamp matches the lastmodified in entity table"); + fail("Expect the update pass since the old timestamp matches the lastmodifiedon in entity table"); } // Expect: update succeed and the values are updated assertEquals(dao.getLatest(fooUrn, AspectFoo.class, false).getAspect().getValue(), "bar"); assertEquals(dao.getLatest(fooUrn, AspectFoo.class, false).getExtraInfo().getAudit().getTime(), Long.valueOf(_now + 200L)); - // When: update with old timestamp does not match the lastmodified in the entity table + // When: update with old timestamp does not match the lastmodifiedon in the entity table // Expect: OptimisticLockException. dao.updateWithOptimisticLocking(fooUrn, fooAspect, AspectFoo.class, makeAuditStamp("fooActor", _now + 400), 0, new Timestamp(_now + 100), null, false);