From 41f9715f72131a38b8c6e962a7cd141983f9d52c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=9C=BF=20corey?= Date: Mon, 14 Aug 2023 11:32:09 -0700 Subject: [PATCH] Add Kafka consumer module (#113) * Kafka consumer module Transplanted from: https://github.com/freckle/progress/pull/42 * Documentation * Inline `keyValueMap` This isn't a novel enough function to warrant inclusion into `Freckle.App.Env` * Abstract `timeout` from Database This is planned to be used for reading in the polling timeout setting in progress consumer * Formatting * Fix doctest * Use `Timeout` in `runConsumer` * Use `logDebug` * Move Kafka to Producer module * `v1.9.2.0` * Poll single topic * Commit offset only after processing valid message --- CHANGELOG.md | 6 + freckle-app.cabal | 6 +- library/Freckle/App/Database.hs | 43 +------ library/Freckle/App/Env.hs | 37 ++++++ library/Freckle/App/Kafka.hs | 154 +----------------------- library/Freckle/App/Kafka/Consumer.hs | 166 ++++++++++++++++++++++++++ library/Freckle/App/Kafka/Producer.hs | 153 ++++++++++++++++++++++++ package.yaml | 2 +- 8 files changed, 377 insertions(+), 190 deletions(-) create mode 100644 library/Freckle/App/Kafka/Consumer.hs create mode 100644 library/Freckle/App/Kafka/Producer.hs diff --git a/CHANGELOG.md b/CHANGELOG.md index e67c9a72..52fb55b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ ## [_Unreleased_](https://github.com/freckle/freckle-app/compare/v1.9.1.1...main) +## [v1.9.2.0](https://github.com/freckle/freckle-app/compare/v1.9.1.1...v1.9.2.0) + +- Add `Freckle.App.Kafka.Consumer` for consuming Kafka events. +- Move producer to `Freckle.App.Kafka.Producer`. +- Re-export both modules in `Freckle.App.Kafka`. + ## [v1.9.1.1](https://github.com/freckle/freckle-app/compare/v1.9.1.0...v1.9.1.1) - Add `KafkaProducerPoolConfig` for controlling Kafka producer pool parameters diff --git a/freckle-app.cabal b/freckle-app.cabal index f0ce790b..89955335 100644 --- a/freckle-app.cabal +++ b/freckle-app.cabal @@ -1,11 +1,11 @@ cabal-version: 1.18 --- This file has been generated from package.yaml by hpack version 0.35.1. +-- This file has been generated from package.yaml by hpack version 0.35.2. -- -- see: https://github.com/sol/hpack name: freckle-app -version: 1.9.1.1 +version: 1.9.2.0 synopsis: Haskell application toolkit used at Freckle description: Please see README.md category: Utils @@ -43,6 +43,8 @@ library Freckle.App.Http.Paginate Freckle.App.Http.Retry Freckle.App.Kafka + Freckle.App.Kafka.Consumer + Freckle.App.Kafka.Producer Freckle.App.Memcached Freckle.App.Memcached.CacheKey Freckle.App.Memcached.CacheTTL diff --git a/library/Freckle/App/Database.hs b/library/Freckle/App/Database.hs index d13f8b65..59f49974 100644 --- a/library/Freckle/App/Database.hs +++ b/library/Freckle/App/Database.hs @@ -13,7 +13,7 @@ module Freckle.App.Database , PostgresConnectionConf (..) , PostgresPasswordSource (..) , PostgresPassword (..) - , PostgresStatementTimeout (..) 
+ , PostgresStatementTimeout , postgresStatementTimeoutMilliseconds , envParseDatabaseConf , envPostgresPasswordSource @@ -28,7 +28,6 @@ import Control.Monad.Reader import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as BS8 import qualified Data.ByteString.Lazy as BSL -import Data.Char (isDigit) import Data.Pool import qualified Data.Text as T import Database.Persist.Postgresql @@ -47,6 +46,7 @@ import Database.PostgreSQL.Simple , execute ) import Database.PostgreSQL.Simple.SqlQQ (sql) +import Freckle.App.Env (Timeout (..)) import qualified Freckle.App.Env as Env import Freckle.App.OpenTelemetry (MonadTracer (..)) import Freckle.App.Stats (HasStatsClient) @@ -58,7 +58,6 @@ import UnliftIO.Concurrent (threadDelay) import UnliftIO.Exception (displayException) import UnliftIO.IORef import Yesod.Core.Types (HandlerData (..), RunHandlerEnv (..)) -import qualified Prelude as Unsafe (read) type SqlPool = Pool SqlBackend @@ -149,41 +148,12 @@ data PostgresPassword | PostgresPasswordStatic String deriving stock (Show, Eq) -data PostgresStatementTimeout - = PostgresStatementTimeoutSeconds Int - | PostgresStatementTimeoutMilliseconds Int - deriving stock (Show, Eq) +type PostgresStatementTimeout = Timeout postgresStatementTimeoutMilliseconds :: PostgresStatementTimeout -> Int postgresStatementTimeoutMilliseconds = \case - PostgresStatementTimeoutSeconds s -> s * 1000 - PostgresStatementTimeoutMilliseconds ms -> ms - --- | Read @PGSTATEMENTTIMEOUT@ as seconds or milliseconds --- --- >>> readPostgresStatementTimeout "10" --- Right (PostgresStatementTimeoutSeconds 10) --- --- >>> readPostgresStatementTimeout "10s" --- Right (PostgresStatementTimeoutSeconds 10) --- --- >>> readPostgresStatementTimeout "10ms" --- Right (PostgresStatementTimeoutMilliseconds 10) --- --- >>> readPostgresStatementTimeout "20m" --- Left "..." --- --- >>> readPostgresStatementTimeout "2m0" --- Left "..." -readPostgresStatementTimeout - :: String -> Either String PostgresStatementTimeout -readPostgresStatementTimeout x = case span isDigit x of - ("", _) -> Left "must be {digits}(s|ms)" - (digits, "") -> Right $ PostgresStatementTimeoutSeconds $ Unsafe.read digits - (digits, "s") -> Right $ PostgresStatementTimeoutSeconds $ Unsafe.read digits - (digits, "ms") -> - Right $ PostgresStatementTimeoutMilliseconds $ Unsafe.read digits - _ -> Left "must be {digits}(s|ms)" + TimeoutSeconds s -> s * 1000 + TimeoutMilliseconds ms -> ms envPostgresPasswordSource :: Env.Parser Env.Error PostgresPasswordSource envPostgresPasswordSource = @@ -207,8 +177,7 @@ envParseDatabaseConf source = do poolSize <- Env.var Env.auto "PGPOOLSIZE" $ Env.def 10 schema <- optional $ Env.var Env.nonempty "PGSCHEMA" mempty statementTimeout <- - Env.var (Env.eitherReader readPostgresStatementTimeout) "PGSTATEMENTTIMEOUT" $ - Env.def (PostgresStatementTimeoutSeconds 120) + Env.var Env.timeout "PGSTATEMENTTIMEOUT" $ Env.def (TimeoutSeconds 120) pure PostgresConnectionConf { pccHost = host diff --git a/library/Freckle/App/Env.hs b/library/Freckle/App/Env.hs index 9b07ebf5..2c34047c 100644 --- a/library/Freckle/App/Env.hs +++ b/library/Freckle/App/Env.hs @@ -28,21 +28,25 @@ module Freckle.App.Env , flag -- * Extensions + , Timeout (..) 
, kept , eitherReader , time , keyValues + , timeout ) where import Freckle.App.Prelude import Control.Error.Util (note) +import Data.Char (isDigit) import qualified Data.Text as T import Data.Time (defaultTimeLocale, parseTimeM) import Env hiding (flag) import qualified Env import Env.Internal.Free (hoistAlt) import Env.Internal.Parser (Parser (..), VarF (..)) +import qualified Prelude as Unsafe (read) -- | Designates the value of a parameter when a flag is not provided. newtype Off a = Off a @@ -139,3 +143,36 @@ keyValues = eitherReader $ traverse keyValue . T.splitOn "," . pack (k, v) | T.null v -> Left $ "Key " <> unpack k <> " has no value" (k, v) | T.null k -> Left $ "Value " <> unpack v <> " has no key" (k, v) -> Right (k, v) + +-- | Represents a timeout in seconds or milliseconds +data Timeout + = TimeoutSeconds Int + | TimeoutMilliseconds Int + deriving stock (Show, Eq) + +-- | Read a timeout value as seconds or milliseconds +-- +-- >>> var timeout "TIMEOUT" mempty `parsePure` [("TIMEOUT", "10")] +-- Right (TimeoutSeconds 10) +-- +-- >>> var timeout "TIMEOUT" mempty `parsePure` [("TIMEOUT", "10s")] +-- Right (TimeoutSeconds 10) +-- +-- >>> var timeout "TIMEOUT" mempty `parsePure` [("TIMEOUT", "10ms")] +-- Right (TimeoutMilliseconds 10) +-- +-- >>> var timeout "TIMEOUT" mempty `parsePure` [("TIMEOUT", "20m")] +-- Left [("TIMEOUT",UnreadError "must be {digits}(s|ms): \"20m\"")] +-- +-- >>> var timeout "TIMEOUT" mempty `parsePure` [("TIMEOUT", "2m0")] +-- Left [("TIMEOUT",UnreadError "must be {digits}(s|ms): \"2m0\"")] +timeout :: Reader Error Timeout +timeout = eitherReader $ parseTimeout . span isDigit + where + parseTimeout = \case + ("", _) -> Left "must be {digits}(s|ms)" + (digits, "") -> Right $ TimeoutSeconds $ Unsafe.read digits + (digits, "s") -> Right $ TimeoutSeconds $ Unsafe.read digits + (digits, "ms") -> + Right $ TimeoutMilliseconds $ Unsafe.read digits + _ -> Left "must be {digits}(s|ms)" diff --git a/library/Freckle/App/Kafka.hs b/library/Freckle/App/Kafka.hs index a4e46fe3..502ea58b 100644 --- a/library/Freckle/App/Kafka.hs +++ b/library/Freckle/App/Kafka.hs @@ -1,153 +1,7 @@ -{-# LANGUAGE ApplicativeDo #-} -{-# LANGUAGE NamedFieldPuns #-} - module Freckle.App.Kafka - ( envKafkaBrokerAddresses - , KafkaProducerPoolConfig (..) - , envKafkaProducerPoolConfig - , KafkaProducerPool (..) - , HasKafkaProducerPool (..) 
- , createKafkaProducerPool - , produceKeyedOn - , produceKeyedOnAsync + ( module Freckle.App.Kafka.Consumer + , module Freckle.App.Kafka.Producer ) where -import Freckle.App.Prelude - -import Blammo.Logging -import Control.Lens (Lens', view) -import Data.Aeson (ToJSON, encode) -import Data.ByteString.Lazy (toStrict) -import qualified Data.List.NonEmpty as NE -import Data.Pool (Pool) -import qualified Data.Pool as Pool -import qualified Data.Text as T -import qualified Freckle.App.Env as Env -import Kafka.Producer -import UnliftIO.Async (async) -import UnliftIO.Exception (throwString) -import Yesod.Core.Lens -import Yesod.Core.Types (HandlerData) - -envKafkaBrokerAddresses - :: Env.Parser Env.Error (NonEmpty BrokerAddress) -envKafkaBrokerAddresses = - Env.var - (Env.eitherReader readKafkaBrokerAddresses) - "KAFKA_BROKER_ADDRESSES" - mempty - -readKafkaBrokerAddresses :: String -> Either String (NonEmpty BrokerAddress) -readKafkaBrokerAddresses t = case NE.nonEmpty $ T.splitOn "," $ T.pack t of - Just xs@(x NE.:| _) - | x /= "" -> Right $ BrokerAddress <$> xs - _ -> Left "Broker Address cannot be empty" - -data KafkaProducerPoolConfig = KafkaProducerPoolConfig - { kafkaProducerPoolConfigStripes :: Int - -- ^ The number of stripes (distinct sub-pools) to maintain. - -- The smallest acceptable value is 1. - , kafkaProducerPoolConfigIdleTimeout :: NominalDiffTime - -- ^ Amount of time for which an unused resource is kept open. - -- The smallest acceptable value is 0.5 seconds. - -- - -- The elapsed time before destroying a resource may be a little - -- longer than requested, as the reaper thread wakes at 1-second - -- intervals. - , kafkaProducerPoolConfigSize :: Int - -- ^ Maximum number of resources to keep open per stripe. The - -- smallest acceptable value is 1. - -- - -- Requests for resources will block if this limit is reached on a - -- single stripe, even if other stripes have idle resources - -- available. - } - deriving stock (Show) - --- | Same defaults as 'Database.Persist.Sql.ConnectionPoolConfig' -defaultKafkaProducerPoolConfig :: KafkaProducerPoolConfig -defaultKafkaProducerPoolConfig = KafkaProducerPoolConfig 1 600 10 - -envKafkaProducerPoolConfig - :: Env.Parser Env.Error KafkaProducerPoolConfig -envKafkaProducerPoolConfig = do - poolSize <- Env.var Env.auto "KAFKA_PRODUCER_POOL_SIZE" $ Env.def 10 - pure $ defaultKafkaProducerPoolConfig {kafkaProducerPoolConfigSize = poolSize} - -data KafkaProducerPool - = NullKafkaProducerPool - | KafkaProducerPool (Pool KafkaProducer) - -class HasKafkaProducerPool env where - kafkaProducerPoolL :: Lens' env KafkaProducerPool - -instance HasKafkaProducerPool site => HasKafkaProducerPool (HandlerData child site) where - kafkaProducerPoolL = envL . siteL . 
kafkaProducerPoolL - -createKafkaProducerPool - :: NonEmpty BrokerAddress - -> KafkaProducerPoolConfig - -> IO (Pool KafkaProducer) -createKafkaProducerPool addresses KafkaProducerPoolConfig {..} = - Pool.createPool - mkProducer - closeProducer - kafkaProducerPoolConfigStripes - kafkaProducerPoolConfigIdleTimeout - kafkaProducerPoolConfigSize - where - mkProducer = - either throw pure =<< newProducer (brokersList $ toList addresses) - throw err = throwString $ "Failed to open kafka producer: " <> show err - -produceKeyedOn - :: ( ToJSON value - , ToJSON key - , MonadLogger m - , MonadReader env m - , HasKafkaProducerPool env - , MonadUnliftIO m - ) - => TopicName - -> NonEmpty value - -> (value -> key) - -> m () -produceKeyedOn prTopic values keyF = do - logDebugNS "kafka" $ "Producing Kafka events" :# ["events" .= values] - view kafkaProducerPoolL >>= \case - NullKafkaProducerPool -> pure () - KafkaProducerPool producerPool -> do - errors <- - liftIO $ - Pool.withResource producerPool $ \producer -> - produceMessageBatch producer $ - toList $ - mkProducerRecord <$> values - unless (null errors) $ - logErrorNS "kafka" $ - "Failed to send events" :# ["errors" .= fmap (tshow . snd) errors] - where - mkProducerRecord value = - ProducerRecord - { prTopic - , prPartition = UnassignedPartition - , prKey = Just $ toStrict $ encode $ keyF value - , prValue = - Just $ - toStrict $ - encode value - } - -produceKeyedOnAsync - :: ( ToJSON value - , ToJSON key - , MonadLogger m - , MonadReader env m - , HasKafkaProducerPool env - , MonadUnliftIO m - ) - => TopicName - -> NonEmpty value - -> (value -> key) - -> m () -produceKeyedOnAsync prTopic values = void . async . produceKeyedOn prTopic values +import Freckle.App.Kafka.Consumer +import Freckle.App.Kafka.Producer diff --git a/library/Freckle/App/Kafka/Consumer.hs b/library/Freckle/App/Kafka/Consumer.hs new file mode 100644 index 00000000..5deb6d51 --- /dev/null +++ b/library/Freckle/App/Kafka/Consumer.hs @@ -0,0 +1,166 @@ +{-# LANGUAGE ApplicativeDo #-} + +module Freckle.App.Kafka.Consumer + ( HasKafkaConsumer (..) + , withKafkaConsumer + , KafkaConsumerConfig (..) + , envKafkaConsumerConfig + , runConsumer + ) where + +import Freckle.App.Prelude + +import Blammo.Logging +import qualified Control.Immortal as Immortal +import Control.Lens (Lens', view) +import Data.Aeson +import qualified Data.List.NonEmpty as NE +import qualified Data.Map.Strict as Map +import qualified Data.Text as T +import qualified Env +import Freckle.App.Env +import Freckle.App.Kafka.Producer (envKafkaBrokerAddresses) +import Kafka.Consumer hiding + ( Timeout + , closeConsumer + , newConsumer + , runConsumer + , subscription + ) +import qualified Kafka.Consumer as Kafka +import UnliftIO.Exception (bracket, displayException, throwIO) + +data KafkaConsumerConfig = KafkaConsumerConfig + { kafkaConsumerConfigBrokerAddresses :: NonEmpty BrokerAddress + -- ^ The list of host/port pairs for establishing the initial connection + -- to the Kafka cluster. + -- + -- This is the `bootstrap.servers` Kafka consumer configuration property. + , kafkaConsumerConfigGroupId :: ConsumerGroupId + -- ^ The consumer group id to which the consumer belongs. + -- + -- This is the `group.id` Kafka consumer configuration property. + , kafkaConsumerConfigTopic :: TopicName + -- ^ The topic name polled for messages by the Kafka consumer. + , kafkaConsumerConfigOffsetReset :: OffsetReset + -- ^ The offset reset parameter used when there is no initial offset in Kafka. 
+ -- + -- This is the `auto.offset.reset` Kafka consumer configuration property. + , kafkaConsumerConfigExtraSubscriptionProps :: Map Text Text + -- ^ Extra properties used to configure the Kafka consumer. + } + deriving stock (Show) + +envKafkaTopic + :: Env.Parser Env.Error TopicName +envKafkaTopic = + Env.var + (eitherReader readKafkaTopic) + "KAFKA_TOPIC" + mempty + +readKafkaTopic :: String -> Either String TopicName +readKafkaTopic t = case T.pack t of + "" -> Left "Kafka topics cannot be empty" + x -> Right $ TopicName x + +envKafkaOffsetReset + :: Env.Parser Env.Error OffsetReset +envKafkaOffsetReset = + Env.var + (eitherReader readKafkaOffsetReset) + "KAFKA_OFFSET_RESET" + $ Env.def Earliest + +readKafkaOffsetReset :: String -> Either String OffsetReset +readKafkaOffsetReset t = case T.pack t of + "earliest" -> Right Earliest + "latest" -> Right Latest + _ -> Left "Kafka offset reset must be one of earliest or latest" + +envKafkaConsumerConfig + :: Env.Parser Env.Error KafkaConsumerConfig +envKafkaConsumerConfig = do + brokerAddresses <- envKafkaBrokerAddresses + consumerGroupId <- Env.var Env.nonempty "KAFKA_CONSUMER_GROUP_ID" mempty + kafkaTopic <- envKafkaTopic + kafkaOffsetReset <- envKafkaOffsetReset + kafkaExtraProps <- + Env.var + (fmap Map.fromList . keyValues) + "KAFKA_EXTRA_SUBSCRIPTION_PROPS" + (Env.def mempty) + pure $ + KafkaConsumerConfig + brokerAddresses + consumerGroupId + kafkaTopic + kafkaOffsetReset + kafkaExtraProps + +class HasKafkaConsumer env where + kafkaConsumerL :: Lens' env KafkaConsumer + +consumerProps :: KafkaConsumerConfig -> ConsumerProperties +consumerProps KafkaConsumerConfig {..} = + brokersList brokers + <> groupId kafkaConsumerConfigGroupId + <> noAutoCommit + <> logLevel KafkaLogInfo + where + brokers = NE.toList kafkaConsumerConfigBrokerAddresses + +subscription :: KafkaConsumerConfig -> Subscription +subscription KafkaConsumerConfig {..} = + topics [kafkaConsumerConfigTopic] + <> offsetReset kafkaConsumerConfigOffsetReset + <> extraSubscriptionProps kafkaConsumerConfigExtraSubscriptionProps + +withKafkaConsumer + :: MonadUnliftIO m + => KafkaConsumerConfig + -> (KafkaConsumer -> m a) + -> m a +withKafkaConsumer config = bracket newConsumer closeConsumer + where + (props, sub) = (consumerProps &&& subscription) config + newConsumer = either throwIO pure =<< Kafka.newConsumer props sub + closeConsumer = maybe (pure ()) throwIO <=< Kafka.closeConsumer + +timeoutMs :: Timeout -> Int +timeoutMs = \case + TimeoutSeconds s -> s * 1000 + TimeoutMilliseconds ms -> ms + +runConsumer + :: ( MonadUnliftIO m + , MonadReader env m + , MonadLogger m + , HasKafkaConsumer env + , FromJSON a + ) + => Timeout + -> (a -> m ()) + -> m () +runConsumer pollTimeout onMessage = void $ Immortal.create $ \thread -> Immortal.onUnexpectedFinish thread handleException $ do + consumer <- view kafkaConsumerL + eMessage <- + pollMessage consumer $ Kafka.Timeout $ timeoutMs pollTimeout + case eMessage of + Left (KafkaResponseError RdKafkaRespErrTimedOut) -> logDebug $ "Polling timeout" + Left err -> logError $ "Error polling for message from Kafka" :# ["error" .= show err] + Right m@(ConsumerRecord {..}) -> for_ crValue $ \bs -> do + case eitherDecodeStrict bs of + Left err -> logError $ "Could not decode message value" :# ["error" .= err] + Right a -> do + onMessage a + maybe + (logInfo "Committed offsets") + (\err -> logError $ "Error committing offsets" :# ["error" .= show err]) + =<< commitOffsetMessage OffsetCommit consumer m + where + handleException = \case + Left ex 
-> + logError $ + "Exception occurred in runConsumer" :# ["exception" .= displayException ex] + Right () -> pure () diff --git a/library/Freckle/App/Kafka/Producer.hs b/library/Freckle/App/Kafka/Producer.hs new file mode 100644 index 00000000..3f297efd --- /dev/null +++ b/library/Freckle/App/Kafka/Producer.hs @@ -0,0 +1,153 @@ +{-# LANGUAGE ApplicativeDo #-} +{-# LANGUAGE NamedFieldPuns #-} + +module Freckle.App.Kafka.Producer + ( envKafkaBrokerAddresses + , KafkaProducerPoolConfig (..) + , envKafkaProducerPoolConfig + , KafkaProducerPool (..) + , HasKafkaProducerPool (..) + , createKafkaProducerPool + , produceKeyedOn + , produceKeyedOnAsync + ) where + +import Freckle.App.Prelude + +import Blammo.Logging +import Control.Lens (Lens', view) +import Data.Aeson (ToJSON, encode) +import Data.ByteString.Lazy (toStrict) +import qualified Data.List.NonEmpty as NE +import Data.Pool (Pool) +import qualified Data.Pool as Pool +import qualified Data.Text as T +import qualified Freckle.App.Env as Env +import Kafka.Producer +import UnliftIO.Async (async) +import UnliftIO.Exception (throwString) +import Yesod.Core.Lens +import Yesod.Core.Types (HandlerData) + +envKafkaBrokerAddresses + :: Env.Parser Env.Error (NonEmpty BrokerAddress) +envKafkaBrokerAddresses = + Env.var + (Env.eitherReader readKafkaBrokerAddresses) + "KAFKA_BROKER_ADDRESSES" + mempty + +readKafkaBrokerAddresses :: String -> Either String (NonEmpty BrokerAddress) +readKafkaBrokerAddresses t = case NE.nonEmpty $ T.splitOn "," $ T.pack t of + Just xs@(x NE.:| _) + | x /= "" -> Right $ BrokerAddress <$> xs + _ -> Left "Broker Address cannot be empty" + +data KafkaProducerPoolConfig = KafkaProducerPoolConfig + { kafkaProducerPoolConfigStripes :: Int + -- ^ The number of stripes (distinct sub-pools) to maintain. + -- The smallest acceptable value is 1. + , kafkaProducerPoolConfigIdleTimeout :: NominalDiffTime + -- ^ Amount of time for which an unused resource is kept open. + -- The smallest acceptable value is 0.5 seconds. + -- + -- The elapsed time before destroying a resource may be a little + -- longer than requested, as the reaper thread wakes at 1-second + -- intervals. + , kafkaProducerPoolConfigSize :: Int + -- ^ Maximum number of resources to keep open per stripe. The + -- smallest acceptable value is 1. + -- + -- Requests for resources will block if this limit is reached on a + -- single stripe, even if other stripes have idle resources + -- available. + } + deriving stock (Show) + +-- | Same defaults as 'Database.Persist.Sql.ConnectionPoolConfig' +defaultKafkaProducerPoolConfig :: KafkaProducerPoolConfig +defaultKafkaProducerPoolConfig = KafkaProducerPoolConfig 1 600 10 + +envKafkaProducerPoolConfig + :: Env.Parser Env.Error KafkaProducerPoolConfig +envKafkaProducerPoolConfig = do + poolSize <- Env.var Env.auto "KAFKA_PRODUCER_POOL_SIZE" $ Env.def 10 + pure $ defaultKafkaProducerPoolConfig {kafkaProducerPoolConfigSize = poolSize} + +data KafkaProducerPool + = NullKafkaProducerPool + | KafkaProducerPool (Pool KafkaProducer) + +class HasKafkaProducerPool env where + kafkaProducerPoolL :: Lens' env KafkaProducerPool + +instance HasKafkaProducerPool site => HasKafkaProducerPool (HandlerData child site) where + kafkaProducerPoolL = envL . siteL . 
kafkaProducerPoolL + +createKafkaProducerPool + :: NonEmpty BrokerAddress + -> KafkaProducerPoolConfig + -> IO (Pool KafkaProducer) +createKafkaProducerPool addresses KafkaProducerPoolConfig {..} = + Pool.createPool + mkProducer + closeProducer + kafkaProducerPoolConfigStripes + kafkaProducerPoolConfigIdleTimeout + kafkaProducerPoolConfigSize + where + mkProducer = + either throw pure =<< newProducer (brokersList $ toList addresses) + throw err = throwString $ "Failed to open kafka producer: " <> show err + +produceKeyedOn + :: ( ToJSON value + , ToJSON key + , MonadLogger m + , MonadReader env m + , HasKafkaProducerPool env + , MonadUnliftIO m + ) + => TopicName + -> NonEmpty value + -> (value -> key) + -> m () +produceKeyedOn prTopic values keyF = do + logDebugNS "kafka" $ "Producing Kafka events" :# ["events" .= values] + view kafkaProducerPoolL >>= \case + NullKafkaProducerPool -> pure () + KafkaProducerPool producerPool -> do + errors <- + liftIO $ + Pool.withResource producerPool $ \producer -> + produceMessageBatch producer $ + toList $ + mkProducerRecord <$> values + unless (null errors) $ + logErrorNS "kafka" $ + "Failed to send events" :# ["errors" .= fmap (tshow . snd) errors] + where + mkProducerRecord value = + ProducerRecord + { prTopic + , prPartition = UnassignedPartition + , prKey = Just $ toStrict $ encode $ keyF value + , prValue = + Just $ + toStrict $ + encode value + } + +produceKeyedOnAsync + :: ( ToJSON value + , ToJSON key + , MonadLogger m + , MonadReader env m + , HasKafkaProducerPool env + , MonadUnliftIO m + ) + => TopicName + -> NonEmpty value + -> (value -> key) + -> m () +produceKeyedOnAsync prTopic values = void . async . produceKeyedOn prTopic values diff --git a/package.yaml b/package.yaml index e66debdf..5a87f99e 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: freckle-app -version: 1.9.1.1 +version: 1.9.2.0 maintainer: Freckle Education category: Utils github: freckle/freckle-app
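
Usage sketches for review; nothing below is part of the patch itself.

Since `PostgresStatementTimeout` is now a synonym for the new `Freckle.App.Env.Timeout`, existing `PGSTATEMENTTIMEOUT` values parse exactly as before (see the unchanged doctest outputs above), and the same reader can serve the planned polling-timeout setting. A minimal sketch, assuming a hypothetical `KAFKA_POLL_TIMEOUT` variable:

  import Freckle.App.Env (Timeout (..))
  import qualified Freckle.App.Env as Env

  -- Per the doctests: "10" and "10s" parse as TimeoutSeconds 10,
  -- "10ms" as TimeoutMilliseconds 10, anything else is an UnreadError
  envPollTimeout :: Env.Parser Env.Error Timeout
  envPollTimeout =
    Env.var Env.timeout "KAFKA_POLL_TIMEOUT" $ Env.def (TimeoutSeconds 1)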
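For reference, `envKafkaConsumerConfig` reads the following variables; the values shown are illustrative only:

  KAFKA_BROKER_ADDRESSES=localhost:9092   # comma-separated, no empty entries
  KAFKA_CONSUMER_GROUP_ID=example-group   # required, non-empty
  KAFKA_TOPIC=example-topic               # required, non-empty
  KAFKA_OFFSET_RESET=earliest             # "earliest" (default) or "latest"

`KAFKA_EXTRA_SUBSCRIPTION_PROPS` is optional (defaults to empty) and is parsed with the existing `keyValues` reader as comma-separated pairs.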
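A sketch of how an application might wire the consumer end to end. The identity-lens instance and the logging/reader stack are illustrative choices, not something the patch prescribes. Note that `runConsumer` spawns a supervised worker thread via `Control.Immortal` and returns immediately, so the caller must keep the process alive; and because offsets are committed only after `onMessage` succeeds on a decoded value, delivery is at-least-once and handlers should be idempotent.

  {-# LANGUAGE OverloadedStrings #-}
  {-# LANGUAGE ScopedTypeVariables #-}

  module Main (main) where

  import Blammo.Logging
  import Blammo.Logging.Simple (runSimpleLoggingT)
  import Control.Monad (forever)
  import Control.Monad.Reader (runReaderT)
  import Data.Aeson (Value, (.=))
  import qualified Env
  import Freckle.App.Env (Timeout (..))
  import Freckle.App.Kafka.Consumer
  import Kafka.Consumer (KafkaConsumer)
  import UnliftIO.Concurrent (threadDelay)

  -- Sketch-only: use the consumer itself as the reader environment
  instance HasKafkaConsumer KafkaConsumer where
    kafkaConsumerL = id

  main :: IO ()
  main = do
    config <- Env.parse (Env.header "consumer-example") envKafkaConsumerConfig
    withKafkaConsumer config $ \consumer ->
      runSimpleLoggingT $ flip runReaderT consumer $ do
        -- Poll every second; poll errors and decode failures are logged
        runConsumer (TimeoutSeconds 1) $ \(event :: Value) ->
          logInfo $ "Consumed event" :# ["event" .= event]
        -- Keep the main thread alive; a real app would run its server here
        forever $ threadDelay 1000000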
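On the producer side the API is unchanged apart from the new module name. A corresponding sketch, reusing the illustrative identity-lens trick: `produceKeyedOn` JSON-encodes each event and keys it with the supplied function (`produceKeyedOnAsync` is the fire-and-forget variant), and a `NullKafkaProducerPool` environment turns it into a no-op.

  {-# LANGUAGE OverloadedStrings #-}

  module Main (main) where

  import Blammo.Logging.Simple (runSimpleLoggingT)
  import Control.Monad.Reader (runReaderT)
  import Data.List.NonEmpty (NonEmpty (..))
  import Data.Text (Text)
  import qualified Env
  import Freckle.App.Kafka.Producer
  import Kafka.Producer (TopicName (..))

  -- Sketch-only: use the pool itself as the reader environment
  instance HasKafkaProducerPool KafkaProducerPool where
    kafkaProducerPoolL = id

  main :: IO ()
  main = do
    (addresses, poolConfig) <-
      Env.parse (Env.header "producer-example") $
        (,) <$> envKafkaBrokerAddresses <*> envKafkaProducerPoolConfig
    pool <- KafkaProducerPool <$> createKafkaProducerPool addresses poolConfig
    runSimpleLoggingT $
      flip runReaderT pool $
        -- Each (key, payload) pair is keyed on its first component
        produceKeyedOn (TopicName "example-topic") events fst
   where
    events :: NonEmpty (Text, Int)
    events = ("user-1", 1) :| [("user-2", 2)]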