Skip to content

Commit

Permalink
Add Kafka consumer module (#113)
Browse files Browse the repository at this point in the history
* Kafka consumer module

Transplanted from: freckle/progress#42

* Documentation

* Inline `keyValueMap`

This isn't a novel enough function to warrant inclusion into `Freckle.App.Env`

* Abstract `timeout` from Database

This is planned to be used for reading in the polling timeout setting in progress consumer

* Formatting

* Fix doctest

* Use `Timeout` in `runConsumer`

* Use `logDebug`

* Move Kafka to Producer module

* `v1.9.2.0`

* Poll single topic

* Commit offset only after processing valid message
  • Loading branch information
stackptr authored Aug 14, 2023
1 parent 7470f9a commit 41f9715
Show file tree
Hide file tree
Showing 8 changed files with 377 additions and 190 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## [_Unreleased_](https://github.com/freckle/freckle-app/compare/v1.9.2.0...main)

## [v1.9.2.0](https://github.com/freckle/freckle-app/compare/v1.9.1.1...v1.9.2.0)

- Add `Freckle.App.Kafka.Consumer` for consuming Kafka events.
- Move producer to `Freckle.App.Kafka.Producer`.
- Re-export both modules in `Freckle.App.Kafka`.

## [v1.9.1.1](https://github.com/freckle/freckle-app/compare/v1.9.1.0...v1.9.1.1)

- Add `KafkaProducerPoolConfig` for controlling Kafka producer pool parameters
Expand Down
6 changes: 4 additions & 2 deletions freckle-app.cabal
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
cabal-version: 1.18

-- This file has been generated from package.yaml by hpack version 0.35.1.
-- This file has been generated from package.yaml by hpack version 0.35.2.
--
-- see: https://github.com/sol/hpack

name: freckle-app
version: 1.9.1.1
version: 1.9.2.0
synopsis: Haskell application toolkit used at Freckle
description: Please see README.md
category: Utils
Expand Down Expand Up @@ -43,6 +43,8 @@ library
Freckle.App.Http.Paginate
Freckle.App.Http.Retry
Freckle.App.Kafka
Freckle.App.Kafka.Consumer
Freckle.App.Kafka.Producer
Freckle.App.Memcached
Freckle.App.Memcached.CacheKey
Freckle.App.Memcached.CacheTTL
Expand Down
43 changes: 6 additions & 37 deletions library/Freckle/App/Database.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module Freckle.App.Database
, PostgresConnectionConf (..)
, PostgresPasswordSource (..)
, PostgresPassword (..)
, PostgresStatementTimeout (..)
, PostgresStatementTimeout
, postgresStatementTimeoutMilliseconds
, envParseDatabaseConf
, envPostgresPasswordSource
Expand All @@ -28,7 +28,6 @@ import Control.Monad.Reader
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS8
import qualified Data.ByteString.Lazy as BSL
import Data.Char (isDigit)
import Data.Pool
import qualified Data.Text as T
import Database.Persist.Postgresql
Expand All @@ -47,6 +46,7 @@ import Database.PostgreSQL.Simple
, execute
)
import Database.PostgreSQL.Simple.SqlQQ (sql)
import Freckle.App.Env (Timeout (..))
import qualified Freckle.App.Env as Env
import Freckle.App.OpenTelemetry (MonadTracer (..))
import Freckle.App.Stats (HasStatsClient)
Expand All @@ -58,7 +58,6 @@ import UnliftIO.Concurrent (threadDelay)
import UnliftIO.Exception (displayException)
import UnliftIO.IORef
import Yesod.Core.Types (HandlerData (..), RunHandlerEnv (..))
import qualified Prelude as Unsafe (read)

type SqlPool = Pool SqlBackend

Expand Down Expand Up @@ -149,41 +148,12 @@ data PostgresPassword
| PostgresPasswordStatic String
deriving stock (Show, Eq)

data PostgresStatementTimeout
= PostgresStatementTimeoutSeconds Int
| PostgresStatementTimeoutMilliseconds Int
deriving stock (Show, Eq)
type PostgresStatementTimeout = Timeout

postgresStatementTimeoutMilliseconds :: PostgresStatementTimeout -> Int
postgresStatementTimeoutMilliseconds = \case
PostgresStatementTimeoutSeconds s -> s * 1000
PostgresStatementTimeoutMilliseconds ms -> ms

-- | Read @PGSTATEMENTTIMEOUT@ as seconds or milliseconds
--
-- >>> readPostgresStatementTimeout "10"
-- Right (PostgresStatementTimeoutSeconds 10)
--
-- >>> readPostgresStatementTimeout "10s"
-- Right (PostgresStatementTimeoutSeconds 10)
--
-- >>> readPostgresStatementTimeout "10ms"
-- Right (PostgresStatementTimeoutMilliseconds 10)
--
-- >>> readPostgresStatementTimeout "20m"
-- Left "..."
--
-- >>> readPostgresStatementTimeout "2m0"
-- Left "..."
readPostgresStatementTimeout
:: String -> Either String PostgresStatementTimeout
readPostgresStatementTimeout x = case span isDigit x of
("", _) -> Left "must be {digits}(s|ms)"
(digits, "") -> Right $ PostgresStatementTimeoutSeconds $ Unsafe.read digits
(digits, "s") -> Right $ PostgresStatementTimeoutSeconds $ Unsafe.read digits
(digits, "ms") ->
Right $ PostgresStatementTimeoutMilliseconds $ Unsafe.read digits
_ -> Left "must be {digits}(s|ms)"
TimeoutSeconds s -> s * 1000
TimeoutMilliseconds ms -> ms

envPostgresPasswordSource :: Env.Parser Env.Error PostgresPasswordSource
envPostgresPasswordSource =
Expand All @@ -207,8 +177,7 @@ envParseDatabaseConf source = do
poolSize <- Env.var Env.auto "PGPOOLSIZE" $ Env.def 10
schema <- optional $ Env.var Env.nonempty "PGSCHEMA" mempty
statementTimeout <-
Env.var (Env.eitherReader readPostgresStatementTimeout) "PGSTATEMENTTIMEOUT" $
Env.def (PostgresStatementTimeoutSeconds 120)
Env.var Env.timeout "PGSTATEMENTTIMEOUT" $ Env.def (TimeoutSeconds 120)
pure
PostgresConnectionConf
{ pccHost = host
Expand Down
37 changes: 37 additions & 0 deletions library/Freckle/App/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,25 @@ module Freckle.App.Env
, flag

-- * Extensions
, Timeout (..)
, kept
, eitherReader
, time
, keyValues
, timeout
) where

import Freckle.App.Prelude

import Control.Error.Util (note)
import Data.Char (isDigit)
import qualified Data.Text as T
import Data.Time (defaultTimeLocale, parseTimeM)
import Env hiding (flag)
import qualified Env
import Env.Internal.Free (hoistAlt)
import Env.Internal.Parser (Parser (..), VarF (..))
import qualified Prelude as Unsafe (read)

-- | Designates the value of a parameter when a flag is not provided.
newtype Off a = Off a
Expand Down Expand Up @@ -139,3 +143,36 @@ keyValues = eitherReader $ traverse keyValue . T.splitOn "," . pack
(k, v) | T.null v -> Left $ "Key " <> unpack k <> " has no value"
(k, v) | T.null k -> Left $ "Value " <> unpack v <> " has no key"
(k, v) -> Right (k, v)

-- | Represents a timeout in seconds or milliseconds
data Timeout
  = TimeoutSeconds Int
  | TimeoutMilliseconds Int
  deriving stock (Show, Eq)

-- | Read a timeout value as seconds or milliseconds
--
-- A bare number is interpreted as seconds; an explicit @s@ or @ms@ suffix
-- selects the unit. Any other trailing text is a parse error.
--
-- >>> var timeout "TIMEOUT" mempty `parsePure` [("TIMEOUT", "10")]
-- Right (TimeoutSeconds 10)
--
-- >>> var timeout "TIMEOUT" mempty `parsePure` [("TIMEOUT", "10s")]
-- Right (TimeoutSeconds 10)
--
-- >>> var timeout "TIMEOUT" mempty `parsePure` [("TIMEOUT", "10ms")]
-- Right (TimeoutMilliseconds 10)
--
-- >>> var timeout "TIMEOUT" mempty `parsePure` [("TIMEOUT", "20m")]
-- Left [("TIMEOUT",UnreadError "must be {digits}(s|ms): \"20m\"")]
--
-- >>> var timeout "TIMEOUT" mempty `parsePure` [("TIMEOUT", "2m0")]
-- Left [("TIMEOUT",UnreadError "must be {digits}(s|ms): \"2m0\"")]
timeout :: Reader Error Timeout
timeout = eitherReader $ parseTimeout . span isDigit
 where
  -- 'span isDigit' splits the input into (leading digits, remaining suffix);
  -- the suffix determines the unit. 'Unsafe.read' does not fail here because
  -- it is only ever applied to a non-empty string of digits.
  parseTimeout = \case
    ("", _) -> Left "must be {digits}(s|ms)"
    (digits, "") -> Right $ TimeoutSeconds $ Unsafe.read digits
    (digits, "s") -> Right $ TimeoutSeconds $ Unsafe.read digits
    (digits, "ms") ->
      Right $ TimeoutMilliseconds $ Unsafe.read digits
    -- digits followed by any other suffix, e.g. "20m" or "2m0"
    _ -> Left "must be {digits}(s|ms)"
154 changes: 4 additions & 150 deletions library/Freckle/App/Kafka.hs
Original file line number Diff line number Diff line change
@@ -1,153 +1,7 @@
{-# LANGUAGE ApplicativeDo #-}
{-# LANGUAGE NamedFieldPuns #-}

module Freckle.App.Kafka
( envKafkaBrokerAddresses
, KafkaProducerPoolConfig (..)
, envKafkaProducerPoolConfig
, KafkaProducerPool (..)
, HasKafkaProducerPool (..)
, createKafkaProducerPool
, produceKeyedOn
, produceKeyedOnAsync
( module Freckle.App.Kafka.Consumer
, module Freckle.App.Kafka.Producer
) where

import Freckle.App.Prelude

import Blammo.Logging
import Control.Lens (Lens', view)
import Data.Aeson (ToJSON, encode)
import Data.ByteString.Lazy (toStrict)
import qualified Data.List.NonEmpty as NE
import Data.Pool (Pool)
import qualified Data.Pool as Pool
import qualified Data.Text as T
import qualified Freckle.App.Env as Env
import Kafka.Producer
import UnliftIO.Async (async)
import UnliftIO.Exception (throwString)
import Yesod.Core.Lens
import Yesod.Core.Types (HandlerData)

envKafkaBrokerAddresses
:: Env.Parser Env.Error (NonEmpty BrokerAddress)
envKafkaBrokerAddresses =
Env.var
(Env.eitherReader readKafkaBrokerAddresses)
"KAFKA_BROKER_ADDRESSES"
mempty

readKafkaBrokerAddresses :: String -> Either String (NonEmpty BrokerAddress)
readKafkaBrokerAddresses t = case NE.nonEmpty $ T.splitOn "," $ T.pack t of
Just xs@(x NE.:| _)
| x /= "" -> Right $ BrokerAddress <$> xs
_ -> Left "Broker Address cannot be empty"

data KafkaProducerPoolConfig = KafkaProducerPoolConfig
{ kafkaProducerPoolConfigStripes :: Int
-- ^ The number of stripes (distinct sub-pools) to maintain.
-- The smallest acceptable value is 1.
, kafkaProducerPoolConfigIdleTimeout :: NominalDiffTime
-- ^ Amount of time for which an unused resource is kept open.
-- The smallest acceptable value is 0.5 seconds.
--
-- The elapsed time before destroying a resource may be a little
-- longer than requested, as the reaper thread wakes at 1-second
-- intervals.
, kafkaProducerPoolConfigSize :: Int
-- ^ Maximum number of resources to keep open per stripe. The
-- smallest acceptable value is 1.
--
-- Requests for resources will block if this limit is reached on a
-- single stripe, even if other stripes have idle resources
-- available.
}
deriving stock (Show)

-- | Same defaults as 'Database.Persist.Sql.ConnectionPoolConfig'
defaultKafkaProducerPoolConfig :: KafkaProducerPoolConfig
defaultKafkaProducerPoolConfig = KafkaProducerPoolConfig 1 600 10

envKafkaProducerPoolConfig
:: Env.Parser Env.Error KafkaProducerPoolConfig
envKafkaProducerPoolConfig = do
poolSize <- Env.var Env.auto "KAFKA_PRODUCER_POOL_SIZE" $ Env.def 10
pure $ defaultKafkaProducerPoolConfig {kafkaProducerPoolConfigSize = poolSize}

data KafkaProducerPool
= NullKafkaProducerPool
| KafkaProducerPool (Pool KafkaProducer)

class HasKafkaProducerPool env where
kafkaProducerPoolL :: Lens' env KafkaProducerPool

instance HasKafkaProducerPool site => HasKafkaProducerPool (HandlerData child site) where
kafkaProducerPoolL = envL . siteL . kafkaProducerPoolL

createKafkaProducerPool
:: NonEmpty BrokerAddress
-> KafkaProducerPoolConfig
-> IO (Pool KafkaProducer)
createKafkaProducerPool addresses KafkaProducerPoolConfig {..} =
Pool.createPool
mkProducer
closeProducer
kafkaProducerPoolConfigStripes
kafkaProducerPoolConfigIdleTimeout
kafkaProducerPoolConfigSize
where
mkProducer =
either throw pure =<< newProducer (brokersList $ toList addresses)
throw err = throwString $ "Failed to open kafka producer: " <> show err

produceKeyedOn
:: ( ToJSON value
, ToJSON key
, MonadLogger m
, MonadReader env m
, HasKafkaProducerPool env
, MonadUnliftIO m
)
=> TopicName
-> NonEmpty value
-> (value -> key)
-> m ()
produceKeyedOn prTopic values keyF = do
logDebugNS "kafka" $ "Producing Kafka events" :# ["events" .= values]
view kafkaProducerPoolL >>= \case
NullKafkaProducerPool -> pure ()
KafkaProducerPool producerPool -> do
errors <-
liftIO $
Pool.withResource producerPool $ \producer ->
produceMessageBatch producer $
toList $
mkProducerRecord <$> values
unless (null errors) $
logErrorNS "kafka" $
"Failed to send events" :# ["errors" .= fmap (tshow . snd) errors]
where
mkProducerRecord value =
ProducerRecord
{ prTopic
, prPartition = UnassignedPartition
, prKey = Just $ toStrict $ encode $ keyF value
, prValue =
Just $
toStrict $
encode value
}

produceKeyedOnAsync
:: ( ToJSON value
, ToJSON key
, MonadLogger m
, MonadReader env m
, HasKafkaProducerPool env
, MonadUnliftIO m
)
=> TopicName
-> NonEmpty value
-> (value -> key)
-> m ()
produceKeyedOnAsync prTopic values = void . async . produceKeyedOn prTopic values
import Freckle.App.Kafka.Consumer
import Freckle.App.Kafka.Producer
Loading

0 comments on commit 41f9715

Please sign in to comment.