Skip to content
41 changes: 30 additions & 11 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,7 @@ import type { AggregateOptions } from './operations/aggregate';
import type { OperationParent } from './operations/command';
import type { ServerSessionId } from './sessions';
import { CSOTTimeoutContext, type TimeoutContext } from './timeout';
import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils';

const CHANGE_STREAM_OPTIONS = [
'resumeAfter',
'startAfter',
'startAtOperationTime',
'fullDocument',
'fullDocumentBeforeChange',
'showExpandedEvents'
] as const;
import { type AnyOptions, getTopology, type MongoDBNamespace, squashError } from './utils';

const CHANGE_DOMAIN_TYPES = {
COLLECTION: Symbol('Collection'),
Expand All @@ -43,6 +34,34 @@ const NO_RESUME_TOKEN_ERROR =
'A change stream document has been received that lacks a resume token (_id).';
const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed';

const INVALID_STAGE_OPTIONS = [
'raw',
'useBigInt64',
'promoteLongs',
'promoteValues',
'promoteBuffers',
'ignoreUndefined',
'bsonRegExp',
'serializeFunctions',
'fieldsAsRaw',
'enableUtf8Validation',
'timeoutMS',
'readPreference'
];

export function filterOutOptions(options: AnyOptions, names: ReadonlyArray<string>): AnyOptions {
const filterOptions: AnyOptions = {};

for (const name in options) {
if (!names.includes(name)) {
filterOptions[name] = options[name];
}
}

// Filtered options
return filterOptions;
}

/**
* Represents the logical starting point for a new ChangeStream or resuming a ChangeStream on the server.
* @see https://www.mongodb.com/docs/manual/changeStreams/#std-label-change-stream-resume
Expand Down Expand Up @@ -900,7 +919,7 @@ export class ChangeStream<
private _createChangeStreamCursor(
options: ChangeStreamOptions | ChangeStreamCursorOptions
): ChangeStreamCursor<TSchema, TChange> {
const changeStreamStageOptions = filterOptions(options, CHANGE_STREAM_OPTIONS);
const changeStreamStageOptions: Document = filterOutOptions(options, INVALID_STAGE_OPTIONS);
if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
changeStreamStageOptions.allChangesForCluster = true;
}
Expand Down
4 changes: 2 additions & 2 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ describe('Change Streams', function () {
});
});

it('ignores any invalid option values', function () {
it('allows invalid option values', function () {
const changeStream = collection.watch([], { invalidOption: true });

expect(changeStream).not.to.have.nested.property(
expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.invalidOption'
);
});
Expand Down