Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions libs/wire-subsystems/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
, aeson-pretty
, amazonka
, amazonka-core
, amazonka-dynamodb
, amazonka-ses
, amazonka-sqs
, amqp
, async
, attoparsec
Expand Down Expand Up @@ -121,7 +123,9 @@ mkDerivation {
aeson-pretty
amazonka
amazonka-core
amazonka-dynamodb
amazonka-ses
amazonka-sqs
amqp
async
attoparsec
Expand Down Expand Up @@ -223,7 +227,9 @@ mkDerivation {
aeson-pretty
amazonka
amazonka-core
amazonka-dynamodb
amazonka-ses
amazonka-sqs
amqp
async
attoparsec
Expand Down
65 changes: 65 additions & 0 deletions libs/wire-subsystems/src/Wire/AWSSubsystem.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE TemplateHaskell #-}

-- This file is part of the Wire Server implementation.
--
-- Copyright (C) 2025 Wire Swiss GmbH <[email protected]>
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
-- Software Foundation, either version 3 of the License, or (at your option) any
-- later version.
--
-- This program is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
-- details.
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Wire.AWSSubsystem where

import Amazonka qualified as AWS
import Amazonka.SQS qualified as SQS
import Control.Monad.Catch
import Data.Aeson hiding ((.=))
import Data.ByteString.Lazy qualified as BL
import Data.UUID hiding (null)
import Imports hiding (group)
import Polysemy (makeSem)

data AWSSubsystem m r where
RunAwsRequest ::
forall a m.
( AWS.AWSRequest a,
Typeable a,
Typeable (AWS.AWSResponse a)
) =>
a ->
AWSSubsystem m (Either AWS.Error (AWS.AWSResponse a))
RunAwsRequestThrow ::
forall a m.
( AWS.AWSRequest a,
Typeable a,
Typeable (AWS.AWSResponse a)
) =>
a ->
AWSSubsystem m (AWS.AWSResponse a)
GetQueueUrl :: Text -> AWSSubsystem m Text
GetJournalQueueUrl :: AWSSubsystem m (Maybe Text)
Listen :: forall a m. (FromJSON a, Show a) => Int -> Text -> (a -> m ()) -> AWSSubsystem m ()
EnqueueStandard :: Text -> BL.ByteString -> AWSSubsystem m SQS.SendMessageResponse
EnqueueFIFO :: Text -> Text -> UUID -> BL.ByteString -> AWSSubsystem m SQS.SendMessageResponse

makeSem ''AWSSubsystem

data AWSSubsystemError where
GeneralError :: (Show e, AWS.AsError e) => e -> AWSSubsystemError
SESInvalidDomain :: AWSSubsystemError

deriving instance Show AWSSubsystemError

deriving instance Typeable AWSSubsystemError

instance Exception AWSSubsystemError
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE NoFieldSelectors #-}

-- This file is part of the Wire Server implementation.
--
-- Copyright (C) 2022 Wire Swiss GmbH <[email protected]>
-- Copyright (C) 2025 Wire Swiss GmbH <[email protected]>
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
Expand All @@ -18,37 +21,14 @@
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Brig.AWS
( -- * Monad
Env (..),
mkEnv,
Amazon,
amazonkaEnv,
execute,
sesQueue,
userJournalQueue,
prekeyTable,
Error (..),

-- * SQS
listen,
enqueueFIFO,
enqueueStandard,
getQueueUrl,

-- * AWS
exec,
execCatch,
)
where
module Wire.AWSSubsystem.AWS where

import Amazonka (AWSRequest, AWSResponse)
import Amazonka qualified as AWS
import Amazonka.DynamoDB qualified as DDB
import Amazonka.SES qualified as SES
import Amazonka.SQS qualified as SQS
import Amazonka.SQS.Lens qualified as SQS
import Brig.Options qualified as Opt
import Control.Lens hiding ((.=))
import Control.Monad.Catch
import Control.Monad.Trans.Resource
Expand All @@ -60,14 +40,16 @@ import Data.Text.Encoding qualified as Text
import Data.UUID hiding (null)
import Imports hiding (group)
import Network.HTTP.Client (Manager)
import Polysemy (runM)
import Polysemy hiding (send)
import Polysemy.Final
import Polysemy.Input (runInputConst)
import System.Logger qualified as Logger
import System.Logger.Class
import UnliftIO.Async
import UnliftIO.Exception
import Util.Options
import Wire.AWS
import Wire.AWSSubsystem (AWSSubsystem (..), AWSSubsystemError (..))

data Env = Env
{ _logger :: !Logger,
Expand All @@ -82,7 +64,7 @@ makeLenses ''Env
newtype Amazon a = Amazon
{ unAmazon :: ReaderT Env (ResourceT IO) a
}
deriving
deriving newtype
( Functor,
Applicative,
Monad,
Expand All @@ -98,27 +80,53 @@ newtype Amazon a = Amazon
instance MonadLogger Amazon where
log l m = view logger >>= \g -> Logger.log g l m

mkEnv :: Logger -> Opt.AWSOpts -> Maybe Opt.EmailAWSOpts -> Manager -> IO Env
data AWSOpts = AWSOpts
{ -- | Event journal queue for user events
-- (e.g. user deletion)
userJournalQueue :: !(Maybe Text),
-- | Dynamo table for storing prekey data
prekeyTable :: !Text,
-- | AWS SQS endpoint
sqsEndpoint :: !AWSEndpoint,
-- | DynamoDB endpoint
dynamoDBEndpoint :: !(Maybe AWSEndpoint)
}
deriving (Show, Generic)
deriving anyclass (FromJSON)

data EmailAWSOpts = EmailAWSOpts
{ -- | Event feedback queue for SES
-- (e.g. for email bounces and complaints)
sesQueue :: !Text,
-- | AWS SES endpoint
sesEndpoint :: !AWSEndpoint
}
deriving (Show, Generic)
deriving anyclass (FromJSON)

mkEnv :: Logger -> AWSOpts -> Maybe EmailAWSOpts -> Manager -> IO Env
mkEnv lgr opts emailOpts mgr = do
let g = Logger.clone (Just "aws.brig") lgr
let pk = Opt.prekeyTable opts
let sesEndpoint = mkEndpoint SES.defaultService . Opt.sesEndpoint <$> emailOpts
let dynamoEndpoint = mkEndpoint DDB.defaultService <$> Opt.dynamoDBEndpoint opts
let pk = opts.prekeyTable
let sesEndpoint = mkEndpoint SES.defaultService . (.sesEndpoint) <$> emailOpts
let dynamoEndpoint = mkEndpoint DDB.defaultService <$> opts.dynamoDBEndpoint
e <-
mkAwsEnv
g
sesEndpoint
dynamoEndpoint
(mkEndpoint SQS.defaultService (Opt.sqsEndpoint opts))
sq <- maybe (pure Nothing) (fmap Just . getQueueUrl e . Opt.sesQueue) emailOpts
jq <- maybe (pure Nothing) (fmap Just . getQueueUrl e) (Opt.userJournalQueue opts)
(mkEndpoint SQS.defaultService opts.sqsEndpoint)
sq <- maybe (pure Nothing) (fmap Just . getQueueUrl e . (.sesQueue)) emailOpts
jq <- maybe (pure Nothing) (fmap Just . getQueueUrl e) opts.userJournalQueue
pure (Env g sq jq pk e)
where
mkEndpoint svc e = AWS.setEndpoint (e ^. awsSecure) (e ^. awsHost) (e ^. awsPort) svc
mkAwsEnv g ses dyn sqs = do
baseEnv <-
AWS.newEnv AWS.discover
<&> AWS.configureService sqs . maybe id AWS.configureService dyn . maybe id AWS.configureService ses
<&> AWS.configureService sqs
. maybe id AWS.configureService dyn
. maybe id AWS.configureService ses
pure $
baseEnv
{ AWS.logger = awsLogger g,
Expand All @@ -139,30 +147,17 @@ mkEnv lgr opts emailOpts mgr = do
-- they are still revealed on debug level.
mapLevel AWS.Error = Logger.Debug

---------------------------------------------------------

-- | Variant of getQueueUrlImpl for calling during Env construction.
getQueueUrl ::
(MonadUnliftIO m, MonadCatch m) =>
AWS.Env ->
Text ->
m Text
getQueueUrl e q = view SQS.getQueueUrlResponse_queueUrl <$> exec e (SQS.newGetQueueUrl q)

execute :: (MonadIO m) => Env -> Amazon a -> m a
execute e m = liftIO $ runResourceT (runReaderT (unAmazon m) e)

data Error where
GeneralError :: (Show e, AWS.AsError e) => e -> Error
SESInvalidDomain :: Error

deriving instance Show Error
getQueueUrl e q = view SQS.getQueueUrlResponse_queueUrl <$> runAwsRequestThrow e (SQS.newGetQueueUrl q)

deriving instance Typeable Error

instance Exception Error

--------------------------------------------------------------------------------
-- SQS

listen :: (FromJSON a, Show a) => Int -> Text -> (a -> IO ()) -> Amazon ()
listen :: (FromJSON a, Show a) => Int -> Text -> (a -> IO x) -> Amazon y
listen throttleMillis url callback = forever . handleAny unexpectedError $ do
msgs <- fromMaybe [] . view SQS.receiveMessageResponse_messages <$> send receive
void $ mapConcurrently onMessage msgs
Expand All @@ -178,7 +173,7 @@ listen throttleMillis url callback = forever . handleAny unexpectedError $ do
Left e -> err $ msg (val "Failed to parse SQS event") . field "error" e . field "message" (show m)
Right n -> do
debug $ msg (val "Received SQS event") . field "event" (show n)
liftIO $ callback n
liftIO $ void $ callback n
for_ (m ^. SQS.message_receiptHandle) (void . send . SQS.newDeleteMessage url)
unexpectedError x = do
err $ "error" .= show x ~~ msg (val "Failed to read or process message from SQS")
Expand Down Expand Up @@ -216,7 +211,7 @@ sendCatchAmazon req = do
throwA :: Either AWS.Error a -> Amazon a
throwA = either (throwM . GeneralError) pure

execCatch ::
runAwsRequest ::
( AWSRequest a,
Typeable a,
MonadUnliftIO m,
Expand All @@ -226,12 +221,12 @@ execCatch ::
AWS.Env ->
a ->
m (Either AWS.Error (AWSResponse a))
execCatch e cmd =
runAwsRequest e cmd =
runResourceT $
AWS.trying AWS._Error $
AWS.send e cmd

exec ::
runAwsRequestThrow ::
( AWSRequest a,
Typeable a,
Typeable (AWSResponse a),
Expand All @@ -241,7 +236,33 @@ exec ::
AWS.Env ->
a ->
m (AWSResponse a)
exec e cmd = liftIO (execCatch e cmd) >>= either (throwM . GeneralError) pure
runAwsRequestThrow e cmd = liftIO (runAwsRequest e cmd) >>= either (throwM . GeneralError) pure

retry5x :: (Monad m) => RetryPolicyM m
retry5x = limitRetries 5 <> exponentialBackoff 100000

--------------------------------------------------------------------------------
-- Polysemy Interpreter

-- | Run AWSSubsystem effect by interpreting it into the Amazon monad.
-- Uses Final IO strategy for the higher-order Listen effect.
runAWSSubsystem ::
(Member (Final IO) r) =>
Env ->
Sem (AWSSubsystem : r) a ->
Sem r a
runAWSSubsystem env = interpretFinal $ \case
RunAwsRequest x -> liftS @IO $ runAwsRequest env._amazonkaEnv x
RunAwsRequestThrow x -> liftS @IO $ runAwsRequestThrow env._amazonkaEnv x
GetQueueUrl queueName -> liftS @IO $ do
resp <- runResourceT $ AWS.send env._amazonkaEnv (SQS.newGetQueueUrl queueName)
pure $ view SQS.getQueueUrlResponse_queueUrl resp
GetJournalQueueUrl -> liftS $ pure (env ^. userJournalQueue)
EnqueueStandard url message -> liftS $ do
runResourceT $ runReaderT ((enqueueStandard url message).unAmazon) env
EnqueueFIFO url group dedupId message -> liftS $ do
runResourceT $ runReaderT ((enqueueFIFO url group dedupId message).unAmazon) env
Listen throttle url callback -> do
callbackS <- bindS callback
s <- getInitialStateS
pure $ runResourceT $ runReaderT ((listen throttle url $ callbackS . (s $>)).unAmazon) env
4 changes: 4 additions & 0 deletions libs/wire-subsystems/wire-subsystems.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ common common-all
, aeson-pretty
, amazonka
, amazonka-core
, amazonka-dynamodb
, amazonka-ses
, amazonka-sqs
, amqp
, async
, attoparsec
Expand Down Expand Up @@ -193,6 +195,8 @@ library
Wire.AuthenticationSubsystem.Interpreter
Wire.AuthenticationSubsystem.ZAuth
Wire.AWS
Wire.AWSSubsystem
Wire.AWSSubsystem.AWS
Wire.BlockListStore
Wire.BlockListStore.Cassandra
Wire.BrigAPIAccess
Expand Down
2 changes: 0 additions & 2 deletions services/brig/brig.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ library
Brig.API.User
Brig.API.Util
Brig.App
Brig.AWS
Brig.AWS.SesNotification
Brig.AWS.Types
Brig.Budget
Expand Down Expand Up @@ -213,7 +212,6 @@ library
, amazonka >=2
, amazonka-core >=2
, amazonka-dynamodb >=2
, amazonka-ses >=2
, amazonka-sqs >=2
, amqp
, async >=2.1
Expand Down
2 changes: 0 additions & 2 deletions services/brig/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
, amazonka
, amazonka-core
, amazonka-dynamodb
, amazonka-ses
, amazonka-sqs
, amqp
, async
Expand Down Expand Up @@ -165,7 +164,6 @@ mkDerivation {
amazonka
amazonka-core
amazonka-dynamodb
amazonka-ses
amazonka-sqs
amqp
async
Expand Down
Loading