Skip to content
48 changes: 37 additions & 11 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,7 @@ import type { AggregateOptions } from './operations/aggregate';
import type { OperationParent } from './operations/command';
import type { ServerSessionId } from './sessions';
import { CSOTTimeoutContext, type TimeoutContext } from './timeout';
import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils';

const CHANGE_STREAM_OPTIONS = [
'resumeAfter',
'startAfter',
'startAtOperationTime',
'fullDocument',
'fullDocumentBeforeChange',
'showExpandedEvents'
] as const;
import { type AnyOptions, getTopology, type MongoDBNamespace, squashError } from './utils';

const CHANGE_DOMAIN_TYPES = {
COLLECTION: Symbol('Collection'),
Expand All @@ -43,6 +34,41 @@ const NO_RESUME_TOKEN_ERROR =
'A change stream document has been received that lacks a resume token (_id).';
const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed';

const INVALID_STAGE_OPTIONS = new Set([
'authdb',
'batchSize',
'bsonRegExp',
'collation',
'comment',
'dbName',
'enableUtf8Validation',
'fieldsAsRaw',
'ignoreUndefined',
'maxAwaitTimeMS',
'maxTimeMS',
'promoteBuffers',
'promoteLongs',
'promoteValues',
'raw',
'rawData',
'readPreference',
'serializeFunctions',
'timeoutContext',
'timeoutMS',
'useBigInt64',
'writeConcern'
]);

export function filterOutOptions(options: AnyOptions): AnyOptions {
const filterOptions: AnyOptions = {};
for (const name of Object.keys(options)) {
if (!INVALID_STAGE_OPTIONS.has(name)) {
filterOptions[name] = options[name];
}
}
return filterOptions;
}

/**
* Represents the logical starting point for a new ChangeStream or resuming a ChangeStream on the server.
* @see https://www.mongodb.com/docs/manual/changeStreams/#std-label-change-stream-resume
Expand Down Expand Up @@ -898,7 +924,7 @@ export class ChangeStream<
private _createChangeStreamCursor(
options: ChangeStreamOptions | ChangeStreamCursorOptions
): ChangeStreamCursor<TSchema, TChange> {
const changeStreamStageOptions = filterOptions(options, CHANGE_STREAM_OPTIONS);
const changeStreamStageOptions: Document = filterOutOptions(options);
if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
changeStreamStageOptions.allChangesForCluster = true;
}
Expand Down
37 changes: 5 additions & 32 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ describe('Change Streams', function () {
});
});

it('ignores any invalid option values', function () {
it('allows invalid option values', function () {
const changeStream = collection.watch([], { invalidOption: true });

expect(changeStream).not.to.have.nested.property(
expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.invalidOption'
);
});
Expand Down Expand Up @@ -1809,7 +1809,7 @@ describe('Change Streams', function () {
});

context('invalid options', function () {
it('does not send invalid options on the aggregate command', {
it('server errors on invalid options on the initialize', {
metadata: { requires: { topology: '!single' } },
test: async function () {
const started: CommandStartedEvent[] = [];
Expand All @@ -1819,35 +1819,8 @@ describe('Change Streams', function () {
// @ts-expect-error: checking for invalid options
cs = collection.watch([], doc);

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

await willBeChange;
expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption');
}
});

it('does not send invalid options on the getMore command', {
metadata: { requires: { topology: '!single' } },
test: async function () {
const started: CommandStartedEvent[] = [];

client.on('commandStarted', filterForCommands(['aggregate'], started));
const doc = { invalidBSONOption: true };
// @ts-expect-error: checking for invalid options
cs = collection.watch([], doc);

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

await willBeChange;
expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption');
const error = await once(cs, 'change').catch(error => error);
expect(error).to.be.instanceOf(MongoServerError);
}
});
});
Expand Down