Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observability #40

Merged
merged 5 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 0.11

- Optional observability event stream, which can be interpreted into log records and metrics.

# 0.10.1

- Avoid releasing connections on exceptions thrown in session
Expand Down
7 changes: 6 additions & 1 deletion hasql-pool.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,18 @@ common base-settings
library
import: base-settings
hs-source-dirs: library
exposed-modules: Hasql.Pool
exposed-modules:
Hasql.Pool
Hasql.Pool.Observation

other-modules: Hasql.Pool.Prelude
build-depends:
, base >=4.11 && <5
, bytestring >=0.10 && <0.14
, hasql >=1.6.0.1 && <1.7
, stm >=2.5 && <3
, time >=1.9 && <2
, uuid >=1.3 && <2

test-suite test
import: base-settings
Expand Down
97 changes: 71 additions & 26 deletions library/Hasql/Pool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,35 @@ module Hasql.Pool

-- * Errors
UsageError (..),

-- * Observations
Observation (..),
ReleaseReason (..),
)
where

import qualified Data.UUID.V4 as Uuid
import Hasql.Connection (Connection)
import qualified Hasql.Connection as Connection
import Hasql.Pool.Observation
import Hasql.Pool.Prelude
import qualified Hasql.Session as Session

-- | A connection tagged with metadata.
data Entry = Entry
{ entryConnection :: Connection,
entryCreationTimeNSec :: Word64,
entryUseTimeNSec :: Word64
entryUseTimeNSec :: Word64,
entryId :: UUID
}

entryIsAlive :: Word64 -> Word64 -> Word64 -> Entry -> Bool
entryIsAlive maxLifetime maxIdletime now Entry {..} =
now
<= entryCreationTimeNSec
+ maxLifetime
&& now
<= entryUseTimeNSec
+ maxIdletime
entryIsAged :: Word64 -> Word64 -> Entry -> Bool
entryIsAged maxLifetime now Entry {..} =
now > entryCreationTimeNSec + maxLifetime

entryIsIdle :: Word64 -> Word64 -> Entry -> Bool
entryIsIdle maxIdletime now Entry {..} =
now > entryUseTimeNSec + maxIdletime

-- | Pool of connections to DB.
data Pool = Pool
Expand All @@ -54,7 +60,9 @@ data Pool = Pool
-- | Whether to return a connection to the pool.
poolReuseVar :: TVar (TVar Bool),
-- | To stop the manager thread via garbage collection.
poolReaperRef :: IORef ()
poolReaperRef :: IORef (),
-- | Action for reporting the observations.
poolObserver :: Observation -> IO ()
}

-- | Create a connection-pool, with default settings.
Expand All @@ -72,9 +80,16 @@ acquire ::
DiffTime ->
-- | Connection settings.
Connection.Settings ->
-- | Observation handler.
--
-- Typically it's used for monitoring the state of the pool via metrics and logging.
--
-- If the action is not lightweight, it's recommended to use intermediate bufferring via channels like TBQueue.
-- E.g., if the action is @'atomically' . 'writeTBQueue' yourQueue@, then reading from it and processing can be done on a separate thread.
Comment on lines +88 to +89

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When could this happen? When the pool size is big and there are lots of connections?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm talking about the user supplied action here. It should be quick since it will block the pool processes.

(Observation -> IO ()) ->
IO Pool
acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings =
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime (pure connectionSettings)
acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings observer =
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime (pure connectionSettings) observer

-- | Create a connection-pool.
--
Expand All @@ -94,8 +109,15 @@ acquireDynamically ::
DiffTime ->
-- | Action fetching connection settings.
IO Connection.Settings ->
-- | Observation handler.
--
-- Use it for monitoring the state of the pool via metrics and logging.
--
-- If the action is not lightweight, it's recommended to use intermediate bufferring via channels like TBQueue.
-- E.g., if the action is @'atomically' . 'writeTBQueue' yourQueue@, then reading from it and processing can be done on a separate thread.
(Observation -> IO ()) ->
IO Pool
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSettings = do
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSettings observer = do
connectionQueue <- newTQueueIO
capVar <- newTVarIO poolSize
reuseVar <- newTVarIO =<< newTVarIO True
Expand All @@ -106,17 +128,24 @@ acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSe
now <- getMonotonicTimeNSec
join . atomically $ do
entries <- flushTQueue connectionQueue
let (keep, close) = partition (entryIsAlive maxLifetimeNanos maxIdletimeNanos now) entries
traverse_ (writeTQueue connectionQueue) keep
return $ forM_ close $ \entry -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' capVar succ
let (agedEntries, unagedEntries) = partition (entryIsAged maxLifetimeNanos now) entries
(idleEntries, liveEntries) = partition (entryIsIdle maxLifetimeNanos now) unagedEntries
traverse_ (writeTQueue connectionQueue) liveEntries
return $ do
forM_ agedEntries $ \entry -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' capVar succ
observer (ConnectionReleasedObservation (entryId entry) AgingReleaseReason)
forM_ idleEntries $ \entry -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' capVar succ
observer (ConnectionReleasedObservation (entryId entry) IdlenessReleaseReason)

void . mkWeakIORef reaperRef $ do
-- When the pool goes out of scope, stop the manager.
killThread managerTid

return $ Pool poolSize fetchConnectionSettings acqTimeoutMicros maxLifetimeNanos maxIdletimeNanos connectionQueue capVar reuseVar reaperRef
return $ Pool poolSize fetchConnectionSettings acqTimeoutMicros maxLifetimeNanos maxIdletimeNanos connectionQueue capVar reuseVar reaperRef observer
where
acqTimeoutMicros =
div (fromIntegral (diffTimeToPicoseconds acqTimeout)) 1_000_000
Expand All @@ -143,6 +172,7 @@ release Pool {..} =
return $ forM_ entries $ \entry -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' poolCapacity succ
poolObserver (ConnectionReleasedObservation (entryId entry) ReleaseActionCallReleaseReason)

-- | Use a connection from the pool to run a session and return the connection
-- to the pool, when finished.
Expand All @@ -152,8 +182,7 @@ release Pool {..} =
-- and a slot gets freed up for a new connection to be established the next
-- time one is needed. The error still gets returned from this function.
--
-- __Warning:__ Due to the mechanism mentioned above you should avoid consuming
-- errors within sessions.
-- __Warning:__ Due to the mechanism mentioned above you should avoid intercepting this error type from within sessions.
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use Pool {..} sess = do
timeout <- do
Expand All @@ -180,20 +209,33 @@ use Pool {..} sess = do
onNewConn reuseVar = do
settings <- poolFetchConnectionSettings
now <- getMonotonicTimeNSec
id <- Uuid.nextRandom
poolObserver (AttemptingToConnectObservation id)
connRes <- Connection.acquire settings
case connRes of
Left connErr -> do
poolObserver (FailedToConnectObservation id connErr)
atomically $ modifyTVar' poolCapacity succ
return $ Left $ ConnectionUsageError connErr
Right entry -> onLiveConn reuseVar (Entry entry now now)
Right entry -> do
poolObserver (ConnectionEstablishedObservation id)
onLiveConn reuseVar (Entry entry now now id)

onConn reuseVar entry = do
now <- getMonotonicTimeNSec
if entryIsAlive poolMaxLifetime poolMaxIdletime now entry
then onLiveConn reuseVar entry {entryUseTimeNSec = now}
else do
if entryIsAged poolMaxLifetime now entry
then do
Connection.release (entryConnection entry)
poolObserver (ConnectionReleasedObservation (entryId entry) AgingReleaseReason)
onNewConn reuseVar
else
if entryIsIdle poolMaxIdletime now entry
then do
Connection.release (entryConnection entry)
poolObserver (ConnectionReleasedObservation (entryId entry) IdlenessReleaseReason)
onNewConn reuseVar
else do
onLiveConn reuseVar entry {entryUseTimeNSec = now}

onLiveConn reuseVar entry = do
sessRes <- try @SomeException (Session.run sess (entryConnection entry))
Expand All @@ -203,8 +245,10 @@ use Pool {..} sess = do
returnConn
throwIO exc
Right (Left err) -> case err of
Session.QueryError _ _ (Session.ClientError _) -> do
Session.QueryError _ _ (Session.ClientError details) -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' poolCapacity succ
poolObserver (ConnectionReleasedObservation (entryId entry) (TransportErrorReleaseReason details))
return $ Left $ SessionUsageError err
_ -> do
returnConn
Expand All @@ -221,6 +265,7 @@ use Pool {..} sess = do
else return $ do
Connection.release (entryConnection entry)
atomically $ modifyTVar' poolCapacity succ
poolObserver (ConnectionReleasedObservation (entryId entry) ReleaseActionCallReleaseReason)

-- | Union over all errors that 'use' can result in.
data UsageError
Expand Down
17 changes: 17 additions & 0 deletions library/Hasql/Pool/Observation.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module Hasql.Pool.Observation where

import Hasql.Pool.Prelude

data Observation
= ConnectionEstablishedObservation UUID
| AttemptingToConnectObservation UUID
| FailedToConnectObservation UUID (Maybe ByteString)
| ConnectionReleasedObservation UUID ReleaseReason

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Why does the UUID need to be generated internally? (just wondering if it can be left out for simplifying the code)

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's for identifying the connection. Lets the user isolate events on a particular connection

deriving (Show, Eq)

data ReleaseReason
= AgingReleaseReason
| IdlenessReleaseReason
| TransportErrorReleaseReason (Maybe ByteString)
| ReleaseActionCallReleaseReason
deriving (Show, Eq)
2 changes: 2 additions & 0 deletions library/Hasql/Pool/Prelude.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import Control.Monad.ST as Exports
import Data.Bifunctor as Exports
import Data.Bits as Exports
import Data.Bool as Exports
import Data.ByteString as Exports (ByteString)
import Data.Char as Exports
import Data.Coerce as Exports
import Data.Complex as Exports
Expand All @@ -43,6 +44,7 @@ import Data.String as Exports
import Data.Time as Exports
import Data.Traversable as Exports
import Data.Tuple as Exports
import Data.UUID as Exports (UUID)
import Data.Unique as Exports
import Data.Version as Exports
import Data.Void as Exports
Expand Down
2 changes: 1 addition & 1 deletion test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ main :: IO ()
main = do
connectionSettings <- getConnectionSettings
let withPool poolSize acqTimeout maxLifetime maxIdletime connectionSettings =
bracket (acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings) release
bracket (acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings (const (pure ()))) release
withDefaultPool =
withPool 3 10 1_800 1_800 connectionSettings

Expand Down
Loading