Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observability #40

Merged
merged 5 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 0.11

- Optional observability event stream, which can be interpreted into log records and metrics.

# 0.10.1

- Avoid releasing connections on exceptions thrown in session
Expand Down
8 changes: 7 additions & 1 deletion hasql-pool.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,19 @@ common base-settings
library
import: base-settings
hs-source-dirs: library
exposed-modules: Hasql.Pool
exposed-modules:
Hasql.Pool
Hasql.Pool.Observation

other-modules: Hasql.Pool.Prelude
build-depends:
, base >=4.11 && <5
, bytestring >=0.10 && <0.14
, hasql >=1.6.0.1 && <1.7
, stm >=2.5 && <3
, text >=1.2 && <3
, time >=1.9 && <2
, uuid >=1.3 && <2

test-suite test
import: base-settings
Expand Down
101 changes: 75 additions & 26 deletions library/Hasql/Pool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,36 @@ module Hasql.Pool

-- * Errors
UsageError (..),

-- * Observations
module Hasql.Pool.Observation,
)
where

import qualified Data.Text.Encoding as Text
import qualified Data.Text.Encoding.Error as Text
import qualified Data.UUID.V4 as Uuid
import Hasql.Connection (Connection)
import qualified Hasql.Connection as Connection
import Hasql.Pool.Observation
import Hasql.Pool.Prelude
import qualified Hasql.Session as Session

-- | A connection tagged with metadata.
data Entry = Entry
{ entryConnection :: Connection,
entryCreationTimeNSec :: Word64,
entryUseTimeNSec :: Word64
entryUseTimeNSec :: Word64,
entryId :: UUID
}

entryIsAlive :: Word64 -> Word64 -> Word64 -> Entry -> Bool
entryIsAlive maxLifetime maxIdletime now Entry {..} =
now
<= entryCreationTimeNSec
+ maxLifetime
&& now
<= entryUseTimeNSec
+ maxIdletime
entryIsAged :: Word64 -> Word64 -> Entry -> Bool
entryIsAged maxLifetime now Entry {..} =
now > entryCreationTimeNSec + maxLifetime

entryIsIdle :: Word64 -> Word64 -> Entry -> Bool
entryIsIdle maxIdletime now Entry {..} =
now > entryUseTimeNSec + maxIdletime

-- | Pool of connections to DB.
data Pool = Pool
Expand All @@ -54,7 +61,9 @@ data Pool = Pool
-- | Whether to return a connection to the pool.
poolReuseVar :: TVar (TVar Bool),
-- | To stop the manager thread via garbage collection.
poolReaperRef :: IORef ()
poolReaperRef :: IORef (),
-- | Action for reporting the observations.
poolObserver :: Observation -> IO ()
}

-- | Create a connection-pool, with default settings.
Expand All @@ -72,9 +81,16 @@ acquire ::
DiffTime ->
-- | Connection settings.
Connection.Settings ->
-- | Observation handler.
--
-- Typically it's used for monitoring the state of the pool via metrics and logging.
--
-- If the action is not lightweight, it's recommended to use intermediate bufferring via channels like TBQueue.
-- E.g., if the action is @'atomically' . 'writeTBQueue' yourQueue@, then reading from it and processing can be done on a separate thread.
Comment on lines +88 to +89

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When could this happen? When the pool size is big and there are lots of connections?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm talking about the user supplied action here. It should be quick since it will block the pool processes.

(Observation -> IO ()) ->
IO Pool
acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings =
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime (pure connectionSettings)
acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings observer =
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime (pure connectionSettings) observer

-- | Create a connection-pool.
--
Expand All @@ -94,8 +110,15 @@ acquireDynamically ::
DiffTime ->
-- | Action fetching connection settings.
IO Connection.Settings ->
-- | Observation handler.
--
-- Use it for monitoring the state of the pool via metrics and logging.
--
-- If the action is not lightweight, it's recommended to use intermediate bufferring via channels like TBQueue.
-- E.g., if the action is @'atomically' . 'writeTBQueue' yourQueue@, then reading from it and processing can be done on a separate thread.
(Observation -> IO ()) ->
IO Pool
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSettings = do
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSettings observer = do
connectionQueue <- newTQueueIO
capVar <- newTVarIO poolSize
reuseVar <- newTVarIO =<< newTVarIO True
Expand All @@ -106,17 +129,24 @@ acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSe
now <- getMonotonicTimeNSec
join . atomically $ do
entries <- flushTQueue connectionQueue
let (keep, close) = partition (entryIsAlive maxLifetimeNanos maxIdletimeNanos now) entries
traverse_ (writeTQueue connectionQueue) keep
return $ forM_ close $ \entry -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' capVar succ
let (agedEntries, unagedEntries) = partition (entryIsAged maxLifetimeNanos now) entries
(idleEntries, liveEntries) = partition (entryIsIdle maxLifetimeNanos now) unagedEntries
traverse_ (writeTQueue connectionQueue) liveEntries
return $ do
forM_ agedEntries $ \entry -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' capVar succ
observer (ConnectionObservation (entryId entry) (TerminatedConnectionStatus AgingConnectionTerminationReason))
forM_ idleEntries $ \entry -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' capVar succ
observer (ConnectionObservation (entryId entry) (TerminatedConnectionStatus IdlenessConnectionTerminationReason))

void . mkWeakIORef reaperRef $ do
-- When the pool goes out of scope, stop the manager.
killThread managerTid

return $ Pool poolSize fetchConnectionSettings acqTimeoutMicros maxLifetimeNanos maxIdletimeNanos connectionQueue capVar reuseVar reaperRef
return $ Pool poolSize fetchConnectionSettings acqTimeoutMicros maxLifetimeNanos maxIdletimeNanos connectionQueue capVar reuseVar reaperRef observer
where
acqTimeoutMicros =
div (fromIntegral (diffTimeToPicoseconds acqTimeout)) 1_000_000
Expand All @@ -143,6 +173,7 @@ release Pool {..} =
return $ forM_ entries $ \entry -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' poolCapacity succ
poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus ReleaseConnectionTerminationReason))

-- | Use a connection from the pool to run a session and return the connection
-- to the pool, when finished.
Expand All @@ -152,8 +183,7 @@ release Pool {..} =
-- and a slot gets freed up for a new connection to be established the next
-- time one is needed. The error still gets returned from this function.
--
-- __Warning:__ Due to the mechanism mentioned above you should avoid consuming
-- errors within sessions.
-- __Warning:__ Due to the mechanism mentioned above you should avoid intercepting this error type from within sessions.
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use Pool {..} sess = do
timeout <- do
Expand All @@ -180,37 +210,55 @@ use Pool {..} sess = do
onNewConn reuseVar = do
settings <- poolFetchConnectionSettings
now <- getMonotonicTimeNSec
id <- Uuid.nextRandom
poolObserver (ConnectionObservation id ConnectingConnectionStatus)
connRes <- Connection.acquire settings
case connRes of
Left connErr -> do
poolObserver (ConnectionObservation id (TerminatedConnectionStatus (NetworkErrorConnectionTerminationReason (fmap (Text.decodeUtf8With Text.lenientDecode) connErr))))
atomically $ modifyTVar' poolCapacity succ
return $ Left $ ConnectionUsageError connErr
Right entry -> onLiveConn reuseVar (Entry entry now now)
Right entry -> do
poolObserver (ConnectionObservation id ReadyForUseConnectionStatus)
onLiveConn reuseVar (Entry entry now now id)

onConn reuseVar entry = do
now <- getMonotonicTimeNSec
if entryIsAlive poolMaxLifetime poolMaxIdletime now entry
then onLiveConn reuseVar entry {entryUseTimeNSec = now}
else do
if entryIsAged poolMaxLifetime now entry
then do
Connection.release (entryConnection entry)
poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus AgingConnectionTerminationReason))
onNewConn reuseVar
else
if entryIsIdle poolMaxIdletime now entry
then do
Connection.release (entryConnection entry)
poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus IdlenessConnectionTerminationReason))
onNewConn reuseVar
else do
onLiveConn reuseVar entry {entryUseTimeNSec = now}

onLiveConn reuseVar entry = do
poolObserver (ConnectionObservation (entryId entry) InUseConnectionStatus)
sessRes <- try @SomeException (Session.run sess (entryConnection entry))

case sessRes of
Left exc -> do
returnConn
throwIO exc
Right (Left err) -> case err of
Session.QueryError _ _ (Session.ClientError _) -> do
Session.QueryError _ _ (Session.ClientError details) -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' poolCapacity succ
poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus (NetworkErrorConnectionTerminationReason (fmap (Text.decodeUtf8With Text.lenientDecode) details))))
return $ Left $ SessionUsageError err
_ -> do
returnConn
poolObserver (ConnectionObservation (entryId entry) ReadyForUseConnectionStatus)
return $ Left $ SessionUsageError err
Right (Right res) -> do
returnConn
poolObserver (ConnectionObservation (entryId entry) ReadyForUseConnectionStatus)
return $ Right res
where
returnConn =
Expand All @@ -221,6 +269,7 @@ use Pool {..} sess = do
else return $ do
Connection.release (entryConnection entry)
atomically $ modifyTVar' poolCapacity succ
poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus ReleaseConnectionTerminationReason))

-- | Union over all errors that 'use' can result in.
data UsageError
Expand Down
36 changes: 36 additions & 0 deletions library/Hasql/Pool/Observation.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
module Hasql.Pool.Observation where

import Hasql.Pool.Prelude

data Observation
= ConnectionObservation
-- | Generated connection ID.
-- For grouping the observations by one connection.
UUID
-- | Connection status that it has entered.
ConnectionStatus
deriving (Show, Eq)

data ConnectionStatus
= -- | Connection is being established.
ConnectingConnectionStatus
| -- | Connection is established and not occupied.
ReadyForUseConnectionStatus
| -- | Is being used by some session.
--
-- After it's done the status will transition to 'ReadyForUseConnectionStatus' or 'TerminatedConnectionStatus'.
InUseConnectionStatus
| -- | Connection terminated.
TerminatedConnectionStatus ConnectionTerminationReason
deriving (Show, Eq)

data ConnectionTerminationReason
= -- | The age timeout of the connection has passed.
AgingConnectionTerminationReason
| -- | The timeout of how long a connection may remain idle in the pool has passed.
IdlenessConnectionTerminationReason
| -- | Connectivity issues with the server.
NetworkErrorConnectionTerminationReason (Maybe Text)
| -- | User has invoked the 'Hasql.Pool.release' procedure.
ReleaseConnectionTerminationReason
deriving (Show, Eq)
3 changes: 3 additions & 0 deletions library/Hasql/Pool/Prelude.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import Control.Monad.ST as Exports
import Data.Bifunctor as Exports
import Data.Bits as Exports
import Data.Bool as Exports
import Data.ByteString as Exports (ByteString)
import Data.Char as Exports
import Data.Coerce as Exports
import Data.Complex as Exports
Expand All @@ -40,9 +41,11 @@ import Data.Proxy as Exports
import Data.Ratio as Exports
import Data.STRef as Exports
import Data.String as Exports
import Data.Text as Exports (Text)
import Data.Time as Exports
import Data.Traversable as Exports
import Data.Tuple as Exports
import Data.UUID as Exports (UUID)
import Data.Unique as Exports
import Data.Version as Exports
import Data.Void as Exports
Expand Down
2 changes: 1 addition & 1 deletion test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ main :: IO ()
main = do
connectionSettings <- getConnectionSettings
let withPool poolSize acqTimeout maxLifetime maxIdletime connectionSettings =
bracket (acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings) release
bracket (acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings (const (pure ()))) release
withDefaultPool =
withPool 3 10 1_800 1_800 connectionSettings

Expand Down
Loading