88use MongoDB \Driver \ReadPreference ;
99use MongoDB \Driver \Server ;
1010use MongoDB \Driver \Exception \ConnectionTimeoutException ;
11+ use MongoDB \Driver \Exception \LogicException ;
1112use MongoDB \Exception \ResumeTokenException ;
1213use MongoDB \Operation \CreateCollection ;
1314use MongoDB \Operation \DatabaseCommand ;
@@ -225,6 +226,62 @@ private function assertStartAtOperationTime(TimestampInterface $expectedOperatio
225226 $ this ->assertEquals ($ expectedOperationTime , $ command ->pipeline [0 ]->{'$changeStream ' }->startAtOperationTime );
226227 }
227228
229+ public function testRewindMultipleTimesWithResults ()
230+ {
231+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
232+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
233+
234+ $ this ->insertDocument (['x ' => 1 ]);
235+ $ this ->insertDocument (['x ' => 2 ]);
236+
237+ $ changeStream ->rewind ();
238+ $ this ->assertTrue ($ changeStream ->valid ());
239+ $ this ->assertSame (0 , $ changeStream ->key ());
240+ $ this ->assertNotNull ($ changeStream ->current ());
241+
242+ // Subsequent rewind does not change iterator state
243+ $ changeStream ->rewind ();
244+ $ this ->assertTrue ($ changeStream ->valid ());
245+ $ this ->assertSame (0 , $ changeStream ->key ());
246+ $ this ->assertNotNull ($ changeStream ->current ());
247+
248+ $ changeStream ->next ();
249+
250+ $ this ->assertTrue ($ changeStream ->valid ());
251+ $ this ->assertSame (1 , $ changeStream ->key ());
252+ $ this ->assertNotNull ($ changeStream ->current ());
253+
254+ // Rewinding after advancing the iterator is an error
255+ $ this ->expectException (LogicException::class);
256+ $ changeStream ->rewind ();
257+ }
258+
259+ public function testRewindMultipleTimesWithNoResults ()
260+ {
261+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
262+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
263+
264+ $ changeStream ->rewind ();
265+ $ this ->assertFalse ($ changeStream ->valid ());
266+ $ this ->assertNull ($ changeStream ->key ());
267+ $ this ->assertNull ($ changeStream ->current ());
268+
269+ // Subsequent rewind does not change iterator state
270+ $ changeStream ->rewind ();
271+ $ this ->assertFalse ($ changeStream ->valid ());
272+ $ this ->assertNull ($ changeStream ->key ());
273+ $ this ->assertNull ($ changeStream ->current ());
274+
275+ $ changeStream ->next ();
276+ $ this ->assertFalse ($ changeStream ->valid ());
277+ $ this ->assertNull ($ changeStream ->key ());
278+ $ this ->assertNull ($ changeStream ->current ());
279+
280+ // Rewinding after advancing the iterator is an error
281+ $ this ->expectException (LogicException::class);
282+ $ changeStream ->rewind ();
283+ }
284+
228285 public function testRewindResumesAfterConnectionException ()
229286 {
230287 /* In order to trigger a dropped connection, we'll use a new client with
@@ -322,20 +379,67 @@ public function testNoChangeAfterResumeBeforeInsert()
322379 $ this ->assertMatchesDocument ($ expectedResult , $ changeStream ->current ());
323380 }
324381
325- public function testResumeTokenIsUpdatedAfterResuming ()
382+ public function testResumeMultipleTimesInSuccession ()
326383 {
327- $ this ->insertDocument (['_id ' => 1 ]);
384+ $ operation = new CreateCollection ($ this ->getDatabaseName (), $ this ->getCollectionName ());
385+ $ operation ->execute ($ this ->getPrimaryServer ());
328386
329387 $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
330388 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
331389
390+ /* Killing the cursor when there are no results will test that neither
391+ * the initial rewind() nor its resume attempt incremented the key. */
392+ $ this ->killChangeStreamCursor ($ changeStream );
393+
332394 $ changeStream ->rewind ();
395+ $ this ->assertFalse ($ changeStream ->valid ());
396+ $ this ->assertNull ($ changeStream ->key ());
333397 $ this ->assertNull ($ changeStream ->current ());
334398
399+ $ this ->insertDocument (['_id ' => 1 ]);
400+
401+ /* Killing the cursor a second time when there is a result will test
402+ * that the resume attempt picks up the latest change. */
403+ $ this ->killChangeStreamCursor ($ changeStream );
404+
405+ $ changeStream ->rewind ();
406+ $ this ->assertTrue ($ changeStream ->valid ());
407+ $ this ->assertSame (0 , $ changeStream ->key ());
408+
409+ $ expectedResult = [
410+ '_id ' => $ changeStream ->current ()->_id ,
411+ 'operationType ' => 'insert ' ,
412+ 'fullDocument ' => ['_id ' => 1 ],
413+ 'ns ' => ['db ' => $ this ->getDatabaseName (), 'coll ' => $ this ->getCollectionName ()],
414+ 'documentKey ' => ['_id ' => 1 ],
415+ ];
416+
417+ $ this ->assertMatchesDocument ($ expectedResult , $ changeStream ->current ());
418+
419+ /* Killing the cursor a second time will not trigger a resume until
420+ * ChangeStream::next() is called. A successive call to rewind() should
421+ * not change the iterator's state and preserve the current result. */
422+ $ this ->killChangeStreamCursor ($ changeStream );
423+
424+ $ changeStream ->rewind ();
425+ $ this ->assertTrue ($ changeStream ->valid ());
426+ $ this ->assertSame (0 , $ changeStream ->key ());
427+
428+ $ expectedResult = [
429+ '_id ' => $ changeStream ->current ()->_id ,
430+ 'operationType ' => 'insert ' ,
431+ 'fullDocument ' => ['_id ' => 1 ],
432+ 'ns ' => ['db ' => $ this ->getDatabaseName (), 'coll ' => $ this ->getCollectionName ()],
433+ 'documentKey ' => ['_id ' => 1 ],
434+ ];
435+
436+ $ this ->assertMatchesDocument ($ expectedResult , $ changeStream ->current ());
437+
335438 $ this ->insertDocument (['_id ' => 2 ]);
336439
337440 $ changeStream ->next ();
338441 $ this ->assertTrue ($ changeStream ->valid ());
442+ $ this ->assertSame (1 , $ changeStream ->key ());
339443
340444 $ expectedResult = [
341445 '_id ' => $ changeStream ->current ()->_id ,
@@ -353,6 +457,7 @@ public function testResumeTokenIsUpdatedAfterResuming()
353457
354458 $ changeStream ->next ();
355459 $ this ->assertTrue ($ changeStream ->valid ());
460+ $ this ->assertSame (2 , $ changeStream ->key ());
356461
357462 $ expectedResult = [
358463 '_id ' => $ changeStream ->current ()->_id ,
@@ -374,6 +479,7 @@ public function testResumeTokenIsUpdatedAfterResuming()
374479
375480 $ changeStream ->next ();
376481 $ this ->assertTrue ($ changeStream ->valid ());
482+ $ this ->assertSame (3 , $ changeStream ->key ());
377483
378484 $ expectedResult = [
379485 '_id ' => $ changeStream ->current ()->_id ,
@@ -689,6 +795,8 @@ public function testNextAdvancesKey()
689795 $ this ->insertDocument (['x ' => 1 ]);
690796 $ this ->insertDocument (['x ' => 2 ]);
691797
798+ /* Note: we intentionally do not start iteration with rewind() to ensure
799+ * that next() behaves identically when called without rewind(). */
692800 $ changeStream ->next ();
693801
694802 $ this ->assertSame (0 , $ changeStream ->key ());
0 commit comments