|
| 1 | +{- | |
| 2 | +Copyright: (c) 2018-2020 Kowainik, (c) 2020 Alexander Vershilov |
| 3 | +SPDX-License-Identifier: MPL-2.0 |
| 4 | +Maintainer: Alexander Vershilov <alexander.vershilov@gmail.com> |
| 5 | +
|
| 6 | +For the speed reasons you may want to dump logs asynchronously. |
| 7 | +This is especially useful when application threads are CPU |
| 8 | +bound while logs emitting is I/O bound. This approach |
| 9 | +allows to mitigate bottlenecks from the I/O. |
| 10 | +
|
| 11 | +When writing an application user should be aware of the tradeoffs |
| 12 | +that concurrent log system can provide, in this module we explain |
| 13 | +potential tradeoffs and describe if certain building blocks are |
| 14 | +affected or not. |
| 15 | +
|
| 16 | + 1. Unbounded memory usage - if there is no backpressure mechanism |
| 17 | + the user threads, they may generate more logs that can be |
| 18 | + written in the same amount of time. In those cases messages will |
| 19 | + be accumulated in memory. That will lead to extended GC times and |
| 20 | + application may be killed by the operating systems mechanisms. |
| 21 | +
|
| 22 | + 2. Persistence requirement - sometimes application may want to |
| 23 | + ensure that logs were written before it can continue. This is not |
| 24 | + a case with concurrent log systems in general, and some logs may |
| 25 | + be lost when application exits before dumping all logs. |
| 26 | +
|
| 27 | + 3. Non-precise logging - sometimes it may happen that there can be |
| 28 | + logs reordering (in case if thread was moved to another capability). |
| 29 | +
|
| 30 | +In case if your application is a subject of those problems you may |
| 31 | +consider not using concurrent logging system in other cases concurrent |
| 32 | +logger may be a good default for you. |
| 33 | +-} |
| 34 | + |
| 35 | +module Colog.Concurrent |
| 36 | + ( -- $general |
| 37 | + -- * Simple API. |
| 38 | + -- $simple-api |
| 39 | + withBackgroundLogger |
| 40 | + , defCapacity |
| 41 | + -- * Extended API |
| 42 | + -- $extended-api |
| 43 | + -- ** Background worker |
| 44 | + -- $background-worker |
| 45 | + , BackgroundWorker |
| 46 | + , backgroundWorkerWrite |
| 47 | + , killBackgroundLogger |
| 48 | + -- ** Background logger |
| 49 | + , forkBackgroundLogger |
| 50 | + , convertToLogAction |
| 51 | + -- ** Worker thread |
| 52 | + -- $worker-thread |
| 53 | + , mkBackgroundThread |
| 54 | + , runInBackgroundThread |
| 55 | + -- *** Usage example |
| 56 | + -- $worker-thread-usage |
| 57 | + ) where |
| 58 | + |
| 59 | +import Control.Applicative (many) |
| 60 | +import Control.Concurrent (forkFinally, killThread) |
| 61 | +import Control.Concurrent.STM (atomically, check, newTVarIO, readTVar, writeTVar) |
| 62 | +import Control.Concurrent.STM.TBQueue (newTBQueueIO, readTBQueue, writeTBQueue) |
| 63 | +import Control.Exception (bracket, finally) |
| 64 | +import Control.Monad (forever, join) |
| 65 | +import Control.Monad.IO.Class (MonadIO (..)) |
| 66 | +import Data.Foldable (for_) |
| 67 | + |
| 68 | +import Colog.Concurrent.Internal (BackgroundWorker (..), Capacity (..)) |
| 69 | +import Colog.Core.Action (LogAction (..)) |
| 70 | + |
| 71 | + |
| 72 | +{- $general |
| 73 | +
|
| 74 | +Concurrent logger consists of the basic parts (see schema below). |
| 75 | +
|
| 76 | + 1. Logger in application thread. This logger is evaluated in the |
| 77 | + application thread and has an access to all the context available |
| 78 | + in that thread and monad, this logger can work in any @m@. |
| 79 | +
|
| 80 | + 2. Communication channel with backpressure support. In addition to |
| 81 | + the channel we have a converter that puts the user message to the |
| 82 | + communication channel. This converter works in the user thread. |
| 83 | + Such a logger usually works in 'IO' but it's possible to make it |
| 84 | + work in 'Control.Concurrent.STM.STM' as well. At this point library provides only 'IO' |
| 85 | + version, but it can be lifted to any 'MonadIO' by the user. |
| 86 | +
|
| 87 | + 3. Logger thread. This is the thread that performs actual write to |
| 88 | + the sinks. Loggers there do not have access to the users thread |
| 89 | + state, unless that state was passed in the message. |
| 90 | +
|
| 91 | +
|
| 92 | +@ |
| 93 | +
|
| 94 | + +-------------------------+ +--------------------------------+ |
| 95 | + | | | Logger | Sink-1 | |
| 96 | + | Application Thread | | Thread +---> | |
| 97 | + | ----------------- | +-----------+ | | +----------------+ |
| 98 | + | | | | +---------+ | +----------------+ |
| 99 | + | +-------------+ | channel | | Shared +-----> Sink-2 | |
| 100 | + | | application|| | +----> logger | | | | |
| 101 | + | | logger +-----> | +---------+ | +----------------+ |
| 102 | + | +-------------+ | | | | +----------------+ |
| 103 | + | | +-----------+ | +---> Sink3 | |
| 104 | + | | | | | |
| 105 | + | | | +----------------+ |
| 106 | + | | | | |
| 107 | + +-------------------------+ +--------------------------------+ |
| 108 | +@ |
| 109 | +
|
| 110 | +So usually user should write the logging system in the way that all 'LogAction' |
| 111 | +that populate and filter information should live in the application logger. |
| 112 | +All loggers that do serialization and formatting should live in shared logger. |
| 113 | +
|
| 114 | +
|
| 115 | +If more concurrency is needed it's possible to build multilayer systems: |
| 116 | +
|
| 117 | +
|
| 118 | +@ |
| 119 | + +-------------+ +-------+ |
| 120 | + | application |---+ +---| sink-1| |
| 121 | + +-------------+ | +---------+ | +-------+ |
| 122 | + +---| logger |---+ |
| 123 | + +---------+ | +-------+ |
| 124 | + +---| sink-2| |
| 125 | + +-------+ |
| 126 | +@ |
| 127 | +
|
| 128 | +In this approach application will be concurrently write logs to the logger, then |
| 129 | +logger will be concurrently writing to all sinks. |
| 130 | +-} |
| 131 | + |
| 132 | +{- $simple-api |
| 133 | +
|
| 134 | +Simple API provides a handy easy to use API that can be used directly |
| 135 | +in application without dealing with internals. Based on users feedback |
| 136 | +internal implementation of the simple API may change, especially in early |
| 137 | +versions of the library. But the guarantee that we give is that no matter |
| 138 | +what implementation is it will be kept with reasonable defaults and will |
| 139 | +be applicable to a generic application. |
| 140 | +-} |
| 141 | + |
| 142 | +{- | An exception safe way to create background logger. This method will fork |
| 143 | +a thread that will run 'shared worker', see schema above. |
| 144 | +
|
| 145 | +@Capacity@ - provides a backpressure mechanism and tells how many messages |
| 146 | +in flight are allowed. In most cases 'defCapacity' will work well. |
| 147 | +See 'forkBackgroundLogger' for more details. |
| 148 | +
|
| 149 | +@LogAction@ - provides a logger action, this action does not have access to the |
| 150 | +application state or thread info, so you should only pass methods that serialize |
| 151 | +and dump data there. |
| 152 | +
|
| 153 | +@ |
| 154 | +main :: IO () |
| 155 | +main = |
| 156 | + 'withBackgroundLogger' |
| 157 | + 'defCapacity' |
| 158 | + 'Colog.Actions.logByteStringStdout' |
| 159 | + (\log -> 'Colog.Monad.usingLoggerT' log $ __do__ |
| 160 | + 'Colog.Monad.logMsg' \@ByteString "Starting application..." |
| 161 | + 'Colog.Monad.logMsg' \@ByteString "Finishing application..." |
| 162 | + ) |
| 163 | +@ |
| 164 | +-} |
| 165 | +withBackgroundLogger |
| 166 | + :: MonadIO m |
| 167 | + => Capacity -- ^ Capacity of messages to handle; bounded channel size |
| 168 | + -> LogAction IO msg -- ^ Action that will be used in a forked thread |
| 169 | + -> (LogAction m msg -> IO a) -- ^ Continuation action |
| 170 | + -> IO a |
| 171 | +withBackgroundLogger cap logger action = |
| 172 | + bracket (forkBackgroundLogger cap logger) |
| 173 | + killBackgroundLogger |
| 174 | + (action . convertToLogAction) |
| 175 | + |
| 176 | +-- | Default capacity size, (4096) |
| 177 | +defCapacity :: Capacity |
| 178 | +defCapacity = Capacity 4096 |
| 179 | + |
| 180 | + |
| 181 | +{- $extended-api |
| 182 | +
|
| 183 | +Extended API explains how asynchronous logging is working and provides basic |
| 184 | +building blocks for writing your own combinators. This is the part of the public |
| 185 | +API and will not change without prior notice. |
| 186 | +-} |
| 187 | + |
| 188 | +{- $background-worker |
| 189 | +The main abstraction for the concurrent worker is 'BackgroundWorker'. This |
| 190 | +is a wrapper of the thread, that has communication channel to talk to, and threadId. |
| 191 | +
|
| 192 | +Background worker may provide a backpressure mechanism, but does not provide |
| 193 | +notification of completeness unless it's included in the message itself. |
| 194 | +-} |
| 195 | + |
| 196 | +{- | Stop background logger thread. |
| 197 | +
|
| 198 | +The thread is blocked until background thread will finish processing |
| 199 | +all messages that were written in the channel. |
| 200 | +-} |
| 201 | +killBackgroundLogger :: BackgroundWorker msg -> IO () |
| 202 | +killBackgroundLogger bl = do |
| 203 | + killThread (backgroundWorkerThreadId bl) |
| 204 | + atomically $ readTVar (backgroundWorkerIsAlive bl) >>= check . not |
| 205 | + |
| 206 | +{- $background-logger |
| 207 | +
|
| 208 | +Background logger is specialized version of the 'BackgroundWorker' process. |
| 209 | +Instead of running any job it will accept @msg@ type |
| 210 | +instead and process it with a single logger defined at creation time. |
| 211 | +-} |
| 212 | + |
| 213 | +{- | Creates background logger with given @Capacity@, |
| 214 | +takes a 'LogAction' that should describe how to write |
| 215 | +logs. |
| 216 | +
|
| 217 | +@capacity@ - parameter tells how many in flight messages are allowed, |
| 218 | +if that value is reached then user's thread that emits logs will be |
| 219 | +blocked until any message will be written. Usually if value should be |
| 220 | +chosen reasonably high and if this value is reached it means that |
| 221 | +the application environment experience severe problems. |
| 222 | +
|
| 223 | +__N.B.__ The 'LogAction' will be run in the background |
| 224 | +thread so that logger should not add any thread specific |
| 225 | +context to the message. |
| 226 | +
|
| 227 | +__N.B.__ On exit, even in case of exception thread will dump all values |
| 228 | +that are in the queue. But it will stop doing that in case if another |
| 229 | +exception will happen. |
| 230 | +-} |
| 231 | +forkBackgroundLogger :: Capacity -> LogAction IO msg -> IO (BackgroundWorker msg) |
| 232 | +forkBackgroundLogger (Capacity cap) logAction = do |
| 233 | + queue <- newTBQueueIO cap |
| 234 | + isAlive <- newTVarIO True |
| 235 | + tid <- forkFinally |
| 236 | + (forever $ do |
| 237 | + msg <- atomically $ readTBQueue queue |
| 238 | + unLogAction logAction msg) |
| 239 | + (\_ -> |
| 240 | + (do msgs <- atomically $ many $ readTBQueue queue |
| 241 | + for_ msgs $ unLogAction logAction) |
| 242 | + `finally` atomically (writeTVar isAlive False)) |
| 243 | + pure $ BackgroundWorker tid (writeTBQueue queue) isAlive |
| 244 | + |
| 245 | + |
| 246 | +{- | Convert a given 'BackgroundWorker msg' into a 'LogAction msg' |
| 247 | +that will send log message to the background thread, |
| 248 | +without blocking the thread. |
| 249 | +
|
| 250 | +If logger dies for any reason then thread that emits |
| 251 | +logs will receive 'BlockedIndefinitelyOnSTM' exception. |
| 252 | +
|
| 253 | +You can extend result worker with all functionality available |
| 254 | +with co-log. This logger will have an access to the thread |
| 255 | +state. |
| 256 | +-} |
| 257 | +convertToLogAction :: MonadIO m => BackgroundWorker msg -> LogAction m msg |
| 258 | +convertToLogAction logger = LogAction $ \msg -> |
| 259 | + liftIO $ atomically $ backgroundWorkerWrite logger msg |
| 260 | + |
| 261 | +{- $worker-thread |
| 262 | +
|
| 263 | +While generic background logger is enough for the most |
| 264 | +of the usecases, sometimes you may want even more. |
| 265 | +
|
| 266 | +There are at least two cases where that may happen: |
| 267 | +
|
| 268 | + 1. You need to modify logger, for example different |
| 269 | + threads wants to write to different sources. Or you |
| 270 | + want to change lgo mechanism in runtime. |
| 271 | +
|
| 272 | + 2. You may want to implement some notification |
| 273 | + machinery that allows you to guarantee that your |
| 274 | + logs were written before processing further. |
| 275 | +
|
| 276 | +In order to solve those problems worker thread abstraction |
| 277 | +was introduced. This is a worker that accepts any action |
| 278 | +and performs that. |
| 279 | +-} |
| 280 | + |
| 281 | +{- | Create a background worker with a given capacity. |
| 282 | +If capacity is reached, then the thread that tries to |
| 283 | +write logs will be blocked. |
| 284 | +
|
| 285 | +This method is more generic than 'forkBackgroundLogger' but |
| 286 | +it's less effective, as you have to pass entire closure to |
| 287 | +be run and that leads to extra memory usage and indirect calls |
| 288 | +happening. |
| 289 | +
|
| 290 | +When closed it will dump all pending messages, unless |
| 291 | +another asynchronous exception will arrive, or synchronous |
| 292 | +exception will happen during the logging. |
| 293 | +-} |
| 294 | +mkBackgroundThread :: Capacity -> IO (BackgroundWorker (IO ())) |
| 295 | +mkBackgroundThread (Capacity cap) = do |
| 296 | + queue <- newTBQueueIO cap |
| 297 | + isAlive <- newTVarIO True |
| 298 | + tid <- forkFinally |
| 299 | + (forever $ join $ atomically $ readTBQueue queue) |
| 300 | + (\_ -> |
| 301 | + (sequence_ =<< atomically (many $ readTBQueue queue)) |
| 302 | + `finally` atomically (writeTVar isAlive False)) |
| 303 | + pure $ BackgroundWorker tid (writeTBQueue queue) isAlive |
| 304 | + |
| 305 | +{- | Run logger action asynchronously in the worker thread. |
| 306 | +Logger is executed in the other thread entirely, so if |
| 307 | +logger takes any thread related context it will be |
| 308 | +read from the other thread. |
| 309 | +-} |
| 310 | +runInBackgroundThread :: BackgroundWorker (IO ()) -> LogAction IO msg -> LogAction IO msg |
| 311 | +runInBackgroundThread bt logAction = LogAction $ \msg -> |
| 312 | + atomically $ backgroundWorkerWrite bt $ unLogAction logAction msg |
| 313 | + |
| 314 | +{- $worker-thread-usage |
| 315 | +
|
| 316 | +Consider following example. (Leaving resource control aside). |
| 317 | +
|
| 318 | +@ |
| 319 | +data M msg = M (MVar ()) msg |
| 320 | +
|
| 321 | +notificationLogger :: MonadIO m => LoggerAction m msg -> LoggerAction m (M msg) |
| 322 | +notificationLogger logger = 'LogAction' $ \(M lock msg) -> |
| 323 | + (unLogger logger msg) `finally` (putMVar lock ()) |
| 324 | +
|
| 325 | +example = __do__ |
| 326 | + worker <- 'mkBackgroundThread' 'defCapacity' |
| 327 | + lock <- newEmptyMVar |
| 328 | + -- Log message with default logger. |
| 329 | + 'unLogger' |
| 330 | + ('runInBackgroundThread' worker |
| 331 | + (notificationLogger $ 'Colog.Action.withLogByteStringFile' "\/var\/log\/myapp\/log") |
| 332 | + (M lock "my message") |
| 333 | + -- Log message with a different logger. |
| 334 | + 'unLogger' |
| 335 | + ('runInBackgroundThread' worker |
| 336 | + ('Colog.Action.withLogByteStringFile' "/var/log/myapp/log") |
| 337 | + ("another message") |
| 338 | + -- Block until first message is logged. |
| 339 | + _ <- takeMVar lock |
| 340 | +@ |
| 341 | +-} |
0 commit comments