@@ -5160,36 +5160,41 @@ struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase {
51605160 Version end) {
51615161 // mergeSort all iterator until all are exhausted
51625162 // it stores all mutations for the next min version, in new format
5163- state bool atLeastOneIteratorHasNext = true ;
5164- state Version minVersion, lastMinVersion = invalidVersion;
5165- while (atLeastOneIteratorHasNext) {
5166- std::pair<Version, bool > minVersionAndHasNext = wait (findNextVersion (iterators));
5167- minVersion = minVersionAndHasNext.first ;
5168- atLeastOneIteratorHasNext = minVersionAndHasNext.second ;
5169- ASSERT_LT (lastMinVersion, minVersion);
5170- lastMinVersion = minVersion;
5171-
5172- if (atLeastOneIteratorHasNext) {
5173- std::vector<Standalone<VectorRef<VersionedMutation>>> mutationsSingleVersion =
5174- wait (getMutationsForVersion (iterators, minVersion));
5175-
5176- if (minVersion < begin) {
5177- // skip generating mutations, because this is not within desired range
5178- // this is already handled by the previous taskfunc
5179- continue ;
5180- } else if (minVersion >= end) {
5181- // all valid data has been consumed
5182- break ;
5183- }
5163+ try {
5164+ state bool atLeastOneIteratorHasNext = true ;
5165+ state Version minVersion, lastMinVersion = invalidVersion;
5166+ while (atLeastOneIteratorHasNext) {
5167+ std::pair<Version, bool > minVersionAndHasNext = wait (findNextVersion (iterators));
5168+ minVersion = minVersionAndHasNext.first ;
5169+ atLeastOneIteratorHasNext = minVersionAndHasNext.second ;
5170+ ASSERT_LT (lastMinVersion, minVersion);
5171+ lastMinVersion = minVersion;
5172+
5173+ if (atLeastOneIteratorHasNext) {
5174+ std::vector<Standalone<VectorRef<VersionedMutation>>> mutationsSingleVersion =
5175+ wait (getMutationsForVersion (iterators, minVersion));
5176+
5177+ if (minVersion < begin) {
5178+ // skip generating mutations, because this is not within desired range
5179+ // this is already handled by the previous taskfunc
5180+ continue ;
5181+ } else if (minVersion >= end) {
5182+ // all valid data has been consumed
5183+ break ;
5184+ }
51845185
5185- // transform from new format to old format(param1, param2) for this version.
5186- // This transformation has to be done version by version.
5187- Standalone<VectorRef<KeyValueRef>> oldFormatMutations =
5188- generateOldFormatMutations (minVersion, mutationsSingleVersion);
5189- mutationStream.send (oldFormatMutations);
5186+ // transform from new format to old format(param1, param2) for this version.
5187+ // This transformation has to be done version by version.
5188+ Standalone<VectorRef<KeyValueRef>> oldFormatMutations =
5189+ generateOldFormatMutations (minVersion, mutationsSingleVersion);
5190+ mutationStream.send (oldFormatMutations);
5191+ }
51905192 }
5193+ mutationStream.sendError (end_of_stream ());
5194+ } catch (Error& e) {
5195+ TraceEvent (SevWarn, " FileRestoreLogReadError" ).error (e);
5196+ mutationStream.sendError (e);
51915197 }
5192- mutationStream.sendError (end_of_stream ());
51935198 return Void ();
51945199 }
51955200
@@ -5217,7 +5222,8 @@ struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase {
52175222 ACTOR static Future<Void> writeMutations (Database cx,
52185223 std::vector<Standalone<VectorRef<KeyValueRef>>> mutations,
52195224 Key mutationLogPrefix,
5220- Reference<Task> task) {
5225+ Reference<Task> task,
5226+ Reference<TaskBucket> taskBucket) {
52215227 state Reference<ReadYourWritesTransaction> tr (new ReadYourWritesTransaction (cx));
52225228 state Standalone<VectorRef<KeyValueRef>> oldFormatMutations;
52235229 state int mutationIndex = 0 ;
@@ -5253,7 +5259,9 @@ struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase {
52535259 txBytes += v.expectedSize ();
52545260 ++mutationCount;
52555261 }
5262+ wait (taskBucket->keepRunning (tr, task));
52565263 wait (tr->commit ());
5264+
52575265 int64_t oldBytes = Params.bytesWritten ().get (task);
52585266 Params.bytesWritten ().set (task, oldBytes + txBytes);
52595267 DisabledTraceEvent (" FileRestorePartitionedLogCommittData" )
@@ -5364,7 +5372,7 @@ struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase {
53645372 // batching mutations from multiple versions together before writing to the database
53655373 state int64_t bytes = oneVersionData.expectedSize ();
53665374 if (totalBytes + bytes > CLIENT_KNOBS->RESTORE_WRITE_TX_SIZE ) {
5367- wait (writeMutations (cx, mutations, restore.mutationLogPrefix (), task));
5375+ wait (writeMutations (cx, mutations, restore.mutationLogPrefix (), task, taskBucket ));
53685376 mutations.clear ();
53695377 totalBytes = 0 ;
53705378 }
@@ -5373,7 +5381,7 @@ struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase {
53735381 } catch (Error& e) {
53745382 if (e.code () == error_code_end_of_stream) {
53755383 if (mutations.size () > 0 ) {
5376- wait (writeMutations (cx, mutations, restore.mutationLogPrefix (), task));
5384+ wait (writeMutations (cx, mutations, restore.mutationLogPrefix (), task, taskBucket ));
53775385 }
53785386 break ;
53795387 } else {
0 commit comments