From 4f18b6c89dca3d4bddb7d65faa2c6e9dd0693e61 Mon Sep 17 00:00:00 2001 From: Nikita Volkov Date: Wed, 14 Feb 2024 23:36:46 +0300 Subject: [PATCH 1/5] State the goals --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ffd14a..a4accc5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.11 + +- Optional observability event stream, which can be interpreted into log records and metrics. + # 0.10.1 - Avoid releasing connections on exceptions thrown in session From edc33d27adb134aefc39b7aba1fd00c07166305f Mon Sep 17 00:00:00 2001 From: Nikita Volkov Date: Wed, 14 Feb 2024 23:56:05 +0300 Subject: [PATCH 2/5] Implement observability --- hasql-pool.cabal | 7 ++- library/Hasql/Pool.hs | 97 ++++++++++++++++++++++--------- library/Hasql/Pool/Observation.hs | 17 ++++++ library/Hasql/Pool/Prelude.hs | 2 + test/Main.hs | 2 +- 5 files changed, 97 insertions(+), 28 deletions(-) create mode 100644 library/Hasql/Pool/Observation.hs diff --git a/hasql-pool.cabal b/hasql-pool.cabal index f5aa477..a5c5807 100644 --- a/hasql-pool.cabal +++ b/hasql-pool.cabal @@ -65,13 +65,18 @@ common base-settings library import: base-settings hs-source-dirs: library - exposed-modules: Hasql.Pool + exposed-modules: + Hasql.Pool + Hasql.Pool.Observation + other-modules: Hasql.Pool.Prelude build-depends: , base >=4.11 && <5 + , bytestring >=0.10 && <0.14 , hasql >=1.6.0.1 && <1.7 , stm >=2.5 && <3 , time >=1.9 && <2 + , uuid >=1.3 && <2 test-suite test import: base-settings diff --git a/library/Hasql/Pool.hs b/library/Hasql/Pool.hs index 75abd42..d945336 100644 --- a/library/Hasql/Pool.hs +++ b/library/Hasql/Pool.hs @@ -8,11 +8,17 @@ module Hasql.Pool -- * Errors UsageError (..), + + -- * Observations + Observation (..), + ReleaseReason (..), ) where +import qualified Data.UUID.V4 as Uuid import Hasql.Connection (Connection) import qualified Hasql.Connection as Connection +import Hasql.Pool.Observation import Hasql.Pool.Prelude import qualified Hasql.Session as Session @@ -20,17 +26,17 @@ import qualified Hasql.Session as Session data Entry = Entry { entryConnection :: Connection, entryCreationTimeNSec :: Word64, - entryUseTimeNSec :: Word64 + entryUseTimeNSec :: Word64, + entryId :: UUID } -entryIsAlive :: Word64 -> Word64 -> Word64 -> Entry -> Bool -entryIsAlive maxLifetime maxIdletime now Entry {..} = - now - <= entryCreationTimeNSec - + maxLifetime - && now - <= entryUseTimeNSec - + maxIdletime +entryIsAged :: Word64 -> Word64 -> Entry -> Bool +entryIsAged maxLifetime now Entry {..} = + now > entryCreationTimeNSec + maxLifetime + +entryIsIdle :: Word64 -> Word64 -> Entry -> Bool +entryIsIdle maxIdletime now Entry {..} = + now > entryUseTimeNSec + maxIdletime -- | Pool of connections to DB. data Pool = Pool @@ -54,7 +60,9 @@ data Pool = Pool -- | Whether to return a connection to the pool. poolReuseVar :: TVar (TVar Bool), -- | To stop the manager thread via garbage collection. - poolReaperRef :: IORef () + poolReaperRef :: IORef (), + -- | Action for reporting the observations. + poolObserver :: Observation -> IO () } -- | Create a connection-pool, with default settings. @@ -72,9 +80,16 @@ acquire :: DiffTime -> -- | Connection settings. Connection.Settings -> + -- | Observation handler. + -- + -- Typically it's used for monitoring the state of the pool via metrics and logging. + -- + -- If the action is not lightweight, it's recommended to use intermediate bufferring via channels like TBQueue. + -- E.g., if the action is @'atomically' . 'writeTBQueue' yourQueue@, then reading from it and processing can be done on a separate thread. + (Observation -> IO ()) -> IO Pool -acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings = - acquireDynamically poolSize acqTimeout maxLifetime maxIdletime (pure connectionSettings) +acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings observer = + acquireDynamically poolSize acqTimeout maxLifetime maxIdletime (pure connectionSettings) observer -- | Create a connection-pool. -- @@ -94,8 +109,15 @@ acquireDynamically :: DiffTime -> -- | Action fetching connection settings. IO Connection.Settings -> + -- | Observation handler. + -- + -- Use it for monitoring the state of the pool via metrics and logging. + -- + -- If the action is not lightweight, it's recommended to use intermediate bufferring via channels like TBQueue. + -- E.g., if the action is @'atomically' . 'writeTBQueue' yourQueue@, then reading from it and processing can be done on a separate thread. + (Observation -> IO ()) -> IO Pool -acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSettings = do +acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSettings observer = do connectionQueue <- newTQueueIO capVar <- newTVarIO poolSize reuseVar <- newTVarIO =<< newTVarIO True @@ -106,17 +128,24 @@ acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSe now <- getMonotonicTimeNSec join . atomically $ do entries <- flushTQueue connectionQueue - let (keep, close) = partition (entryIsAlive maxLifetimeNanos maxIdletimeNanos now) entries - traverse_ (writeTQueue connectionQueue) keep - return $ forM_ close $ \entry -> do - Connection.release (entryConnection entry) - atomically $ modifyTVar' capVar succ + let (agedEntries, unagedEntries) = partition (entryIsAged maxLifetimeNanos now) entries + (idleEntries, liveEntries) = partition (entryIsIdle maxLifetimeNanos now) unagedEntries + traverse_ (writeTQueue connectionQueue) liveEntries + return $ do + forM_ agedEntries $ \entry -> do + Connection.release (entryConnection entry) + atomically $ modifyTVar' capVar succ + observer (ConnectionReleasedObservation (entryId entry) AgingReleaseReason) + forM_ idleEntries $ \entry -> do + Connection.release (entryConnection entry) + atomically $ modifyTVar' capVar succ + observer (ConnectionReleasedObservation (entryId entry) IdlenessReleaseReason) void . mkWeakIORef reaperRef $ do -- When the pool goes out of scope, stop the manager. killThread managerTid - return $ Pool poolSize fetchConnectionSettings acqTimeoutMicros maxLifetimeNanos maxIdletimeNanos connectionQueue capVar reuseVar reaperRef + return $ Pool poolSize fetchConnectionSettings acqTimeoutMicros maxLifetimeNanos maxIdletimeNanos connectionQueue capVar reuseVar reaperRef observer where acqTimeoutMicros = div (fromIntegral (diffTimeToPicoseconds acqTimeout)) 1_000_000 @@ -143,6 +172,7 @@ release Pool {..} = return $ forM_ entries $ \entry -> do Connection.release (entryConnection entry) atomically $ modifyTVar' poolCapacity succ + poolObserver (ConnectionReleasedObservation (entryId entry) ReleaseActionCallReleaseReason) -- | Use a connection from the pool to run a session and return the connection -- to the pool, when finished. @@ -152,8 +182,7 @@ release Pool {..} = -- and a slot gets freed up for a new connection to be established the next -- time one is needed. The error still gets returned from this function. -- --- __Warning:__ Due to the mechanism mentioned above you should avoid consuming --- errors within sessions. +-- __Warning:__ Due to the mechanism mentioned above you should avoid intercepting this error type from within sessions. use :: Pool -> Session.Session a -> IO (Either UsageError a) use Pool {..} sess = do timeout <- do @@ -180,20 +209,33 @@ use Pool {..} sess = do onNewConn reuseVar = do settings <- poolFetchConnectionSettings now <- getMonotonicTimeNSec + id <- Uuid.nextRandom + poolObserver (AttemptingToConnectObservation id) connRes <- Connection.acquire settings case connRes of Left connErr -> do + poolObserver (FailedToConnectObservation id connErr) atomically $ modifyTVar' poolCapacity succ return $ Left $ ConnectionUsageError connErr - Right entry -> onLiveConn reuseVar (Entry entry now now) + Right entry -> do + poolObserver (ConnectionEstablishedObservation id) + onLiveConn reuseVar (Entry entry now now id) onConn reuseVar entry = do now <- getMonotonicTimeNSec - if entryIsAlive poolMaxLifetime poolMaxIdletime now entry - then onLiveConn reuseVar entry {entryUseTimeNSec = now} - else do + if entryIsAged poolMaxLifetime now entry + then do Connection.release (entryConnection entry) + poolObserver (ConnectionReleasedObservation (entryId entry) AgingReleaseReason) onNewConn reuseVar + else + if entryIsIdle poolMaxIdletime now entry + then do + Connection.release (entryConnection entry) + poolObserver (ConnectionReleasedObservation (entryId entry) IdlenessReleaseReason) + onNewConn reuseVar + else do + onLiveConn reuseVar entry {entryUseTimeNSec = now} onLiveConn reuseVar entry = do sessRes <- try @SomeException (Session.run sess (entryConnection entry)) @@ -203,8 +245,10 @@ use Pool {..} sess = do returnConn throwIO exc Right (Left err) -> case err of - Session.QueryError _ _ (Session.ClientError _) -> do + Session.QueryError _ _ (Session.ClientError details) -> do + Connection.release (entryConnection entry) atomically $ modifyTVar' poolCapacity succ + poolObserver (ConnectionReleasedObservation (entryId entry) (TransportErrorReleaseReason details)) return $ Left $ SessionUsageError err _ -> do returnConn @@ -221,6 +265,7 @@ use Pool {..} sess = do else return $ do Connection.release (entryConnection entry) atomically $ modifyTVar' poolCapacity succ + poolObserver (ConnectionReleasedObservation (entryId entry) ReleaseActionCallReleaseReason) -- | Union over all errors that 'use' can result in. data UsageError diff --git a/library/Hasql/Pool/Observation.hs b/library/Hasql/Pool/Observation.hs new file mode 100644 index 0000000..e51d35f --- /dev/null +++ b/library/Hasql/Pool/Observation.hs @@ -0,0 +1,17 @@ +module Hasql.Pool.Observation where + +import Hasql.Pool.Prelude + +data Observation + = ConnectionEstablishedObservation UUID + | AttemptingToConnectObservation UUID + | FailedToConnectObservation UUID (Maybe ByteString) + | ConnectionReleasedObservation UUID ReleaseReason + deriving (Show, Eq) + +data ReleaseReason + = AgingReleaseReason + | IdlenessReleaseReason + | TransportErrorReleaseReason (Maybe ByteString) + | ReleaseActionCallReleaseReason + deriving (Show, Eq) diff --git a/library/Hasql/Pool/Prelude.hs b/library/Hasql/Pool/Prelude.hs index 3ac731e..584f5ac 100644 --- a/library/Hasql/Pool/Prelude.hs +++ b/library/Hasql/Pool/Prelude.hs @@ -17,6 +17,7 @@ import Control.Monad.ST as Exports import Data.Bifunctor as Exports import Data.Bits as Exports import Data.Bool as Exports +import Data.ByteString as Exports (ByteString) import Data.Char as Exports import Data.Coerce as Exports import Data.Complex as Exports @@ -43,6 +44,7 @@ import Data.String as Exports import Data.Time as Exports import Data.Traversable as Exports import Data.Tuple as Exports +import Data.UUID as Exports (UUID) import Data.Unique as Exports import Data.Version as Exports import Data.Void as Exports diff --git a/test/Main.hs b/test/Main.hs index 873d6c2..b3659f4 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -18,7 +18,7 @@ main :: IO () main = do connectionSettings <- getConnectionSettings let withPool poolSize acqTimeout maxLifetime maxIdletime connectionSettings = - bracket (acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings) release + bracket (acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings (const (pure ()))) release withDefaultPool = withPool 3 10 1_800 1_800 connectionSettings From fc48a102585a832f3021360c5588ef9854573cc3 Mon Sep 17 00:00:00 2001 From: Nikita Volkov Date: Thu, 22 Feb 2024 21:02:18 +0300 Subject: [PATCH 3/5] Move to a status model --- hasql-pool.cabal | 1 + library/Hasql/Pool.hs | 27 +++++++++++++--------- library/Hasql/Pool/Observation.hs | 37 +++++++++++++++++++++++-------- library/Hasql/Pool/Prelude.hs | 1 + 4 files changed, 46 insertions(+), 20 deletions(-) diff --git a/hasql-pool.cabal b/hasql-pool.cabal index a5c5807..d8b6bcd 100644 --- a/hasql-pool.cabal +++ b/hasql-pool.cabal @@ -75,6 +75,7 @@ library , bytestring >=0.10 && <0.14 , hasql >=1.6.0.1 && <1.7 , stm >=2.5 && <3 + , text >=1.2 && <3 , time >=1.9 && <2 , uuid >=1.3 && <2 diff --git a/library/Hasql/Pool.hs b/library/Hasql/Pool.hs index d945336..1b1cb4a 100644 --- a/library/Hasql/Pool.hs +++ b/library/Hasql/Pool.hs @@ -11,10 +11,12 @@ module Hasql.Pool -- * Observations Observation (..), - ReleaseReason (..), + ConnectionTerminationReason (..), ) where +import qualified Data.Text.Encoding as Text +import qualified Data.Text.Encoding.Error as Text import qualified Data.UUID.V4 as Uuid import Hasql.Connection (Connection) import qualified Hasql.Connection as Connection @@ -135,11 +137,11 @@ acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSe forM_ agedEntries $ \entry -> do Connection.release (entryConnection entry) atomically $ modifyTVar' capVar succ - observer (ConnectionReleasedObservation (entryId entry) AgingReleaseReason) + observer (ConnectionObservation (entryId entry) (TerminatedConnectionStatus AgingConnectionTerminationReason)) forM_ idleEntries $ \entry -> do Connection.release (entryConnection entry) atomically $ modifyTVar' capVar succ - observer (ConnectionReleasedObservation (entryId entry) IdlenessReleaseReason) + observer (ConnectionObservation (entryId entry) (TerminatedConnectionStatus IdlenessConnectionTerminationReason)) void . mkWeakIORef reaperRef $ do -- When the pool goes out of scope, stop the manager. @@ -172,7 +174,7 @@ release Pool {..} = return $ forM_ entries $ \entry -> do Connection.release (entryConnection entry) atomically $ modifyTVar' poolCapacity succ - poolObserver (ConnectionReleasedObservation (entryId entry) ReleaseActionCallReleaseReason) + poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus ReleaseConnectionTerminationReason)) -- | Use a connection from the pool to run a session and return the connection -- to the pool, when finished. @@ -210,15 +212,15 @@ use Pool {..} sess = do settings <- poolFetchConnectionSettings now <- getMonotonicTimeNSec id <- Uuid.nextRandom - poolObserver (AttemptingToConnectObservation id) + poolObserver (ConnectionObservation id ConnectingConnectionStatus) connRes <- Connection.acquire settings case connRes of Left connErr -> do - poolObserver (FailedToConnectObservation id connErr) + poolObserver (ConnectionObservation id (TerminatedConnectionStatus (NetworkErrorConnectionTerminationReason (fmap (Text.decodeUtf8With Text.lenientDecode) connErr)))) atomically $ modifyTVar' poolCapacity succ return $ Left $ ConnectionUsageError connErr Right entry -> do - poolObserver (ConnectionEstablishedObservation id) + poolObserver (ConnectionObservation id ReadyForUseConnectionStatus) onLiveConn reuseVar (Entry entry now now id) onConn reuseVar entry = do @@ -226,18 +228,19 @@ use Pool {..} sess = do if entryIsAged poolMaxLifetime now entry then do Connection.release (entryConnection entry) - poolObserver (ConnectionReleasedObservation (entryId entry) AgingReleaseReason) + poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus AgingConnectionTerminationReason)) onNewConn reuseVar else if entryIsIdle poolMaxIdletime now entry then do Connection.release (entryConnection entry) - poolObserver (ConnectionReleasedObservation (entryId entry) IdlenessReleaseReason) + poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus IdlenessConnectionTerminationReason)) onNewConn reuseVar else do onLiveConn reuseVar entry {entryUseTimeNSec = now} onLiveConn reuseVar entry = do + poolObserver (ConnectionObservation (entryId entry) InUseConnectionStatus) sessRes <- try @SomeException (Session.run sess (entryConnection entry)) case sessRes of @@ -248,13 +251,15 @@ use Pool {..} sess = do Session.QueryError _ _ (Session.ClientError details) -> do Connection.release (entryConnection entry) atomically $ modifyTVar' poolCapacity succ - poolObserver (ConnectionReleasedObservation (entryId entry) (TransportErrorReleaseReason details)) + poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus (NetworkErrorConnectionTerminationReason (fmap (Text.decodeUtf8With Text.lenientDecode) details)))) return $ Left $ SessionUsageError err _ -> do returnConn + poolObserver (ConnectionObservation (entryId entry) ReadyForUseConnectionStatus) return $ Left $ SessionUsageError err Right (Right res) -> do returnConn + poolObserver (ConnectionObservation (entryId entry) ReadyForUseConnectionStatus) return $ Right res where returnConn = @@ -265,7 +270,7 @@ use Pool {..} sess = do else return $ do Connection.release (entryConnection entry) atomically $ modifyTVar' poolCapacity succ - poolObserver (ConnectionReleasedObservation (entryId entry) ReleaseActionCallReleaseReason) + poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus ReleaseConnectionTerminationReason)) -- | Union over all errors that 'use' can result in. data UsageError diff --git a/library/Hasql/Pool/Observation.hs b/library/Hasql/Pool/Observation.hs index e51d35f..1ee14ba 100644 --- a/library/Hasql/Pool/Observation.hs +++ b/library/Hasql/Pool/Observation.hs @@ -3,15 +3,34 @@ module Hasql.Pool.Observation where import Hasql.Pool.Prelude data Observation - = ConnectionEstablishedObservation UUID - | AttemptingToConnectObservation UUID - | FailedToConnectObservation UUID (Maybe ByteString) - | ConnectionReleasedObservation UUID ReleaseReason + = ConnectionObservation + -- | Generated connection ID. + -- For grouping the observations by one connection. + UUID + -- | Connection status that it has entered. + ConnectionStatus deriving (Show, Eq) -data ReleaseReason - = AgingReleaseReason - | IdlenessReleaseReason - | TransportErrorReleaseReason (Maybe ByteString) - | ReleaseActionCallReleaseReason +data ConnectionStatus + = -- | Connection is being established. + ConnectingConnectionStatus + | -- | Connection is established and not occupied. + ReadyForUseConnectionStatus + | -- | Is being used by some session. + -- + -- After it's done the status will transition to 'ReadyForUseConnectionStatus' or 'ReleasedConnectionStatus'. + InUseConnectionStatus + | -- | Connection terminated. + TerminatedConnectionStatus ConnectionTerminationReason + deriving (Show, Eq) + +data ConnectionTerminationReason + = -- | The age timeout of the connection has passed. + AgingConnectionTerminationReason + | -- | The timeout of how long a connection may remain idle in the pool has passed. + IdlenessConnectionTerminationReason + | -- | Connectivity issues with the server. + NetworkErrorConnectionTerminationReason (Maybe Text) + | -- | User has invoked the 'Hasql.Pool.release' procedure. + ReleaseConnectionTerminationReason deriving (Show, Eq) diff --git a/library/Hasql/Pool/Prelude.hs b/library/Hasql/Pool/Prelude.hs index 584f5ac..f291d0d 100644 --- a/library/Hasql/Pool/Prelude.hs +++ b/library/Hasql/Pool/Prelude.hs @@ -41,6 +41,7 @@ import Data.Proxy as Exports import Data.Ratio as Exports import Data.STRef as Exports import Data.String as Exports +import Data.Text as Exports (Text) import Data.Time as Exports import Data.Traversable as Exports import Data.Tuple as Exports From 4a290714f9ce529bd23cbe404caebc5d8d4277a7 Mon Sep 17 00:00:00 2001 From: Nikita Volkov Date: Thu, 22 Feb 2024 21:06:18 +0300 Subject: [PATCH 4/5] Fix export --- library/Hasql/Pool.hs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/library/Hasql/Pool.hs b/library/Hasql/Pool.hs index 1b1cb4a..0f171f3 100644 --- a/library/Hasql/Pool.hs +++ b/library/Hasql/Pool.hs @@ -10,8 +10,7 @@ module Hasql.Pool UsageError (..), -- * Observations - Observation (..), - ConnectionTerminationReason (..), + module Hasql.Pool.Observation, ) where From 7a0e6201f5f03fde226edb7116770d706a622b8f Mon Sep 17 00:00:00 2001 From: Nikita Volkov Date: Fri, 23 Feb 2024 14:35:42 +0300 Subject: [PATCH 5/5] Correct refs --- library/Hasql/Pool/Observation.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/library/Hasql/Pool/Observation.hs b/library/Hasql/Pool/Observation.hs index 1ee14ba..d0e26a0 100644 --- a/library/Hasql/Pool/Observation.hs +++ b/library/Hasql/Pool/Observation.hs @@ -18,7 +18,7 @@ data ConnectionStatus ReadyForUseConnectionStatus | -- | Is being used by some session. -- - -- After it's done the status will transition to 'ReadyForUseConnectionStatus' or 'ReleasedConnectionStatus'. + -- After it's done the status will transition to 'ReadyForUseConnectionStatus' or 'TerminatedConnectionStatus'. InUseConnectionStatus | -- | Connection terminated. TerminatedConnectionStatus ConnectionTerminationReason