1
1
import { type SanityClient } from '@sanity/client'
2
- import { merge , type Observable , of , ReplaySubject , share , timer } from 'rxjs'
2
+ import { EMPTY , merge , Observable , of , ReplaySubject , share , timer } from 'rxjs'
3
+ import { mergeMap } from 'rxjs/operators'
3
4
4
5
import { type PairListenerOptions } from '../getPairListener'
5
6
import { type IdPair } from '../types'
@@ -24,12 +25,16 @@ export const memoizedPair: (
24
25
serverActionsEnabled : Observable < boolean > ,
25
26
pairListenerOptions ?: PairListenerOptions ,
26
27
) : Observable < Pair > => {
27
- const pair = checkoutPair ( client , idPair , serverActionsEnabled , pairListenerOptions )
28
- return merge (
29
- of ( pair ) ,
30
- // makes sure the pair listener is kept alive for as long as there are subscribers
31
- pair . _keepalive ,
32
- ) . pipe (
28
+ return new Observable < Pair > ( ( subscriber ) => {
29
+ const pair = checkoutPair ( client , idPair , serverActionsEnabled , pairListenerOptions )
30
+ return merge (
31
+ of ( pair ) ,
32
+ // merge in draft events and published events to makes sure they receive
33
+ // the events they need for as long as the pair is subscribed to
34
+ pair . draft . events . pipe ( mergeMap ( ( ) => EMPTY ) ) ,
35
+ pair . published . events . pipe ( mergeMap ( ( ) => EMPTY ) ) ,
36
+ ) . subscribe ( subscriber )
37
+ } ) . pipe (
33
38
share ( {
34
39
connector : ( ) => new ReplaySubject ( 1 ) ,
35
40
resetOnComplete : true ,
0 commit comments