1
1
import { expect , test , describe } from "bun:test" ;
2
+ import { Redis } from "../redis" ;
2
3
import { newHttpClient } from "../test-utils" ;
3
- import { SubscribeCommand } from "./subscribe" ;
4
- import { PublishCommand } from "./publish" ;
5
4
6
- describe ( "Subscribe Command " , ( ) => {
5
+ describe ( "Subscriber " , ( ) => {
7
6
const client = newHttpClient ( ) ;
7
+ const redis = new Redis ( client ) ;
8
8
9
9
test ( "receives a single published message" , async ( ) => {
10
10
const channel = "test-single" ;
11
11
const receivedMessages : any [ ] = [ ] ;
12
12
13
- const subscription = new SubscribeCommand ( [ channel ] , {
14
- onMessage : ( message ) => {
15
- receivedMessages . push ( JSON . parse ( message ) ) ;
16
- } ,
13
+ const subscriber = redis . subscribe ( [ channel ] ) ;
14
+ subscriber . on ( "message" , ( message ) => {
15
+ receivedMessages . push ( message ) ;
17
16
} ) ;
18
17
19
- const subscribePromise = subscription . exec ( client ) ;
20
- await new Promise ( ( resolve ) => setTimeout ( resolve , 1000 ) ) ;
18
+ // Wait for subscription to establish
19
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
21
20
22
21
const testMessage = {
23
22
user : "testUser" ,
24
23
message : "Hello, World!" ,
25
24
timestamp : Date . now ( ) ,
26
25
} ;
27
26
28
- await new PublishCommand ( [ channel , JSON . stringify ( testMessage ) ] ) . exec ( client ) ;
29
- await new Promise ( ( resolve ) => setTimeout ( resolve , 1000 ) ) ;
27
+ await redis . publish ( channel , testMessage ) ;
28
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
30
29
31
- const result = await subscribePromise ;
32
- expect ( result ) . toBe ( 1 ) ;
33
30
expect ( receivedMessages ) . toHaveLength ( 1 ) ;
34
31
expect ( receivedMessages [ 0 ] ) . toEqual ( testMessage ) ;
35
- } , 10000 ) ;
32
+
33
+ await subscriber . unsubscribe ( ) ;
34
+ } , 10_000 ) ;
36
35
37
36
test ( "receives multiple messages in order" , async ( ) => {
38
37
const channel = "test-multiple" ;
39
38
const receivedMessages : any [ ] = [ ] ;
40
39
41
- const subscription = new SubscribeCommand ( [ channel ] , {
42
- onMessage : ( message ) => {
43
- receivedMessages . push ( JSON . parse ( message ) ) ;
44
- } ,
40
+ const subscriber = redis . subscribe ( [ channel ] ) ;
41
+ subscriber . on ( "message" , ( message ) => {
42
+ receivedMessages . push ( message ) ;
45
43
} ) ;
46
44
47
- const subscribePromise = subscription . exec ( client ) ;
48
- await new Promise ( ( resolve ) => setTimeout ( resolve , 1000 ) ) ;
45
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
49
46
50
47
const messages = [
51
48
{ user : "user1" , message : "First" , timestamp : Date . now ( ) } ,
@@ -54,77 +51,115 @@ describe("Subscribe Command", () => {
54
51
] ;
55
52
56
53
for ( const msg of messages ) {
57
- await new PublishCommand ( [ channel , JSON . stringify ( msg ) ] ) . exec ( client ) ;
54
+ await redis . publish ( channel , msg ) ;
58
55
await new Promise ( ( resolve ) => setTimeout ( resolve , 100 ) ) ;
59
56
}
60
57
61
- await new Promise ( ( resolve ) => setTimeout ( resolve , 1000 ) ) ;
62
- await subscribePromise ;
58
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
63
59
64
60
expect ( receivedMessages ) . toHaveLength ( messages . length ) ;
65
61
expect ( receivedMessages . map ( ( m ) => m . message ) ) . toEqual ( messages . map ( ( m ) => m . message ) ) ;
66
- } , 15000 ) ;
67
-
68
- test ( "uses default message handler when no handler provided" , async ( ) => {
69
- const channel = "test-default" ;
70
- const logs : any [ ] = [ ] ;
71
- const originalLog = console . log ;
72
- console . log = ( ...args ) => logs . push ( args ) ;
73
-
74
- try {
75
- const subscription = new SubscribeCommand ( [ channel ] ) ;
76
- const subscribePromise = subscription . exec ( client ) ;
77
- await new Promise ( ( resolve ) => setTimeout ( resolve , 1000 ) ) ;
78
-
79
- const testMessage = {
80
- user : "testUser" ,
81
- message : "Test default" ,
82
- timestamp : Date . now ( ) ,
83
- } ;
84
-
85
- await new PublishCommand ( [ channel , JSON . stringify ( testMessage ) ] ) . exec ( client ) ;
86
- await new Promise ( ( resolve ) => setTimeout ( resolve , 1000 ) ) ;
87
- await subscribePromise ;
88
-
89
- expect (
90
- logs . some (
91
- ( log ) =>
92
- log [ 0 ] === "message" && log [ 1 ] === channel && log [ 2 ] . message === testMessage . message
93
- )
94
- ) . toBe ( true ) ;
95
- } finally {
96
- console . log = originalLog ;
97
- }
98
- } , 10000 ) ;
62
+
63
+ await subscriber . unsubscribe ( ) ;
64
+ } , 15_000 ) ;
65
+
66
+ test ( "handles channel-specific messages" , async ( ) => {
67
+ const channel = "test-specific" ;
68
+ const channelMessages : any [ ] = [ ] ;
69
+
70
+ const subscriber = redis . subscribe ( [ channel ] ) ;
71
+ subscriber . on ( `message:${ channel } ` , ( message ) => {
72
+ channelMessages . push ( message ) ;
73
+ } ) ;
74
+
75
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
76
+
77
+ const testMessage = {
78
+ user : "testUser" ,
79
+ message : "Channel specific" ,
80
+ timestamp : Date . now ( ) ,
81
+ } ;
82
+
83
+ await redis . publish ( channel , testMessage ) ;
84
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
85
+
86
+ expect ( channelMessages ) . toHaveLength ( 1 ) ;
87
+ expect ( channelMessages [ 0 ] ) . toEqual ( testMessage ) ;
88
+
89
+ await subscriber . unsubscribe ( ) ;
90
+ } , 10_000 ) ;
99
91
100
92
test ( "multiple subscribers receive same message" , async ( ) => {
101
93
const channel = "test-multi-sub" ;
102
94
const messages1 : any [ ] = [ ] ;
103
95
const messages2 : any [ ] = [ ] ;
104
96
105
- const sub1 = new SubscribeCommand ( [ channel ] , {
106
- onMessage : ( message ) => messages1 . push ( JSON . parse ( message ) ) ,
107
- } ) ;
108
- const sub2 = new SubscribeCommand ( [ channel ] , {
109
- onMessage : ( message ) => messages2 . push ( JSON . parse ( message ) ) ,
110
- } ) ;
97
+ const subscriber1 = redis . subscribe ( [ channel ] ) ;
98
+ const subscriber2 = redis . subscribe ( [ channel ] ) ;
111
99
112
- const [ promise1 , promise2 ] = [ sub1 . exec ( client ) , sub2 . exec ( client ) ] ;
113
- await new Promise ( ( resolve ) => setTimeout ( resolve , 1000 ) ) ;
100
+ subscriber1 . on ( "message" , ( message ) => messages1 . push ( message ) ) ;
101
+ subscriber2 . on ( "message" , ( message ) => messages2 . push ( message ) ) ;
102
+
103
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
114
104
115
105
const testMessage = {
116
106
user : "testUser" ,
117
107
message : "Broadcast" ,
118
108
timestamp : Date . now ( ) ,
119
109
} ;
120
110
121
- await new PublishCommand ( [ channel , JSON . stringify ( testMessage ) ] ) . exec ( client ) ;
122
- await new Promise ( ( resolve ) => setTimeout ( resolve , 1000 ) ) ;
123
-
124
- await Promise . all ( [ promise1 , promise2 ] ) ;
111
+ await redis . publish ( channel , testMessage ) ;
112
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
125
113
126
114
expect ( messages1 [ 0 ] ) . toEqual ( testMessage ) ;
127
115
expect ( messages2 [ 0 ] ) . toEqual ( testMessage ) ;
128
116
expect ( messages1 ) . toEqual ( messages2 ) ;
129
- } , 15000 ) ;
117
+
118
+ await Promise . all ( [ subscriber1 . unsubscribe ( ) , subscriber2 . unsubscribe ( ) ] ) ;
119
+ } , 15_000 ) ;
120
+
121
+ test ( "unsubscribe from specific channel" , async ( ) => {
122
+ const channels = [ "channel1" , "channel2" ] ;
123
+ const messages : Record < string , any [ ] > = {
124
+ channel1 : [ ] ,
125
+ channel2 : [ ] ,
126
+ } ;
127
+
128
+ const subscriber = redis . subscribe ( channels ) ;
129
+
130
+ subscriber . on ( "messageBuffer" , ( { channel, message } ) => {
131
+ messages [ channel ] . push ( message ) ;
132
+ } ) ;
133
+
134
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
135
+
136
+ // Send initial messages to both channels
137
+ await redis . publish ( "channel1" , { test : "before1" } ) ;
138
+ await redis . publish ( "channel2" , { test : "before2" } ) ;
139
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
140
+
141
+ // Verify both channels received messages
142
+ expect ( messages . channel1 ) . toHaveLength ( 1 ) ;
143
+ expect ( messages . channel2 ) . toHaveLength ( 1 ) ;
144
+
145
+ // Unsubscribe from channel1
146
+ await subscriber . unsubscribe ( [ "channel1" ] ) ;
147
+ expect ( subscriber . getSubscribedChannels ( ) ) . toEqual ( [ "channel2" ] ) ;
148
+
149
+ // Clear messages for clean test
150
+ messages . channel1 = [ ] ;
151
+ messages . channel2 = [ ] ;
152
+
153
+ // Send more messages
154
+ await redis . publish ( "channel1" , { test : "after1" } ) ;
155
+ await redis . publish ( "channel2" , { test : "after2" } ) ;
156
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
157
+
158
+ // Verify only channel2 received message
159
+ expect ( messages . channel1 ) . toHaveLength ( 0 ) ;
160
+ expect ( messages . channel2 ) . toHaveLength ( 1 ) ;
161
+ expect ( messages . channel2 [ 0 ] ) . toEqual ( { test : "after2" } ) ;
162
+
163
+ await subscriber . unsubscribe ( ) ;
164
+ } , 15_000 ) ;
130
165
} ) ;
0 commit comments