1
+ 'use strict' ;
2
+
3
+ const WebSocket = require ( 'ws' ) ;
4
+ const EventEmitter = require ( 'events' ) ;
5
+ const MessageHelper = require ( './MessageHelper' ) ;
6
+
7
+ const CLIENT_STATUS = require ( './Constants' ) . CLIENT_STATUS ;
8
+ const MESSAGE_TYPE = require ( './Constants' ) . MESSAGE_TYPE ;
9
+
10
+
11
+ class LambdaStreamClient extends EventEmitter {
12
+ constructor ( endPoint , lChainId , accessKey ) {
13
+ super ( ) ;
14
+
15
+ this . endPoint = endPoint ;
16
+ this . lChainId = lChainId ;
17
+ this . accessKey = accessKey ;
18
+
19
+ this . wsClient = null ;
20
+
21
+ this . clientStatus = CLIENT_STATUS . NONE ;
22
+ this . isNeedReconnect = false ;
23
+
24
+ this . processMutex = false ;
25
+ }
26
+
27
+ changeClientStatus ( toStatus ) {
28
+ this . clientStatus = toStatus ;
29
+ }
30
+
31
+ async start ( ) {
32
+ this . changeClientStatus ( CLIENT_STATUS . START ) ;
33
+
34
+ if ( ! this . processTimer ) {
35
+ const self = this ;
36
+ this . processTimer = setInterval ( async function ( ) {
37
+ if ( true === self . processMutex ) {
38
+ return ;
39
+ }
40
+ self . processMutex = true ;
41
+
42
+ try {
43
+ await self . process ( ) ;
44
+ } catch ( err ) {
45
+ console . error ( err ) ;
46
+ }
47
+
48
+ self . processMutex = false ;
49
+ } , 3000 ) ;
50
+ }
51
+ }
52
+
53
+ async close ( ) {
54
+ this . changeClientStatus ( CLIENT_STATUS . CLOSE ) ;
55
+ }
56
+
57
+ async process ( ) {
58
+ if ( true === this . isNeedReconnect ) {
59
+
60
+ this . changeClientStatus ( CLIENT_STATUS . CONNECTING ) ;
61
+ await this . connect ( ) ;
62
+
63
+ } else if ( CLIENT_STATUS . START === this . clientStatus ) {
64
+ this . changeClientStatus ( CLIENT_STATUS . CONNECTING ) ;
65
+ await this . connect ( ) ;
66
+
67
+ } else if ( CLIENT_STATUS . CONNECTED === this . clientStatus ) {
68
+
69
+ this . changeClientStatus ( CLIENT_STATUS . STARTING ) ;
70
+ await MessageHelper . sendStartMessage ( this . wsClient ) ;
71
+
72
+ } else if ( CLIENT_STATUS . PAUSE === this . clientStatus ) {
73
+
74
+ await this . changeClientStatus ( CLIENT_STATUS . PAUSING ) ;
75
+ await MessageHelper . sendPauseMessage ( this . wsClient ) ;
76
+
77
+ } else if ( CLIENT_STATUS . RESUME === this . clientStatus ) {
78
+
79
+ this . changeClientStatus ( CLIENT_STATUS . RESUMING ) ;
80
+ await MessageHelper . sendResumeMessage ( this . wsClient ) ;
81
+
82
+ } else if ( CLIENT_STATUS . CLOSE === this . clientStatus ) {
83
+
84
+ this . changeClientStatus ( CLIENT_STATUS . CLOSING ) ;
85
+ await MessageHelper . sendCloseMessage ( this . wsClient ) ;
86
+ }
87
+ }
88
+
89
+ async onOpen ( ) {
90
+ this . changeClientStatus ( CLIENT_STATUS . CONNECTED ) ;
91
+ this . isNeedReconnect = false ;
92
+ this . emit ( 'open' ) ;
93
+ }
94
+
95
+ async onMessage ( data ) {
96
+ try {
97
+ let object = JSON . parse ( data ) ;
98
+ let type = true === Array . isArray ( object ) ? object [ 0 ] . data . type : object . data . type ;
99
+
100
+ if ( false === MESSAGE_TYPE . isDefined ( type ) ) {
101
+ throw new Error ( `undefined msg type : ${ type } ` ) ;
102
+ }
103
+
104
+ if ( true === CLIENT_STATUS . isDefined ( type ) ) {
105
+ this . changeClientStatus ( CLIENT_STATUS . get ( type ) ) ;
106
+ }
107
+
108
+ this . emit ( type , object ) ;
109
+ } catch ( err ) {
110
+ console . error ( err ) ;
111
+ }
112
+ }
113
+
114
+ async onClose ( ) {
115
+ if ( CLIENT_STATUS . CLOSED === this . clientStatus ) {
116
+ //beautiful closed.
117
+ console . info ( 'close successful' )
118
+ } else {
119
+ if ( this . clientStatus . key . startsWith ( 'PAUS' ) || this . clientStatus . key . startsWith ( 'CLOS' ) ) {
120
+ this . changeClientStatus ( CLIENT_STATUS . CLOSED ) ;
121
+ } else {
122
+ this . isNeedReconnect = true ;
123
+ console . info ( 'abnormal closed, try reconnect' ) ;
124
+ }
125
+ }
126
+ this . emit ( 'close' ) ;
127
+ }
128
+
129
+ async onError ( err ) {
130
+ this . emit ( 'error' , err ) ;
131
+ }
132
+
133
+ async connect ( ) {
134
+ let options = {
135
+ headers : {
136
+ lChainId : this . lChainId ,
137
+ accessKey : this . accessKey
138
+ }
139
+ } ;
140
+ /////////////////////////////
141
+
142
+ this . wsClient = new WebSocket ( this . endPoint , null , options ) ;
143
+
144
+ const self = this ;
145
+
146
+ this . wsClient . on ( 'message' , async function incoming ( message ) {
147
+ await this . self . onMessage ( message ) ;
148
+ } . bind ( { self : self } ) ) ;
149
+
150
+ this . wsClient . on ( 'open' , async function open ( ) {
151
+ await this . self . onOpen ( ) ;
152
+ } . bind ( { self : self } ) ) ;
153
+
154
+ this . wsClient . on ( 'close' , async function close ( ) {
155
+ await this . self . onClose ( ) ;
156
+ } . bind ( { self : self } ) ) ;
157
+
158
+ this . wsClient . on ( 'error' , async function error ( err ) {
159
+ await this . self . onError ( err ) ;
160
+ } . bind ( { self : self } ) ) ;
161
+ }
162
+
163
+
164
+ async resume ( ) {
165
+ let toState = CLIENT_STATUS . RESUME ;
166
+ this . clientStatus = toState ;
167
+ }
168
+
169
+ async pause ( ) {
170
+ let toState = CLIENT_STATUS . PAUSE ;
171
+
172
+ this . clientStatus = toState ;
173
+ }
174
+
175
+ async commit ( offset ) {
176
+ await MessageHelper . sendCommit ( this . wsClient , offset ) ;
177
+ }
178
+
179
+ async rollback ( offset ) {
180
+ await MessageHelper . sendRollback ( this . wsClient , offset ) ;
181
+ }
182
+
183
+ async closeQuietly ( ) {
184
+ try {
185
+ if ( this . wsClient ) {
186
+ this . wsClient . terminate ( ) ;
187
+ this . wsClient = null ;
188
+ }
189
+ } catch ( err ) {
190
+ console . warn ( err ) ;
191
+ }
192
+ }
193
+
194
+
195
+
196
+
197
+
198
+
199
+ }
200
+
201
+ module . exports = LambdaStreamClient ;
0 commit comments