Skip to content

Commit

Permalink
Make etcd network resistant against crash recovery of sender
Browse files Browse the repository at this point in the history
This uses a basic persistent queue that is backed by files in a
directory. The peek/pop API ensures that files are only removed when the
consumer has processed the event.
  • Loading branch information
ch1bo committed Sep 26, 2024
1 parent 39ec1ab commit 54a86bf
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 13 deletions.
91 changes: 80 additions & 11 deletions hydra-node/src/Hydra/Network/Etcd.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,30 @@ module Hydra.Network.Etcd where

import Hydra.Prelude

import Cardano.Binary (decodeFull', serialize)
import Control.Concurrent.Class.MonadSTM (MonadSTM (readTBQueue, unGetTBQueue), newTBQueueIO, writeTBQueue)
import Cardano.Binary (decodeFull', serialize, serialize')
import Control.Concurrent.Class.MonadSTM (MonadSTM (newTVarIO, peekTBQueue, readTBQueue, unGetTBQueue, writeTBQueue), modifyTVar', newTBQueueIO, writeTBQueue)
import Control.Exception (IOException)
import Data.Aeson (withObject, (.:))
import Data.Aeson qualified as Aeson
import Data.Aeson.Types (Parser, Value, parseEither)
import Data.ByteString qualified as BS
import Data.ByteString.Base16 qualified as Base16
import Data.ByteString.Base16.Lazy qualified as LBase16
import Data.ByteString.Base64 qualified as Base64
import Data.List qualified as List
import Data.Text.IO qualified as Text
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (Host (..), Network (..), NetworkCallback (..), NetworkComponent, PortNumber)
import Hydra.Node.Network (NetworkConfiguration (..))
import System.Directory (createDirectoryIfMissing, listDirectory, removeFile)
import System.FilePath ((</>))
import System.Posix (Handler (Catch), installHandler, sigTERM)
import System.Process.Typed (ExitCodeException (..), byteStringInput, createPipe, getStderr, getStdout, proc, readProcessStdout_, runProcess_, setStderr, setStdin, setStdout, stopProcess, waitExitCode, withProcessTerm, withProcessWait)

-- | Concrete network component that broadcasts messages to an etcd cluster and
-- listens for incoming messages.
withEtcdNetwork ::
(ToCBOR msg, FromCBOR msg) =>
(ToCBOR msg, FromCBOR msg, Eq msg) =>
Tracer IO Text ->
NetworkConfiguration msg ->
NetworkComponent IO msg msg ()
Expand All @@ -43,11 +46,11 @@ withEtcdNetwork tracer config callback action = do
threadDelay 2
race_ (traceStderr p) $
race_ (waitMessages clientUrl callback) $ do
queue <- newTBQueueIO 100
queue <- newPersistentQueue (persistenceDir </> "pending-broadcast") 100
race_ (broadcastMessages clientUrl port queue) $
action
Network
{ broadcast = atomically . writeTBQueue queue
{ broadcast = writePersistentQueue queue
}
where
traceStderr p =
Expand Down Expand Up @@ -94,15 +97,12 @@ withEtcdNetwork tracer config callback action = do
-- | Broadcast messages from a queue to the etcd cluster.
--
-- Retries on failure to 'putMessage' in case we are on a minority cluster.
--
-- TODO: use a persistent queue
broadcastMessages :: ToCBOR msg => String -> PortNumber -> TBQueue IO msg -> IO ()
broadcastMessages :: (ToCBOR msg, Eq msg) => String -> PortNumber -> PersistentQueue IO msg -> IO ()
broadcastMessages endpoint port queue =
forever $ do
msg <- atomically $ readTBQueue queue
putMessage endpoint port msg
msg <- peekPersistentQueue queue
(putMessage endpoint port msg >> popPersistentQueue queue msg)
`catch` \PutFailed{reason} -> do
atomically $ unGetTBQueue queue msg
putTextLn $ "put failed: " <> reason
threadDelay 1

Expand Down Expand Up @@ -199,3 +199,72 @@ parseEtcdEntry = withObject "EtcdEntry" $ \o -> do
-- HACK: lenient decoding
parseBase64 :: Text -> Parser ByteString
parseBase64 = either fail pure . Base64.decode . encodeUtf8

-- * Persistent queue

data PersistentQueue m a = PersistentQueue
{ queue :: TBQueue m (Natural, a)
, nextIx :: TVar m Natural
, directory :: FilePath
}

-- | Create a new persistent queue at file path and given capacity.
newPersistentQueue ::
(MonadSTM m, MonadIO m, FromCBOR a, MonadCatch m, MonadFail m) =>
FilePath ->
Natural ->
m (PersistentQueue m a)
newPersistentQueue path capacity = do
queue <- newTBQueueIO capacity
highestId <-
try (loadExisting queue) >>= \case
Left (_ :: IOException) -> do
liftIO $ createDirectoryIfMissing True path
pure 0
Right highest -> pure highest
nextIx <- newTVarIO $ highestId + 1
pure PersistentQueue{queue, nextIx, directory = path}
where
loadExisting queue = do
paths <- liftIO $ listDirectory path
case sort $ mapMaybe readMaybe paths of
[] -> pure 0
idxs -> do
forM_ idxs $ \(idx :: Natural) -> do
bs <- readFileBS (path </> show idx)
case decodeFull' bs of
Left err ->
fail $ "Failed to decode item: " <> show err
Right item ->
atomically $ writeTBQueue queue (idx, item)
pure $ List.last idxs

-- | Write a value to the queue, blocking if the queue is full.
writePersistentQueue :: (ToCBOR a, MonadSTM m, MonadIO m) => PersistentQueue m a -> a -> m ()
writePersistentQueue PersistentQueue{queue, nextIx, directory} item = do
next <- atomically $ do
next <- readTVar nextIx
modifyTVar' nextIx (+ 1)
pure next
writeFileBS (directory </> show next) $ serialize' item
atomically $ writeTBQueue queue (next, item)

-- | Get the next value from the queue without removing it, blocking if the
-- queue is empty.
peekPersistentQueue :: MonadSTM m => PersistentQueue m a -> m a
peekPersistentQueue PersistentQueue{queue} = do
snd <$> atomically (peekTBQueue queue)

-- | Remove an element from the queue if it matches the given item. Use
-- 'peekPersistentQueue' to wait for next items before popping it.
popPersistentQueue :: (MonadSTM m, MonadIO m, Eq a) => PersistentQueue m a -> a -> m ()
popPersistentQueue PersistentQueue{queue, directory} item = do
popped <- atomically $ do
(ix, next) <- peekTBQueue queue
if next == item
then readTBQueue queue $> Just ix
else pure Nothing
case popped of
Nothing -> pure ()
Just index -> do
liftIO . removeFile $ directory </> show index
8 changes: 6 additions & 2 deletions hydra-node/test/Hydra/NetworkSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ spec = do

it "handles broadcast to minority" $ \tracer -> do
withTempDir "test-etcd" $ \tmp -> do
putStrLn $ "tmp: " <> show tmp
failAfter 20 $ do
[port1, port2, port3] <- fmap fromIntegral <$> randomUnusedTCPPorts 3
let aliceConfig =
Expand Down Expand Up @@ -127,9 +128,12 @@ spec = do
withEtcdNetwork @Int tracer carolConfig noopCallback $ \_ -> do
threadDelay 3
putStrLn "Bob and Carol stopped"
-- Alice sends a message while she is the only online (= minority)
-- Alice sends a message while she is the only one online (= minority)
broadcast n1 123
-- Start bob and carol again
-- Now, alice stops too!
putStrLn "Alice stopped"
-- Start alice, bob and carol again
withEtcdNetwork @Int tracer aliceConfig recordReceived $ \_ -> do
withEtcdNetwork @Int tracer bobConfig noopCallback $ \_ -> do
withEtcdNetwork @Int tracer carolConfig noopCallback $ \_ -> do
-- Alice should see her own message eventually (when part of majority again)
Expand Down

0 comments on commit 54a86bf

Please sign in to comment.