1
+ /**
2
+ * This module manages PostgreSQL connections in serverless applications.
3
+ * This module wrap node-postgres package, more detail regarding it can be found here:
4
+ * https://github.com/brianc/node-postgres
5
+ * @author Matteo Gioioso <[email protected] >
6
+ * @version 1.1.0
7
+ * @license MIT
8
+ */
9
+
1
10
const { Client } = require ( "pg" ) ;
2
11
3
12
function ServerlessClient ( config ) {
4
- Client . call ( this , config ) ;
5
-
6
13
this . _config = config ;
7
- this . _maxRetries = config . maxRetries || 10 ;
14
+
15
+ // If this parameters is set to true it will query to get the maxConnections values,
16
+ // to maximize performance you should set the maxConnections yourself.
17
+ // Is suggested to manually set the maxConnections and keep this setting to false.
18
+ this . _automaticMaxConnections = config . automaticMaxConnections
19
+ // Cache expiration for getting the max connections value in milliseconds
20
+ this . _maxConnsFreqMs = config . maxConnsFreqMs || 60000 ;
8
21
this . _maxConnections = config . maxConnections || 100 ;
22
+ this . _maxConnectionsCache = {
23
+ total : this . _maxConnections ,
24
+ updated : Date . now ( )
25
+ }
9
26
10
- // The bigger the slower, but more impactful on the connections dropped
11
- this . _maxRetrivedProcesses = config . maxRetrivedProcesses || 10 ;
27
+ // Activate debugging logger
28
+ this . _debug = config . debug
12
29
13
- // This number represent the percentage threshold at which the cleanup will be triggered.
14
- // Ex: if you have 100 max connections and processesPercentageThreshold set at 60 (%),
15
- // then it will start dropping connections if the total idle connections count is more than 60
16
- this . _processesPercentageThreshold = ( config . processesPercentageThreshold || 50 ) / 100
17
- this . _retries = 1 ;
30
+ // The bigger, the more idle connections will be possibly dropped
31
+ this . _maxIdleConnections = config . maxIdleConnections || 10 ;
18
32
19
- // pg throws an error if we terminate the connection, therefore we need to swallow these errors
20
- // and throw the rest
21
- this . on ( "error" , err => {
22
- if (
23
- err . message === "terminating connection due to administrator command" ||
24
- err . message === "Connection terminated unexpectedly"
25
- ) {
26
- // Swallow the error
27
- } else if ( err . message === "sorry, too many clients already" ) {
28
- throw err ;
29
- } else {
30
- throw err ;
31
- }
32
- } ) ;
33
+ // The percentage of total connections to use when connecting to your Postgres server.
34
+ // A value of 0.75 would use 75% of your total available connections.
35
+ // Past this threshold the connection killer will kick in.
36
+ this . _connUtilization = config . connUtilization || 0.8
37
+
38
+ this . _backoff = {
39
+ capMs : config . capMs ,
40
+ baseMs : config . baseMs ,
41
+ delayMs : config . retryDelayMs || 1000 ,
42
+ maxRetries : config . maxRetries || 3 ,
43
+ retries : 0 ,
44
+ }
33
45
}
34
46
35
- ServerlessClient . prototype = new Client ( ) ;
36
47
ServerlessClient . prototype . constructor = ServerlessClient ;
37
48
ServerlessClient . prototype . _sleep = delay =>
38
49
new Promise ( resolve => {
@@ -41,19 +52,39 @@ ServerlessClient.prototype._sleep = delay =>
41
52
} , delay ) ;
42
53
} ) ;
43
54
55
+ ServerlessClient . prototype . _setMaxConnections = async ( ) => {
56
+ // If cache is expired
57
+ if ( Date . now ( ) - this . _maxConnectionsCache . updated > this . _maxConnsFreqMs ) {
58
+ const results = await this . _client . query (
59
+ `SHOW max_connections`
60
+ )
61
+
62
+ this . _logger ( "Getting max connections from database..." )
63
+
64
+ this . _maxConnectionsCache = {
65
+ total : results . rows [ 0 ] ,
66
+ updated : Date . now ( )
67
+ }
68
+ }
69
+
70
+ this . _maxConnections = this . _maxConnectionsCache . total
71
+ }
72
+
44
73
ServerlessClient . prototype . _getIdleProcessesListOrderByDate = async function ( ) {
45
- return this . query (
74
+ const result = await this . _client . query (
46
75
`SELECT pid,backend_start,state
47
76
FROM pg_stat_activity
48
77
WHERE datname=$1 AND state='idle'
49
78
ORDER BY backend_start
50
79
DESC LIMIT $2;` ,
51
- [ this . _config . database , this . _maxRetrivedProcesses ]
80
+ [ this . _config . database , this . _maxIdleConnections ]
52
81
) ;
82
+
83
+ return result . rows
53
84
} ;
54
85
55
86
ServerlessClient . prototype . _getProcessesCount = async function ( ) {
56
- const result = await this . query (
87
+ const result = await this . _client . query (
57
88
"SELECT COUNT(pid) FROM pg_stat_activity WHERE datname=$1 AND state='idle';" ,
58
89
[ this . _config . database ]
59
90
) ;
@@ -62,48 +93,110 @@ ServerlessClient.prototype._getProcessesCount = async function() {
62
93
} ;
63
94
64
95
ServerlessClient . prototype . _killProcesses = async function ( processesList ) {
65
- const pids = processesList . rows . map ( proc => proc . pid ) ;
96
+ const pids = processesList . map ( proc => proc . pid ) ;
66
97
const query = `
67
98
SELECT pg_terminate_backend(pid)
68
99
FROM pg_stat_activity
69
100
WHERE pid = ANY ($1)
70
101
AND datname = $2 AND state='idle';`
71
102
const values = [ pids , this . _config . database ]
72
103
73
- return this . query ( query , values )
104
+ return this . _client . query ( query , values )
74
105
} ;
75
106
107
+ ServerlessClient . prototype . _decorrelatedJitter = function ( delay = 0 ) {
108
+ const cap = Number . isInteger ( this . _backoff . capMs ) ? this . _backoff . capMs : 100 // default to 100 ms
109
+ const base = Number . isInteger ( this . _backoff . baseMs ) ? this . _backoff . baseMs : 2 // default to 2 ms
110
+ const randRange = ( min , max ) => Math . floor ( Math . random ( ) * ( max - min + 1 ) ) + min
111
+ return Math . min ( cap , randRange ( base , delay * 3 ) )
112
+ }
113
+
76
114
ServerlessClient . prototype . clean = async function ( ) {
77
115
const processCount = await this . _getProcessesCount ( ) ;
116
+ this . _logger ( "Current process count: " , processCount )
78
117
79
- if ( processCount > this . _maxConnections * this . _processesPercentageThreshold ) {
118
+ if ( processCount > this . _maxConnections * this . _connUtilization ) {
80
119
const processesList = await this . _getIdleProcessesListOrderByDate ( ) ;
81
120
await this . _killProcesses ( processesList ) ;
121
+ this . _logger ( "Killed processes: " , processesList . length )
82
122
}
83
123
} ;
84
124
85
125
ServerlessClient . prototype . sconnect = async function ( ) {
86
126
try {
87
- await this . connect ( ) ;
127
+ await this . _init ( )
128
+ await this . _client . connect ( ) ;
88
129
} catch ( e ) {
89
130
if ( e . message === "sorry, too many clients already" ) {
131
+ // Client in node-pg is usable only one time, once it errors we cannot re-connect again,
132
+ // therefore we need to throw the instance and recreate a new one
133
+ await this . _init ( )
90
134
const backoff = async delay => {
91
- if ( this . _maxRetries > 0 ) {
92
- console . log ( this . _maxRetries , " trying to reconnect... " ) ;
93
- await this . _sleep ( delay ) ;
94
- this . _maxRetries -- ;
135
+ if ( this . _backoff . retries < this . _backoff . maxRetries ) {
136
+ this . _logger ( this . _backoff . maxRetries , " trying to reconnect... " )
137
+ const totalDelay = this . _decorrelatedJitter ( delay )
138
+ this . _logger ( "total delay: " , totalDelay )
139
+ await this . _sleep ( totalDelay ) ;
140
+ this . _backoff . retries ++ ;
95
141
await this . sconnect ( ) ;
96
- console . log ( "Re-connection successful!" ) ;
142
+ this . _logger ( "Re-connection successful after " , this . _backoff . retries )
97
143
}
98
144
} ;
99
145
100
- this . _retries ++ ;
101
- let delay = 1000 * this . _retries ;
102
- await backoff ( delay ) ;
146
+ this . _logger ( "Current delay: " , this . _backoff . delayMs )
147
+ await backoff ( this . _backoff . delayMs ) ;
103
148
} else {
104
149
throw e ;
105
150
}
106
151
}
107
152
} ;
108
153
154
+ ServerlessClient . prototype . _init = async function ( ) {
155
+ this . _client = new Client ( this . _config )
156
+
157
+ if ( this . _automaticMaxConnections ) {
158
+ await this . _setMaxConnections ( )
159
+ }
160
+
161
+ this . _logger ( "Max connections: " , this . _maxConnections )
162
+
163
+ // pg throws an error if we terminate the connection, therefore we need to swallow these errors
164
+ // and throw the rest
165
+ this . _client . on ( "error" , err => {
166
+ if (
167
+ err . message === "terminating connection due to administrator command" ||
168
+ err . message === "Connection terminated unexpectedly"
169
+ ) {
170
+ // Swallow the error
171
+ } else if ( err . message === "sorry, too many clients already" ) {
172
+ throw err ;
173
+ } else {
174
+ throw err ;
175
+ }
176
+ } ) ;
177
+ }
178
+
179
+ // TODO add validation for the client config
180
+ ServerlessClient . prototype . _validateConfig = function ( ) {
181
+
182
+ }
183
+
184
+ ServerlessClient . prototype . _logger = function ( ...args ) {
185
+ if ( this . _debug ) {
186
+ console . log ( '\x1b[36m%s\x1b[0m' , 'serverless-pg | ' , ...args )
187
+ }
188
+ }
189
+
190
+ ServerlessClient . prototype . query = async function ( ...args ) {
191
+ return this . _client . query ( ...args )
192
+ }
193
+
194
+ ServerlessClient . prototype . end = async function ( ) {
195
+ return this . _client . end ( )
196
+ }
197
+
198
+ ServerlessClient . prototype . on = function ( ...args ) {
199
+ return this . _client . on ( ...args )
200
+ }
201
+
109
202
module . exports = { ServerlessClient } ;
0 commit comments