diff --git a/hydra-chain-observer/hydra-chain-observer.cabal b/hydra-chain-observer/hydra-chain-observer.cabal index 974f433d0c9..80ac1962edc 100644 --- a/hydra-chain-observer/hydra-chain-observer.cabal +++ b/hydra-chain-observer/hydra-chain-observer.cabal @@ -71,6 +71,7 @@ library , hydra-plutus , hydra-prelude , hydra-tx + , io-classes , optparse-applicative , ouroboros-network-protocols , retry diff --git a/hydra-chain-observer/src/Hydra/Blockfrost/ChainObserver.hs b/hydra-chain-observer/src/Hydra/Blockfrost/ChainObserver.hs index b07c017775d..d32fef44b68 100644 --- a/hydra-chain-observer/src/Hydra/Blockfrost/ChainObserver.hs +++ b/hydra-chain-observer/src/Hydra/Blockfrost/ChainObserver.hs @@ -9,7 +9,12 @@ import Blockfrost.Client ( runBlockfrost, ) import Blockfrost.Client qualified as Blockfrost -import Control.Retry (RetryPolicyM, RetryStatus, exponentialBackoff, limitRetries, retrying) +import Control.Concurrent.Class.MonadSTM ( + MonadSTM (readTVarIO), + newTVarIO, + writeTVar, + ) +import Control.Retry (RetryPolicyM, RetryStatus (..), exponentialBackoff, limitRetries, retrying) import Hydra.Cardano.Api ( BlockHeader, ChainPoint (..), @@ -50,7 +55,7 @@ runBlockfrostM :: BlockfrostClientT IO a -> ExceptT APIBlockfrostError IO a runBlockfrostM prj action = do - result <- liftIO $ runBlockfrost prj action + result <- lift $ runBlockfrost prj action case result of Left err -> throwError (BlockfrostError $ show err) Right val -> pure val @@ -93,26 +98,43 @@ blockfrostClient tracer projectPath startFromBlockHash = do let blockTime = realToFrac _genesisSlotLength / realToFrac _genesisActiveSlotsCoefficient + stateTVar <- newTVarIO (block, mempty) void $ - retrying retryPolicy shouldRetry $ \_ -> + retrying retryPolicy shouldRetry $ \RetryStatus{rsIterNumber} -> do + -- XXX: wait on any iteration number, except 0 as it's the first try. + when (rsIterNumber > 0) $ threadDelay blockTime either (error . show) id - <$> runExceptT - ( do - threadDelay blockTime - loop tracer prj block networkId blockTime observerHandler mempty - ) + <$> runExceptT (loop tracer prj networkId blockTime observerHandler stateTVar) } +-- | Iterative process that follows the chain using a naive roll-forward approach, +-- keeping track of the latest known current block and UTxO view. +-- This process operates at full speed without waiting between calls, +-- favoring the catch-up process. loop :: Tracer IO ChainObserverLog -> Blockfrost.Project -> - Blockfrost.Block -> NetworkId -> DiffTime -> ObserverHandler IO -> - UTxO -> + TVar IO (Blockfrost.Block, UTxO) -> ExceptT APIBlockfrostError IO a -loop tracer prj block networkId blockTime observerHandler utxo = do +loop tracer prj networkId blockTime observerHandler stateTVar = do + current <- lift $ readTVarIO stateTVar + next <- rollForward tracer prj networkId observerHandler current + atomically $ writeTVar stateTVar next + loop tracer prj networkId blockTime observerHandler stateTVar + +-- | From the current block and UTxO view, we collect Hydra observations +-- and yield the next block and adjusted UTxO view. +rollForward :: + Tracer IO ChainObserverLog -> + Blockfrost.Project -> + NetworkId -> + ObserverHandler IO -> + (Blockfrost.Block, UTxO) -> + ExceptT APIBlockfrostError IO (Blockfrost.Block, UTxO) +rollForward tracer prj networkId observerHandler (block, utxo) = do let Blockfrost.Block { _blockHash , _blockConfirmations @@ -151,11 +173,11 @@ loop tracer prj block networkId blockTime observerHandler utxo = do then [Tick point blockNo] else observationsAt - -- [7] Loop next. + -- [7] Next. case _blockNextBlock of Just nextBlockHash -> do block' <- runBlockfrostM prj (Blockfrost.getBlock $ Right nextBlockHash) - loop tracer prj block' networkId blockTime observerHandler adjustedUTxO + pure (block', adjustedUTxO) Nothing -> throwError (MissingNextBlockHash _blockHash) diff --git a/hydra-chain-observer/src/Hydra/ChainObserver.hs b/hydra-chain-observer/src/Hydra/ChainObserver.hs index a0dbfff7553..39289088bfa 100644 --- a/hydra-chain-observer/src/Hydra/ChainObserver.hs +++ b/hydra-chain-observer/src/Hydra/ChainObserver.hs @@ -23,4 +23,5 @@ main observerHandler = do follow networkId startChainFrom observerHandler BlockfrostOptions{projectPath, startFromBlockHash} -> do let NodeClient{follow} = blockfrostClient tracer projectPath startFromBlockHash + -- FIXME! follow (error "not-used") (error "not-used") observerHandler