Skip to content

Commit a2daf76

Browse files
feat(NODE-5510)!: dont filter change stream options (#4723)
Co-authored-by: bailey <[email protected]>
1 parent 52267a5 commit a2daf76

File tree

3 files changed

+111
-43
lines changed

3 files changed

+111
-43
lines changed

src/change_stream.ts

Lines changed: 83 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,7 @@ import type { AggregateOptions } from './operations/aggregate';
2020
import type { OperationParent } from './operations/command';
2121
import type { ServerSessionId } from './sessions';
2222
import { CSOTTimeoutContext, type TimeoutContext } from './timeout';
23-
import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils';
24-
25-
const CHANGE_STREAM_OPTIONS = [
26-
'resumeAfter',
27-
'startAfter',
28-
'startAtOperationTime',
29-
'fullDocument',
30-
'fullDocumentBeforeChange',
31-
'showExpandedEvents'
32-
] as const;
23+
import { type AnyOptions, getTopology, type MongoDBNamespace, squashError } from './utils';
3324

3425
const CHANGE_DOMAIN_TYPES = {
3526
COLLECTION: Symbol('Collection'),
@@ -43,6 +34,14 @@ const NO_RESUME_TOKEN_ERROR =
4334
'A change stream document has been received that lacks a resume token (_id).';
4435
const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed';
4536

37+
const INVALID_STAGE_OPTIONS = buildDisallowedChangeStreamOptions();
38+
39+
export function filterOutOptions(options: AnyOptions): AnyOptions {
40+
return Object.fromEntries(
41+
Object.entries(options).filter(([k, _]) => !INVALID_STAGE_OPTIONS.has(k))
42+
);
43+
}
44+
4645
/**
4746
* Represents the logical starting point for a new ChangeStream or resuming a ChangeStream on the server.
4847
* @see https://www.mongodb.com/docs/manual/changeStreams/#std-label-change-stream-resume
@@ -898,7 +897,7 @@ export class ChangeStream<
898897
private _createChangeStreamCursor(
899898
options: ChangeStreamOptions | ChangeStreamCursorOptions
900899
): ChangeStreamCursor<TSchema, TChange> {
901-
const changeStreamStageOptions = filterOptions(options, CHANGE_STREAM_OPTIONS);
900+
const changeStreamStageOptions: Document = filterOutOptions(options);
902901
if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
903902
changeStreamStageOptions.allChangesForCluster = true;
904903
}
@@ -1084,3 +1083,76 @@ export class ChangeStream<
10841083
}
10851084
}
10861085
}
1086+
1087+
/**
1088+
* This function returns a list of options that are *not* supported by the $changeStream
1089+
* aggregation stage. This is best-effort - it uses the options "officially supported" by the driver
1090+
* to derive a list of known, unsupported options for the $changeStream stage.
1091+
*
1092+
* Notably, at runtime, users can still provide options unknown to the driver and the driver will
1093+
* *not* filter them out of the options object (see NODE-5510).
1094+
*/
1095+
function buildDisallowedChangeStreamOptions(): Set<string> {
1096+
/** hard-coded list of allowed ChangeStream options */
1097+
type CSOptions =
1098+
| 'resumeAfter'
1099+
| 'startAfter'
1100+
| 'startAtOperationTime'
1101+
| 'fullDocument'
1102+
| 'fullDocumentBeforeChange'
1103+
| 'showExpandedEvents';
1104+
1105+
/**
1106+
* a type representing all known options that the driver supports that are *not* change stream stage options.
1107+
*
1108+
* each known key is mapped to a non-optional string, so that if new driver-specific options are added, the
1109+
* instantiation of `denyList` below results in a TS error.
1110+
*/
1111+
type DisallowedOptions = {
1112+
[k in Exclude<
1113+
keyof ChangeStreamOptions & { timeoutContext: TimeoutContext },
1114+
CSOptions
1115+
>]: string;
1116+
};
1117+
1118+
const denyList: DisallowedOptions = {
1119+
allowDiskUse: '',
1120+
authdb: '',
1121+
batchSize: '',
1122+
bsonRegExp: '',
1123+
bypassDocumentValidation: '',
1124+
bypassPinningCheck: '',
1125+
checkKeys: '',
1126+
collation: '',
1127+
comment: '',
1128+
cursor: '',
1129+
dbName: '',
1130+
enableUtf8Validation: '',
1131+
explain: '',
1132+
fieldsAsRaw: '',
1133+
hint: '',
1134+
ignoreUndefined: '',
1135+
let: '',
1136+
maxAwaitTimeMS: '',
1137+
maxTimeMS: '',
1138+
omitMaxTimeMS: '',
1139+
out: '',
1140+
promoteBuffers: '',
1141+
promoteLongs: '',
1142+
promoteValues: '',
1143+
raw: '',
1144+
rawData: '',
1145+
readConcern: '',
1146+
readPreference: '',
1147+
serializeFunctions: '',
1148+
session: '',
1149+
timeoutContext: '',
1150+
timeoutMS: '',
1151+
timeoutMode: '',
1152+
useBigInt64: '',
1153+
willRetryWrite: '',
1154+
writeConcern: ''
1155+
};
1156+
1157+
return new Set(Object.keys(denyList));
1158+
}

test/integration/change-streams/change_stream.test.ts

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,10 @@ describe('Change Streams', function () {
157157
});
158158
});
159159

160-
it('ignores any invalid option values', function () {
160+
it('allows invalid option values', function () {
161161
const changeStream = collection.watch([], { invalidOption: true });
162162

163-
expect(changeStream).not.to.have.nested.property(
163+
expect(changeStream).to.have.nested.property(
164164
'cursor.pipeline[0].$changeStream.invalidOption'
165165
);
166166
});
@@ -1809,7 +1809,7 @@ describe('Change Streams', function () {
18091809
});
18101810

18111811
context('invalid options', function () {
1812-
it('does not send invalid options on the aggregate command', {
1812+
it('server errors on invalid options on the initialize', {
18131813
metadata: { requires: { topology: '!single' } },
18141814
test: async function () {
18151815
const started: CommandStartedEvent[] = [];
@@ -1819,35 +1819,8 @@ describe('Change Streams', function () {
18191819
// @ts-expect-error: checking for invalid options
18201820
cs = collection.watch([], doc);
18211821

1822-
const willBeChange = once(cs, 'change').then(args => args[0]);
1823-
await once(cs.cursor, 'init');
1824-
1825-
const result = await collection.insertOne({ a: Long.fromNumber(0) });
1826-
expect(result).to.exist;
1827-
1828-
await willBeChange;
1829-
expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption');
1830-
}
1831-
});
1832-
1833-
it('does not send invalid options on the getMore command', {
1834-
metadata: { requires: { topology: '!single' } },
1835-
test: async function () {
1836-
const started: CommandStartedEvent[] = [];
1837-
1838-
client.on('commandStarted', filterForCommands(['aggregate'], started));
1839-
const doc = { invalidBSONOption: true };
1840-
// @ts-expect-error: checking for invalid options
1841-
cs = collection.watch([], doc);
1842-
1843-
const willBeChange = once(cs, 'change').then(args => args[0]);
1844-
await once(cs.cursor, 'init');
1845-
1846-
const result = await collection.insertOne({ a: Long.fromNumber(0) });
1847-
expect(result).to.exist;
1848-
1849-
await willBeChange;
1850-
expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption');
1822+
const error = await once(cs, 'change').catch(error => error);
1823+
expect(error).to.be.instanceOf(MongoServerError);
18511824
}
18521825
});
18531826
});

test/unit/change_stream.test.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Long, Timestamp } from 'bson';
22
import { expect } from 'chai';
33
import * as sinon from 'sinon';
44

5+
import { filterOutOptions } from '../../src/change_stream';
56
import { ChangeStreamCursor } from '../../src/cursor/change_stream_cursor';
67
import { MongoClient } from '../../src/mongo_client';
78
import { MongoDBNamespace } from '../../src/utils';
@@ -11,6 +12,28 @@ describe('ChangeStreamCursor', function () {
1112
sinon.restore();
1213
});
1314

15+
describe('#filterOutOptions', function () {
16+
const options = {
17+
raw: false,
18+
useBigInt64: false,
19+
promoteLongs: true,
20+
promoteValues: true,
21+
promoteBuffers: false,
22+
ignoreUndefined: false,
23+
bsonRegExp: false,
24+
serializeFunctions: false,
25+
fieldsAsRaw: {},
26+
enableUtf8Validation: true,
27+
fullDocument: true
28+
};
29+
30+
it('filters out all invalid options', function () {
31+
expect(filterOutOptions(options)).to.deep.equal({
32+
fullDocument: true
33+
});
34+
});
35+
});
36+
1437
describe('get resumeOptions()', function () {
1538
context('when there is a cached resumeToken', function () {
1639
it('copies all non-resume related options from the original cursor', function () {

0 commit comments

Comments
 (0)