Skip to content

Commit

Permalink
add thread to do shake restart
Browse files Browse the repository at this point in the history
  • Loading branch information
soulomoon committed May 26, 2024
1 parent e32468d commit 15f9892
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 72 deletions.
1 change: 1 addition & 0 deletions ghcide/ghcide.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ library
Development.IDE.Core.Shake
Development.IDE.Core.Tracing
Development.IDE.Core.UseStale
Development.IDE.Core.Thread
Development.IDE.GHC.Compat
Development.IDE.GHC.Compat.Core
Development.IDE.GHC.Compat.CmdLine
Expand Down
75 changes: 36 additions & 39 deletions ghcide/session-loader/Development/IDE/Session.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ module Development.IDE.Session
,loadSessionWithOptions
,setInitialDynFlags
,getHieDbLoc
,runWithDb
-- ,runWithDb
,retryOnSqliteBusy
,retryOnException
,Log(..)
,dbThreadRun
,WithHieDbShield(..)
) where

-- Unfortunately, we cannot use loadSession with ghc-lib since hie-bios uses
Expand Down Expand Up @@ -121,6 +123,9 @@ import qualified Data.Set as OS
import qualified Development.IDE.GHC.Compat.Util as Compat
import GHC.Data.Graph.Directed

import Control.Monad.Cont (ContT (ContT), evalContT)
import Development.IDE.Core.Thread (ThreadRun (..),
runInThread)
import GHC.Data.Bag
import GHC.Driver.Env (hsc_all_home_unit_ids)
import GHC.Driver.Errors.Types
Expand Down Expand Up @@ -370,48 +375,37 @@ makeWithHieDbRetryable :: RandomGen g => Recorder (WithPriority Log) -> g -> Hie
makeWithHieDbRetryable recorder rng hieDb f =
retryOnSqliteBusy recorder rng (f hieDb)

-- | Wraps `withHieDb` to provide a database connection for reading, and a `HieWriterChan` for
-- writing. Actions are picked off one by one from the `HieWriterChan` and executed in serial
-- by a worker thread using a dedicated database connection.
-- This is done in order to serialize writes to the database, or else SQLite becomes unhappy
runWithDb :: Recorder (WithPriority Log) -> FilePath -> (WithHieDb -> IndexQueue -> IO ()) -> IO ()
runWithDb recorder fp k = do
-- use non-deterministic seed because maybe multiple HLS start at same time
-- and send bursts of requests
rng <- Random.newStdGen
-- Delete the database if it has an incompatible schema version
retryOnSqliteBusy
recorder
rng
(withHieDb fp (const $ pure ()) `Safe.catch` \IncompatibleSchemaVersion{} -> removeFile fp)

withHieDb fp $ \writedb -> do
-- the type signature is necessary to avoid concretizing the tyvar
-- e.g. `withWriteDbRetryable initConn` without type signature will
-- instantiate tyvar `a` to `()`
let withWriteDbRetryable :: WithHieDb
withWriteDbRetryable = makeWithHieDbRetryable recorder rng writedb
withWriteDbRetryable initConn

chan <- newTQueueIO

withAsync (writerThread withWriteDbRetryable chan) $ \_ -> do
withHieDb fp (\readDb -> k (makeWithHieDbRetryable recorder rng readDb) chan)
where
writerThread :: WithHieDb -> IndexQueue -> IO ()
writerThread withHieDbRetryable chan = do
-- Clear the index of any files that might have been deleted since the last run
_ <- withHieDbRetryable deleteMissingRealFiles
_ <- withHieDbRetryable garbageCollectTypeNames
forever $ do
l <- atomically $ readTQueue chan
-- TODO: probably should let exceptions be caught/logged/handled by top level handler
l withHieDbRetryable
dbThreadRun ::
ThreadRun
(Recorder (WithPriority Log), FilePath)
WithHieDbShield
WithHieDbShield
(((HieDb -> IO a) -> IO a) -> IO ())
dbThreadRun = ThreadRun {
tRunner = \(recorder, _fp) (WithHieDbShield withWriter) l -> l withWriter
`Safe.catch` \e@SQLError{} -> do
logWith recorder Error $ LogHieDbWriterThreadSQLiteError e
`Safe.catchAny` \f -> do
logWith recorder Error $ LogHieDbWriterThreadException f

,
tCreateResource = \(recorder, fp) f -> do
rng <- Random.newStdGen
retryOnSqliteBusy
recorder
rng
(withHieDb fp (const $ pure ()) `Safe.catch` \IncompatibleSchemaVersion{} -> removeFile fp)
evalContT $ do
writedb <- ContT $ withHieDb fp
readDb <- ContT $ withHieDb fp
let withWriteDbRetryable :: WithHieDb
withWriteDbRetryable = makeWithHieDbRetryable recorder rng writedb
liftIO $ withWriteDbRetryable initConn
liftIO $ f (WithHieDbShield withWriteDbRetryable) (WithHieDbShield (makeWithHieDbRetryable recorder rng readDb))
}
-- | Wraps `withHieDb` to provide a database connection for reading, and a `HieWriterChan` for
-- writing. Actions are picked off one by one from the `HieWriterChan` and executed in serial
-- by a worker thread using a dedicated database connection.
-- This is done in order to serialize writes to the database, or else SQLite becomes unhappy

getHieDbLoc :: FilePath -> IO FilePath
getHieDbLoc dir = do
Expand All @@ -437,6 +431,9 @@ getHieDbLoc dir = do
loadSession :: Recorder (WithPriority Log) -> FilePath -> IO (Action IdeGhcSession)
loadSession recorder = loadSessionWithOptions recorder def

-- used to smuggle RankNType WithHieDb through dbMVar
newtype WithHieDbShield = WithHieDbShield WithHieDb

loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> IO (Action IdeGhcSession)
loadSessionWithOptions recorder SessionLoadingOptions{..} dir = do
cradle_files <- newIORef []
Expand Down
4 changes: 3 additions & 1 deletion ghcide/src/Development/IDE/Core/Service.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import Ide.Plugin.Config
import qualified Language.LSP.Protocol.Types as LSP
import qualified Language.LSP.Server as LSP

import Control.Concurrent.STM (TQueue)
import Control.Monad
import qualified Development.IDE.Core.FileExists as FileExists
import qualified Development.IDE.Core.OfInterest as OfInterest
Expand All @@ -53,6 +54,7 @@ instance Pretty Log where
LogOfInterest msg -> pretty msg
LogFileExists msg -> pretty msg


------------------------------------------------------------
-- Exposed API

Expand All @@ -65,7 +67,7 @@ initialise :: Recorder (WithPriority Log)
-> Debouncer LSP.NormalizedUri
-> IdeOptions
-> WithHieDb
-> IndexQueue
-> ThreadQueue
-> Monitoring
-> IO IdeState
initialise recorder defaultConfig plugins mainRule lspEnv debouncer options withHieDb hiedbChan metrics = do
Expand Down
69 changes: 46 additions & 23 deletions ghcide/src/Development/IDE/Core/Shake.hs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ module Development.IDE.Core.Shake(
garbageCollectDirtyKeysOlderThan,
Log(..),
VFSModified(..), getClientConfigAction,
ThreadQueue(..)
) where

import Control.Concurrent.Async
Expand Down Expand Up @@ -182,6 +183,9 @@ import Development.IDE.GHC.Compat (NameCacheUpdater (NCU),
#endif

#if MIN_VERSION_ghc(9,3,0)
import Control.Concurrent.STM (atomically,
writeTQueue)
import Development.IDE.Core.Thread
import Development.IDE.GHC.Compat (NameCacheUpdater)
#endif

Expand Down Expand Up @@ -262,6 +266,12 @@ data HieDbWriter
-- with (currently) retry functionality
type IndexQueue = TQueue (((HieDb -> IO ()) -> IO ()) -> IO ())

data ThreadQueue = ThreadQueue {
tIndexQueue :: IndexQueue
, tRestartQueue :: TQueue (IO ())
, tLoaderQueue :: TQueue (IO ())
}

-- Note [Semantic Tokens Cache Location]
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- storing semantic tokens cache for each file in shakeExtras might
Expand Down Expand Up @@ -334,6 +344,10 @@ data ShakeExtras = ShakeExtras
-- ^ Default HLS config, only relevant if the client does not provide any Config
, dirtyKeys :: TVar KeySet
-- ^ Set of dirty rule keys since the last Shake run
, restartQueue :: TQueue (IO ())
-- ^ Queue of restart actions to be run.
, loaderQueue :: TQueue (IO ())
-- ^ Queue of loader actions to be run.
}

type WithProgressFunc = forall a.
Expand Down Expand Up @@ -619,15 +633,18 @@ shakeOpen :: Recorder (WithPriority Log)
-> IdeReportProgress
-> IdeTesting
-> WithHieDb
-> IndexQueue
-> ThreadQueue
-> ShakeOptions
-> Monitoring
-> Rules ()
-> IO IdeState
shakeOpen recorder lspEnv defaultConfig idePlugins debouncer
shakeProfileDir (IdeReportProgress reportProgress)
ideTesting@(IdeTesting testing)
withHieDb indexQueue opts monitoring rules = mdo
withHieDb threadQueue opts monitoring rules = mdo
let indexQueue = tIndexQueue threadQueue
restartQueue = tRestartQueue threadQueue
loaderQueue = tLoaderQueue threadQueue

#if MIN_VERSION_ghc(9,3,0)
ideNc <- initNameCache 'r' knownKeyNames
Expand Down Expand Up @@ -752,31 +769,37 @@ delayedAction a = do
extras <- ask
liftIO $ shakeEnqueue extras a


-- | Restart the current 'ShakeSession' with the given system actions.
-- Any actions running in the current session will be aborted,
-- but actions added via 'shakeEnqueue' will be requeued.
shakeRestart :: Recorder (WithPriority Log) -> IdeState -> VFSModified -> String -> [DelayedAction ()] -> IO [Key] -> IO ()
shakeRestart recorder IdeState{..} vfs reason acts ioActionBetweenShakeSession =
withMVar'
shakeSession
(\runner -> do
(stopTime,()) <- duration $ logErrorAfter 10 $ cancelShakeSession runner
keys <- ioActionBetweenShakeSession
-- it is every important to update the dirty keys after we enter the critical section
-- see Note [Housekeeping rule cache and dirty key outside of hls-graph]
atomically $ modifyTVar' (dirtyKeys shakeExtras) $ \x -> foldl' (flip insertKeySet) x keys
res <- shakeDatabaseProfile shakeDb
backlog <- readTVarIO $ dirtyKeys shakeExtras
queue <- atomicallyNamed "actionQueue - peek" $ peekInProgress $ actionQueue shakeExtras

-- this log is required by tests
logWith recorder Debug $ LogBuildSessionRestart reason queue backlog stopTime res
)
-- It is crucial to be masked here, otherwise we can get killed
-- between spawning the new thread and updating shakeSession.
-- See https://github.com/haskell/ghcide/issues/79
(\() -> do
(,()) <$> newSession recorder shakeExtras vfs shakeDb acts reason)
shakeRestart recorder IdeState{..} vfs reason acts ioActionBetweenShakeSession = do
b <- newBarrier
atomically $ writeTQueue (restartQueue shakeExtras) $ do
withMVar'
shakeSession
(\runner -> do
(stopTime,()) <- duration $ logErrorAfter 10 $ cancelShakeSession runner
keys <- ioActionBetweenShakeSession
-- it is every important to update the dirty keys after we enter the critical section
-- see Note [Housekeeping rule cache and dirty key outside of hls-graph]
atomically $ modifyTVar' (dirtyKeys shakeExtras) $ \x -> foldl' (flip insertKeySet) x keys
res <- shakeDatabaseProfile shakeDb
backlog <- readTVarIO $ dirtyKeys shakeExtras
queue <- atomicallyNamed "actionQueue - peek" $ peekInProgress $ actionQueue shakeExtras

-- this log is required by tests
logWith recorder Debug $ LogBuildSessionRestart reason queue backlog stopTime res
)
-- It is crucial to be masked here, otherwise we can get killed
-- between spawning the new thread and updating shakeSession.
-- See https://github.com/haskell/ghcide/issues/79
(\() -> do
(,()) <$> newSession recorder shakeExtras vfs shakeDb acts reason)
-- fill barrier to signal that the restart is done
signalBarrier b ()
waitBarrier b
where
logErrorAfter :: Seconds -> IO () -> IO ()
logErrorAfter seconds action = flip withAsync (const action) $ do
Expand Down
28 changes: 28 additions & 0 deletions ghcide/src/Development/IDE/Core/Thread.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
module Development.IDE.Core.Thread where

Check warning on line 1 in ghcide/src/Development/IDE/Core/Thread.hs

View workflow job for this annotation

GitHub Actions / Hlint check run

Warning in module Development.IDE.Core.Thread: Use module export list ▫︎ Found: "module Development.IDE.Core.Thread where" ▫︎ Perhaps: "module Development.IDE.Core.Thread (\n module Development.IDE.Core.Thread\n ) where" ▫︎ Note: an explicit list is usually better
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad (forever)


data ThreadRun input threadResource resource arg = ThreadRun {
tCreateResource ::
input -- ^ input of running
-> (threadResource -> resource -> IO ()) -- ^ the long running action
-> IO (),
tRunner -- ^ run a single action with writer resource
:: input -- ^ input of running
-> threadResource -- ^ writer resource
-> arg -- ^ argument to run
-> IO ()
}

runInThread :: ThreadRun input threadResource resource arg -> input -> ((resource, TQueue arg) -> IO ()) -> IO ()
runInThread ThreadRun{..} ip f = do
tCreateResource ip $ \w r -> do
q <- newTQueueIO
withAsync (writerThread w q) $ \_ -> f (r, q)
where
writerThread r q =
forever $ do
l <- atomically $ readTQueue q
tRunner ip r l
39 changes: 33 additions & 6 deletions ghcide/src/Development/IDE/LSP/LanguageServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ module Development.IDE.LSP.LanguageServer
( runLanguageServer
, setupLSP
, Log(..)
, ThreadQueue
, sessionRestartThread
, sessionLoaderThread
, runWithDb
) where

import Control.Concurrent.STM
Expand All @@ -21,7 +25,8 @@ import Data.Maybe
import qualified Data.Set as Set
import qualified Data.Text as T
import Development.IDE.LSP.Server
import Development.IDE.Session (runWithDb)
import Development.IDE.Session (WithHieDbShield (..),
dbThreadRun)
import Ide.Types (traceWithSpan)
import Language.LSP.Protocol.Message
import Language.LSP.Protocol.Types
Expand All @@ -33,9 +38,13 @@ import UnliftIO.Directory
import UnliftIO.Exception

import qualified Colog.Core as Colog
import Control.Monad.Cont (ContT (ContT),
evalContT)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Development.IDE.Core.IdeConfiguration
import Development.IDE.Core.Shake hiding (Log)
import Development.IDE.Core.Thread (ThreadRun (..),
runInThread)
import Development.IDE.Core.Tracing
import qualified Development.IDE.Session as Session
import Development.IDE.Types.Shake (WithHieDb)
Expand Down Expand Up @@ -77,8 +86,6 @@ instance Pretty Log where
LogLspServer msg -> pretty msg
LogServerShutdownMessage -> "Received shutdown message"

-- used to smuggle RankNType WithHieDb through dbMVar
newtype WithHieDbShield = WithHieDbShield WithHieDb

runLanguageServer
:: forall config a m. (Show config)
Expand Down Expand Up @@ -129,7 +136,7 @@ setupLSP ::
Recorder (WithPriority Log)
-> (FilePath -> IO FilePath) -- ^ Map root paths to the location of the hiedb for the project
-> LSP.Handlers (ServerM config)
-> (LSP.LanguageContextEnv config -> Maybe FilePath -> WithHieDb -> IndexQueue -> IO IdeState)
-> (LSP.LanguageContextEnv config -> Maybe FilePath -> WithHieDb -> ThreadQueue -> IO IdeState)
-> MVar ()
-> IO (LSP.LanguageContextEnv config -> TRequestMessage Method_Initialize -> IO (Either err (LSP.LanguageContextEnv config, IdeState)),
LSP.Handlers (ServerM config),
Expand Down Expand Up @@ -187,7 +194,7 @@ setupLSP recorder getHieDbLoc userHandlers getIdeState clientMsgVar = do
handleInit
:: Recorder (WithPriority Log)
-> (FilePath -> IO FilePath)
-> (LSP.LanguageContextEnv config -> Maybe FilePath -> WithHieDb -> IndexQueue -> IO IdeState)
-> (LSP.LanguageContextEnv config -> Maybe FilePath -> WithHieDb -> ThreadQueue -> IO IdeState)
-> MVar ()
-> IO ()
-> (SomeLspId -> IO ())
Expand Down Expand Up @@ -240,12 +247,32 @@ handleInit recorder getHieDbLoc getIdeState lifetime exitClientMsg clearReqId wa
ReactorRequest _id act k -> void $ async $ checkCancelled _id act k
logWith recorder Info LogReactorThreadStopped

(WithHieDbShield withHieDb,hieChan) <- takeMVar dbMVar
(WithHieDbShield withHieDb, hieChan) <- takeMVar dbMVar
ide <- getIdeState env root withHieDb hieChan
registerIdeConfiguration (shakeExtras ide) initConfig
pure $ Right (env,ide)


runWithDb :: Recorder (WithPriority Session.Log) -> FilePath -> (WithHieDb -> ThreadQueue -> IO ()) -> IO ()
runWithDb recorder dbLoc f = evalContT $ do
(_, sessionRestartTQueue) <- ContT $ runInThread sessionRestartThread ()
(_, sessionLoaderTQueue) <- ContT $ runInThread sessionLoaderThread ()
(WithHieDbShield hiedb, hieChan) <- ContT $ runInThread dbThreadRun (recorder, dbLoc)
liftIO $ f hiedb (ThreadQueue hieChan sessionRestartTQueue sessionLoaderTQueue)


sessionRestartThread :: ThreadRun () () () (IO ())
sessionRestartThread = ThreadRun {
tRunner = \_ _ run -> run,
tCreateResource = \_ f -> do f () ()
}

sessionLoaderThread :: ThreadRun () () () (IO ())
sessionLoaderThread = ThreadRun {
tRunner = \_ _ run -> run,
tCreateResource = \_ f -> do f () ()
}

-- | Runs the action until it ends or until the given MVar is put.
-- Rethrows any exceptions.
untilMVar :: MonadUnliftIO m => MVar () -> m () -> m ()
Expand Down
Loading

0 comments on commit 15f9892

Please sign in to comment.