Skip to content

Commit 5e333df

Browse files
committed
Respawn listener connection on error or shutdown the whole server depending on the value of PGWS_CHECK_LISTENER_INTERVAL
1 parent 1b7f8ac commit 5e333df

File tree

5 files changed

+199
-151
lines changed

5 files changed

+199
-151
lines changed

sample-env

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,7 @@ export PGWS_PORT=3000
2121
## (use "@filename" to load from separate file)
2222
export PGWS_JWT_SECRET="auwhfdnskjhewfi34uwehdlaehsfkuaeiskjnfduierhfsiweskjcnzeiluwhskdewishdnpwe"
2323
export PGWS_JWT_SECRET_BASE64=False
24+
25+
## Check database listener every 10 seconds
26+
## comment it out to disable and shutdown the server on listener errors (can be useful when using external process supervisors)
27+
export PGWS_CHECK_LISTENER_INTERVAL=10000

src/PostgresWebsockets/Broadcast.hs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,15 @@ data Channel = Channel
6060

6161
instance A.ToJSON MultiplexerSnapshot
6262

63+
-- | Given a multiplexer derive a type that can be printed for debugging or logging purposes
64+
takeSnapshot :: Multiplexer -> IO MultiplexerSnapshot
65+
takeSnapshot multi =
66+
MultiplexerSnapshot <$> size <*> e <*> thread
67+
where
68+
size = atomically $ M.size $ channels multi
69+
thread = show <$> readMVar (producerThreadId multi)
70+
e = atomically $ isEmptyTQueue $ messages multi
71+
6372
-- | Opens a thread that relays messages from the producer thread to the channels forever
6473
relayMessagesForever :: Multiplexer -> IO ThreadId
6574
relayMessagesForever = forkIO . forever . relayMessages
@@ -89,17 +98,26 @@ newMultiplexer openProducer closeProducer = do
8998
-- | Given a multiplexer, a number of milliseconds and an IO computation that returns a boolean
9099
-- Runs the IO computation at every interval of milliseconds interval and reopens the multiplexer producer
91100
-- if the resulting boolean is true
101+
-- When interval is 0 this is NOOP, so the minimum interval is 1ms
92102
-- Call this in case you want to ensure the producer thread is killed and restarted under a certain condition
93103
superviseMultiplexer :: Multiplexer -> Int -> IO Bool -> IO ()
94104
superviseMultiplexer multi msInterval shouldRestart = do
95105
void $
96106
forkIO $
97107
forever $ do
98-
threadDelay msInterval
108+
threadDelay $ msInterval * 1000
99109
sr <- shouldRestart
100110
when sr $ do
111+
snapBefore <- takeSnapshot multi
101112
void $ killThread <$> readMVar (producerThreadId multi)
102-
void $ swapMVar (producerThreadId multi) <$> reopenProducer multi
113+
new <- reopenProducer multi
114+
void $ swapMVar (producerThreadId multi) new
115+
snapAfter <- takeSnapshot multi
116+
putStrLn $
117+
"Restarting producer. Multiplexer updated: "
118+
<> A.encode snapBefore
119+
<> " -> "
120+
<> A.encode snapAfter
103121

104122
openChannel :: Multiplexer -> Text -> STM Channel
105123
openChannel multi chan = do

src/PostgresWebsockets/Config.hs

Lines changed: 68 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,43 @@
1-
{-|
2-
Module : PostgresWebsockets.Config
3-
Description : Manages PostgresWebsockets configuration options.
4-
5-
This module provides a helper function to read the command line
6-
arguments using the AppConfig type to store
7-
them. It also can be used to define other middleware configuration that
8-
may be delegated to some sort of external configuration.
9-
-}
10-
module PostgresWebsockets.Config
11-
( prettyVersion
12-
, loadConfig
13-
, warpSettings
14-
, AppConfig (..)
15-
) where
16-
17-
import Env
18-
import Data.Text (intercalate, pack, replace, strip, stripPrefix)
19-
import Data.Version (versionBranch)
20-
import Paths_postgres_websockets (version)
21-
import Protolude hiding (intercalate, (<>), optional, replace, toS)
22-
import Protolude.Conv
23-
import Data.String (IsString(..))
24-
import Network.Wai.Handler.Warp
25-
import qualified Data.ByteString as BS
26-
import qualified Data.ByteString.Base64 as B64
1+
-- |
2+
-- Module : PostgresWebsockets.Config
3+
-- Description : Manages PostgresWebsockets configuration options.
4+
--
5+
-- This module provides a helper function to read the command line
6+
-- arguments using the AppConfig type to store
7+
-- them. It also can be used to define other middleware configuration that
8+
-- may be delegated to some sort of external configuration.
9+
module PostgresWebsockets.Config
10+
( prettyVersion,
11+
loadConfig,
12+
warpSettings,
13+
AppConfig (..),
14+
)
15+
where
16+
17+
import qualified Data.ByteString as BS
18+
import qualified Data.ByteString.Base64 as B64
19+
import Data.String (IsString (..))
20+
import Data.Text (intercalate, pack, replace, strip, stripPrefix)
21+
import Data.Version (versionBranch)
22+
import Env
23+
import Network.Wai.Handler.Warp
24+
import Paths_postgres_websockets (version)
25+
import Protolude hiding (intercalate, optional, replace, toS, (<>))
26+
import Protolude.Conv
2727

2828
-- | Config file settings for the server
29-
data AppConfig = AppConfig {
30-
configDatabase :: Text
31-
, configPath :: Maybe Text
32-
, configHost :: Text
33-
, configPort :: Int
34-
, configListenChannel :: Text
35-
, configMetaChannel :: Maybe Text
36-
, configJwtSecret :: ByteString
37-
, configJwtSecretIsBase64 :: Bool
38-
, configPool :: Int
39-
, configRetries :: Int
40-
, configReconnectInterval :: Int
29+
data AppConfig = AppConfig
30+
{ configDatabase :: Text,
31+
configPath :: Maybe Text,
32+
configHost :: Text,
33+
configPort :: Int,
34+
configListenChannel :: Text,
35+
configMetaChannel :: Maybe Text,
36+
configJwtSecret :: ByteString,
37+
configJwtSecretIsBase64 :: Bool,
38+
configPool :: Int,
39+
configRetries :: Int,
40+
configReconnectInterval :: Maybe Int
4141
}
4242

4343
-- | User friendly version number
@@ -50,54 +50,54 @@ loadConfig = readOptions >>= loadSecretFile >>= loadDatabaseURIFile
5050

5151
-- | Given a shutdown handler and an AppConfig builds a Warp Settings to start a stand-alone server
5252
warpSettings :: (IO () -> IO ()) -> AppConfig -> Settings
53-
warpSettings waitForShutdown AppConfig{..} =
54-
setHost (fromString $ toS configHost)
55-
. setPort configPort
56-
. setServerName (toS $ "postgres-websockets/" <> prettyVersion)
57-
. setTimeout 3600
58-
. setInstallShutdownHandler waitForShutdown
59-
. setGracefulShutdownTimeout (Just 5)
60-
$ defaultSettings
61-
53+
warpSettings waitForShutdown AppConfig {..} =
54+
setHost (fromString $ toS configHost)
55+
. setPort configPort
56+
. setServerName (toS $ "postgres-websockets/" <> prettyVersion)
57+
. setTimeout 3600
58+
. setInstallShutdownHandler waitForShutdown
59+
. setGracefulShutdownTimeout (Just 5)
60+
$ defaultSettings
6261

6362
-- private
6463

6564
-- | Function to read and parse options from the environment
6665
readOptions :: IO AppConfig
6766
readOptions =
68-
Env.parse (header "You need to configure some environment variables to start the service.") $
69-
AppConfig <$> var (str <=< nonempty) "PGWS_DB_URI" (help "String to connect to PostgreSQL")
70-
<*> optional (var str "PGWS_ROOT_PATH" (help "Root path to serve static files, unset to disable."))
71-
<*> var str "PGWS_HOST" (def "*4" <> helpDef show <> help "Address the server will listen for websocket connections")
72-
<*> var auto "PGWS_PORT" (def 3000 <> helpDef show <> help "Port the server will listen for websocket connections")
73-
<*> var str "PGWS_LISTEN_CHANNEL" (def "postgres-websockets-listener" <> helpDef show <> help "Master channel used in the database to send or read messages in any notification channel")
74-
<*> optional (var str "PGWS_META_CHANNEL" (help "Websockets channel used to send events about the server state changes."))
75-
<*> var str "PGWS_JWT_SECRET" (help "Secret used to sign JWT tokens used to open communications channels")
76-
<*> var auto "PGWS_JWT_SECRET_BASE64" (def False <> helpDef show <> help "Indicate whether the JWT secret should be decoded from a base64 encoded string")
77-
<*> var auto "PGWS_POOL_SIZE" (def 10 <> helpDef show <> help "How many connection to the database should be used by the connection pool")
78-
<*> var auto "PGWS_RETRIES" (def 5 <> helpDef show <> help "How many times it should try to connect to the database on startup before exiting with an error")
79-
<*> var auto "PGWS_CHECK_LISTENER_INTERVAL" (def 0 <> helpDef show <> help "Interval for supervisor thread to check if listener connection is alive. 0 to disable it.")
67+
Env.parse (header "You need to configure some environment variables to start the service.") $
68+
AppConfig <$> var (str <=< nonempty) "PGWS_DB_URI" (help "String to connect to PostgreSQL")
69+
<*> optional (var str "PGWS_ROOT_PATH" (help "Root path to serve static files, unset to disable."))
70+
<*> var str "PGWS_HOST" (def "*4" <> helpDef show <> help "Address the server will listen for websocket connections")
71+
<*> var auto "PGWS_PORT" (def 3000 <> helpDef show <> help "Port the server will listen for websocket connections")
72+
<*> var str "PGWS_LISTEN_CHANNEL" (def "postgres-websockets-listener" <> helpDef show <> help "Master channel used in the database to send or read messages in any notification channel")
73+
<*> optional (var str "PGWS_META_CHANNEL" (help "Websockets channel used to send events about the server state changes."))
74+
<*> var str "PGWS_JWT_SECRET" (help "Secret used to sign JWT tokens used to open communications channels")
75+
<*> var auto "PGWS_JWT_SECRET_BASE64" (def False <> helpDef show <> help "Indicate whether the JWT secret should be decoded from a base64 encoded string")
76+
<*> var auto "PGWS_POOL_SIZE" (def 10 <> helpDef show <> help "How many connection to the database should be used by the connection pool")
77+
<*> var auto "PGWS_RETRIES" (def 5 <> helpDef show <> help "How many times it should try to connect to the database on startup before exiting with an error")
78+
<*> optional (var auto "PGWS_CHECK_LISTENER_INTERVAL" (helpDef show <> help "Interval for supervisor thread to check if listener connection is alive. 0 to disable it."))
8079

8180
loadDatabaseURIFile :: AppConfig -> IO AppConfig
82-
loadDatabaseURIFile conf@AppConfig{..} =
81+
loadDatabaseURIFile conf@AppConfig {..} =
8382
case stripPrefix "@" configDatabase of
84-
Nothing -> pure conf
83+
Nothing -> pure conf
8584
Just filename -> setDatabase . strip <$> readFile (toS filename)
8685
where
8786
setDatabase uri = conf {configDatabase = uri}
8887

8988
loadSecretFile :: AppConfig -> IO AppConfig
9089
loadSecretFile conf = extractAndTransform secret
9190
where
92-
secret = decodeUtf8 $ configJwtSecret conf
93-
isB64 = configJwtSecretIsBase64 conf
91+
secret = decodeUtf8 $ configJwtSecret conf
92+
isB64 = configJwtSecretIsBase64 conf
9493

9594
extractAndTransform :: Text -> IO AppConfig
9695
extractAndTransform s =
97-
fmap setSecret $ transformString isB64 =<<
98-
case stripPrefix "@" s of
99-
Nothing -> return . encodeUtf8 $ s
100-
Just filename -> chomp <$> BS.readFile (toS filename)
96+
fmap setSecret $
97+
transformString isB64
98+
=<< case stripPrefix "@" s of
99+
Nothing -> return . encodeUtf8 $ s
100+
Just filename -> chomp <$> BS.readFile (toS filename)
101101
where
102102
chomp bs = fromMaybe bs (BS.stripSuffix "\n" bs)
103103

@@ -107,11 +107,10 @@ loadSecretFile conf = extractAndTransform secret
107107
transformString True t =
108108
case B64.decode $ encodeUtf8 $ strip $ replaceUrlChars $ decodeUtf8 t of
109109
Left errMsg -> panic $ pack errMsg
110-
Right bs -> return bs
110+
Right bs -> return bs
111111

112112
setSecret bs = conf {configJwtSecret = bs}
113113

114114
-- replace: Replace every occurrence of one substring with another
115115
replaceUrlChars =
116116
replace "_" "/" . replace "-" "+" . replace "." "="
117-

src/PostgresWebsockets/Context.hs

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,45 @@
1-
{-|
2-
Module : PostgresWebsockets.Context
3-
Description : Produce a context capable of running postgres-websockets sessions
4-
-}
1+
-- |
2+
-- Module : PostgresWebsockets.Context
3+
-- Description : Produce a context capable of running postgres-websockets sessions
54
module PostgresWebsockets.Context
6-
( Context (..)
7-
, mkContext
8-
) where
5+
( Context (..),
6+
mkContext,
7+
)
8+
where
99

10-
import Protolude hiding (toS)
11-
import Protolude.Conv
10+
import Control.AutoUpdate
11+
( defaultUpdateSettings,
12+
mkAutoUpdate,
13+
updateAction,
14+
)
1215
import Data.Time.Clock (UTCTime, getCurrentTime)
13-
import Control.AutoUpdate ( defaultUpdateSettings
14-
, mkAutoUpdate
15-
, updateAction
16-
)
1716
import qualified Hasql.Pool as P
18-
19-
import PostgresWebsockets.Config ( AppConfig(..) )
20-
import PostgresWebsockets.HasqlBroadcast (newHasqlBroadcaster)
2117
import PostgresWebsockets.Broadcast (Multiplexer)
18+
import PostgresWebsockets.Config (AppConfig (..))
19+
import PostgresWebsockets.HasqlBroadcast (newHasqlBroadcaster)
20+
import Protolude hiding (toS)
21+
import Protolude.Conv
2222

23-
data Context = Context {
24-
ctxConfig :: AppConfig
25-
, ctxPool :: P.Pool
26-
, ctxMulti :: Multiplexer
27-
, ctxGetTime :: IO UTCTime
23+
data Context = Context
24+
{ ctxConfig :: AppConfig,
25+
ctxPool :: P.Pool,
26+
ctxMulti :: Multiplexer,
27+
ctxGetTime :: IO UTCTime
2828
}
2929

3030
-- | Given a configuration and a shutdown action (performed when the Multiplexer's listen connection dies) produces the context necessary to run sessions
3131
mkContext :: AppConfig -> IO () -> IO Context
32-
mkContext conf@AppConfig{..} shutdown = do
32+
mkContext conf@AppConfig {..} shutdownServer = do
3333
Context conf
3434
<$> P.acquire (configPool, 10, pgSettings)
3535
<*> newHasqlBroadcaster shutdown (toS configListenChannel) configRetries configReconnectInterval pgSettings
3636
<*> mkGetTime
3737
where
38+
shutdown =
39+
maybe
40+
shutdownServer
41+
(const $ putText "Producer thread is dead")
42+
configReconnectInterval
3843
mkGetTime :: IO (IO UTCTime)
3944
mkGetTime = mkAutoUpdate defaultUpdateSettings {updateAction = getCurrentTime}
4045
pgSettings = toS configDatabase

0 commit comments

Comments
 (0)