fix: lower batch size on BulkWriter retry to stay under throughput limits (#1556)

Brian Chen authored Jul 14, 2021
1 parent 866bd25 commit f17a36e
Showing 2 changed files with 33 additions and 11 deletions.
dev/src/bulk-writer.ts (28 additions, 9 deletions)
@@ -218,11 +218,23 @@ class BulkCommitBatch extends WriteBatch {
   // been resolved.
   readonly pendingOps: Array<BulkWriterOperation> = [];
 
-  readonly maxBatchSize: number;
+  private _maxBatchSize: number;
 
   constructor(firestore: Firestore, maxBatchSize: number) {
     super(firestore);
-    this.maxBatchSize = maxBatchSize;
+    this._maxBatchSize = maxBatchSize;
   }
 
+  get maxBatchSize(): number {
+    return this._maxBatchSize;
+  }
+
+  setMaxBatchSize(size: number): void {
+    assert(
+      this.pendingOps.length <= size,
+      'New batch size cannot be less than the number of enqueued writes'
+    );
+    this._maxBatchSize = size;
+  }
+
   has(documentRef: firestore.DocumentReference<unknown>): boolean {
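The assert in the new setter encodes an invariant worth spelling out: capacity may only shrink down to the number of writes already enqueued, since a batch holding more ops than its cap could never legally commit under that cap. A minimal sketch of the failure mode the assert prevents (the object shape and names here are illustrative, not library API):

```ts
// Illustrative stand-in for BulkCommitBatch's capacity bookkeeping.
const batch = {pendingOps: ['w1', 'w2', 'w3'], maxBatchSize: 20};

function setMaxBatchSize(size: number): void {
  // Same check as the assert above: never shrink below the enqueued count.
  if (batch.pendingOps.length > size) {
    throw new Error(
      'New batch size cannot be less than the number of enqueued writes'
    );
  }
  batch.maxBatchSize = size;
}

setMaxBatchSize(10); // fine: 3 enqueued writes <= 10
setMaxBatchSize(2); // throws: 3 enqueued writes > 2
```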
@@ -865,6 +877,10 @@ export class BulkWriter {
     if (this._bulkCommitBatch._opCount === 0) return;
 
     const pendingBatch = this._bulkCommitBatch;
+    this._bulkCommitBatch = new BulkCommitBatch(
+      this.firestore,
+      this._maxBatchSize
+    );
 
     // Use the write with the longest backoff duration when determining backoff.
     const highestBackoffDuration = pendingBatch.pendingOps.reduce((prev, cur) =>
@@ -873,13 +889,6 @@
     const backoffMsWithJitter = BulkWriter._applyJitter(highestBackoffDuration);
     const delayedExecution = new Deferred<void>();
 
-    // A backoff duration greater than 0 implies that this batch is a retry.
-    // Retried writes are sent with a batch size of 10 in order to guarantee
-    // that the batch is under the 10MiB limit.
-    const maxBatchSize =
-      highestBackoffDuration > 0 ? RETRY_MAX_BATCH_SIZE : this._maxBatchSize;
-    this._bulkCommitBatch = new BulkCommitBatch(this.firestore, maxBatchSize);
-
     if (backoffMsWithJitter > 0) {
       delayExecution(() => delayedExecution.resolve(), backoffMsWithJitter);
     } else {
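The scheduling logic above picks the longest backoff among the batch's pending ops and then jitters it, so that a wave of retried batches doesn't fire in lockstep. `_applyJitter`'s internals aren't part of this diff; the following is only a sketch of the usual technique, with the jitter factor assumed for illustration:

```ts
// Assumed factor for the sketch; the library's actual constant isn't shown here.
const JITTER_FACTOR = 0.3;

function applyJitter(backoffMs: number): number {
  if (backoffMs === 0) return 0;
  // Shift the delay by a random amount in [-30%, +30%] of the base value,
  // clamping at zero, so concurrent retries spread out over time.
  const jitter = (Math.random() * 2 - 1) * JITTER_FACTOR * backoffMs;
  return Math.max(0, backoffMs + jitter);
}
```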
@@ -1010,6 +1019,16 @@
     enqueueOnBatchCallback: (bulkCommitBatch: BulkCommitBatch) => void,
     op: BulkWriterOperation
   ): void {
+    // A backoff duration greater than 0 implies that this batch is a retry.
+    // Retried writes are sent with a batch size of 10 in order to guarantee
+    // that the batch is under the 10MiB limit.
+    if (op.backoffDuration > 0) {
+      if (this._bulkCommitBatch.pendingOps.length >= RETRY_MAX_BATCH_SIZE) {
+        this._scheduleCurrentBatch(/* flush= */ false);
+      }
+      this._bulkCommitBatch.setMaxBatchSize(RETRY_MAX_BATCH_SIZE);
+    }
+
     if (this._bulkCommitBatch.has(op.ref)) {
       // Create a new batch since the backend doesn't support batches with two
       // writes to the same document.
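The net effect of the move: the retry cap is now applied when a retried op is enqueued rather than when the batch is scheduled, so a batch is never created oversized and then retried whole. The comment's arithmetic is presumably that a single Firestore document write can approach the 1 MiB document-size maximum, so capping retry batches at 10 writes keeps a commit near or under the cited 10 MiB limit. A hedged usage sketch showing how retries arise in practice (the 3-attempt cutoff and document paths are illustrative, not from this commit):

```ts
import {Firestore} from '@google-cloud/firestore';

async function main(): Promise<void> {
  const firestore = new Firestore();
  const bulkWriter = firestore.bulkWriter();

  // Returning true re-enqueues the failed write. Retried writes carry a
  // backoff duration, which (per this commit) caps their batches at
  // RETRY_MAX_BATCH_SIZE (10) writes.
  bulkWriter.onWriteError(error => error.failedAttempts < 3);

  for (let i = 0; i < 50; i++) {
    bulkWriter
      .set(firestore.doc(`collectionId/doc${i}`), {foo: 'bar'})
      .catch(err => console.error(`doc${i} failed permanently:`, err));
  }

  await bulkWriter.close(); // flushes all remaining batches
}

main().catch(console.error);
```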
dev/test/bulk-writer.ts (5 additions, 2 deletions)
@@ -848,7 +848,7 @@ describe('BulkWriter', () => {
       {
         request: createRequest(
           nLengthArray(15 - RETRY_MAX_BATCH_SIZE).map((_, i) =>
-            setOp('doc' + i + RETRY_MAX_BATCH_SIZE, 'bar')
+            setOp('doc' + (i + RETRY_MAX_BATCH_SIZE), 'bar')
           )
         ),
         response: mergeResponses(
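The one-character fix above corrects an operator-associativity bug: `+` evaluates left to right, so without parentheses the numeric index is concatenated as a string before `RETRY_MAX_BATCH_SIZE` is ever added. A quick illustration:

```ts
const RETRY_MAX_BATCH_SIZE = 10;
const i = 0;

const wrong = 'doc' + i + RETRY_MAX_BATCH_SIZE; // 'doc010' — string concat all the way
const right = 'doc' + (i + RETRY_MAX_BATCH_SIZE); // 'doc10' — numeric add happens first
```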
@@ -857,10 +857,13 @@
       },
     ]);
     for (let i = 0; i < 15; i++) {
-      bulkWriter.set(firestore.doc('collectionId/doc' + i), {foo: 'bar'});
+      bulkWriter
+        .set(firestore.doc('collectionId/doc' + i), {foo: 'bar'})
+        .then(incrementOpCount);
     }
 
     await bulkWriter.close();
+    expect(opCount).to.equal(15);
   });
 
   it('retries maintain correct write resolution ordering', async () => {
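With `RETRY_MAX_BATCH_SIZE` at 10, the 15 retried writes in this test should split into one batch of 10 (doc0 through doc9) and one of 5 (doc10 through doc14), which is exactly what the corrected request above encodes. The added `.then(incrementOpCount)` chains and final assertion additionally verify that every write's promise resolves. A minimal sketch of that counting pattern, lifted out of the test's mocked harness (the helper name mirrors the test's, the rest is illustrative):

```ts
import {expect} from 'chai';

// Count each resolved write, then assert the total once everything settles;
// a silently dropped resolution would fail the equality check.
let opCount = 0;
const incrementOpCount = () => {
  opCount += 1;
};

async function assertAllResolved(writes: Array<Promise<unknown>>): Promise<void> {
  writes.forEach(w => void w.then(incrementOpCount));
  await Promise.all(writes);
  expect(opCount).to.equal(writes.length);
}
```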
