Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Provide newVersion in modifyBeforeWrite API
Browse files Browse the repository at this point in the history
  • Loading branch information
Timur Abishev committed Jan 24, 2019
1 parent da523be commit ff34d13
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class VersionedBatchStore[K, V, K2, V2](rootPath: String, versionsToKeep: Int, o

if (!target.sinkExists(mode)) {
logger.info(s"Versioned batched store version for $this @ $newVersion doesn't exist. Will write out.")
modifyBeforeWrite(lastVals).map(pack(batchID, _))
modifyBeforeWrite(lastVals, newVersion).map(pack(batchID, _))
.write(target)
} else {
logger.warn(s"Versioned batched store version for $this @ $newVersion already exists! Will skip adding to plan.")
Expand All @@ -152,5 +152,6 @@ class VersionedBatchStore[K, V, K2, V2](rootPath: String, versionsToKeep: Int, o
.map(unpack)
}

protected def modifyBeforeWrite(lastVals: TypedPipe[(K, V)]): TypedPipe[(K, V)] = lastVals
protected def modifyBeforeWrite(lastVals: TypedPipe[(K, V)], newVersion: Long): TypedPipe[(K, V)] =
lastVals
}

0 comments on commit ff34d13

Please sign in to comment.