Skip to content

Commit

Permalink
[Turbopack] handle state serialization (#69670)
Browse files Browse the repository at this point in the history
### What?

State need some special handling for persistent caching. In particular
we need to differ between two kind of states:

* Persistent State: It will be stored in the persistent cache and value
will be kept between builds.
* Transient State: It's only valid for a session and will reset when
restoring the persistent cache.

We didn't have the separating before, so this PR introduces a new type
`TransientState<T>` next to `State<T>` which handles transient state.
The value will always be an `Option<T>` and resets to `None` when
restoring the persistent cache. This also means all task that depend on
transient state will be invalidated on restoring.

Transient State can also be used when the value is not serializable. e.
g. this is the case for the `last_successful_parse` state in ecmascript
modules. We use that to avoid large structure changes to the module
graph when introducing parse errors to modules. Using transient state
for that means we only apply this optimization on a session, and it
resets when restoring from persistent cache.
  • Loading branch information
sokra authored Sep 6, 2024
1 parent 87e91d5 commit a9bfc64
Show file tree
Hide file tree
Showing 10 changed files with 391 additions and 73 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 14 additions & 2 deletions turbopack/crates/turbo-tasks-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ use tokio::{
};
use tracing::Instrument;
use turbo_tasks::{
mark_stateful, trace::TraceRawVcs, Completion, Invalidator, RcStr, ReadRef, ValueToString, Vc,
mark_stateful, trace::TraceRawVcs, Completion, Invalidator, RcStr, ReadRef,
SerializationInvalidator, ValueToString, Vc,
};
use turbo_tasks_hash::{hash_xxh3_hash64, DeterministicHash, DeterministicHasher};
use util::{extract_disk_access, join_path, normalize_path, sys_to_unix, unix_to_sys};
Expand Down Expand Up @@ -106,6 +107,8 @@ pub struct DiskFileSystem {
invalidator_map: Arc<InvalidatorMap>,
#[turbo_tasks(debug_ignore, trace_ignore)]
dir_invalidator_map: Arc<InvalidatorMap>,
#[turbo_tasks(debug_ignore, trace_ignore)]
serialization_invalidator: SerializationInvalidator,
/// Lock that makes invalidation atomic. It will keep a write lock during
/// watcher invalidation and a read lock during other operations.
#[turbo_tasks(debug_ignore, trace_ignore)]
Expand All @@ -126,6 +129,7 @@ impl DiskFileSystem {
fn register_invalidator(&self, path: &Path) -> Result<()> {
let invalidator = turbo_tasks::get_invalidator();
self.invalidator_map.insert(path_to_key(path), invalidator);
self.serialization_invalidator.invalidate();
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
if let Some(dir) = path.parent() {
self.watcher.ensure_watching(dir, self.root_path())?;
Expand All @@ -140,6 +144,8 @@ impl DiskFileSystem {
let invalidator = turbo_tasks::get_invalidator();
let mut invalidator_map = self.invalidator_map.lock().unwrap();
let old_invalidators = invalidator_map.insert(path_to_key(path), [invalidator].into());
drop(invalidator_map);
self.serialization_invalidator.invalidate();
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
if let Some(dir) = path.parent() {
self.watcher.ensure_watching(dir, self.root_path())?;
Expand All @@ -153,6 +159,7 @@ impl DiskFileSystem {
let invalidator = turbo_tasks::get_invalidator();
self.dir_invalidator_map
.insert(path_to_key(path), invalidator);
self.serialization_invalidator.invalidate();
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
self.watcher.ensure_watching(path, self.root_path())?;
Ok(())
Expand All @@ -172,6 +179,7 @@ impl DiskFileSystem {
for (_, invalidators) in take(&mut *self.dir_invalidator_map.lock().unwrap()).into_iter() {
invalidators.into_iter().for_each(|i| i.invalidate());
}
self.serialization_invalidator.invalidate();
}

pub fn invalidate_with_reason(&self) {
Expand All @@ -189,6 +197,7 @@ impl DiskFileSystem {
.into_iter()
.for_each(|i| i.invalidate_with_reason(reason.clone()));
}
self.serialization_invalidator.invalidate();
}

pub fn start_watching(&self) -> Result<()> {
Expand Down Expand Up @@ -217,6 +226,7 @@ impl DiskFileSystem {
invalidator_map,
dir_invalidator_map,
)?;
self.serialization_invalidator.invalidate();

Ok(())
}
Expand Down Expand Up @@ -293,7 +303,7 @@ impl DiskFileSystem {
/// ignore specific subpaths from each.
#[turbo_tasks::function]
pub async fn new(name: RcStr, root: RcStr, ignored_subpaths: Vec<RcStr>) -> Result<Vc<Self>> {
mark_stateful();
let serialization_invalidator = mark_stateful();
// create the directory for the filesystem on disk, if it doesn't exist
fs::create_dir_all(&root).await?;

Expand All @@ -304,6 +314,7 @@ impl DiskFileSystem {
invalidation_lock: Default::default(),
invalidator_map: Arc::new(InvalidatorMap::new()),
dir_invalidator_map: Arc::new(InvalidatorMap::new()),
serialization_invalidator,
watcher: Arc::new(DiskWatcher::new(
ignored_subpaths.into_iter().map(PathBuf::from).collect(),
)),
Expand Down Expand Up @@ -561,6 +572,7 @@ impl FileSystem for DiskFileSystem {
for i in old_invalidators {
self.invalidator_map.insert(key.clone(), i);
}
self.serialization_invalidator.invalidate();
}
return Ok(Completion::unchanged());
}
Expand Down
8 changes: 8 additions & 0 deletions turbopack/crates/turbo-tasks-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ impl TurboTasksApi for VcStorage {
unreachable!()
}

fn invalidate_serialization(&self, _task: TaskId) {
// ingore
}

fn notify_scheduled_tasks(&self) {
// ignore
}
Expand Down Expand Up @@ -309,6 +313,10 @@ impl TurboTasksApi for VcStorage {
// no-op
}

fn mark_own_task_as_dirty_when_persisted(&self, _task: TaskId) {
// no-op
}

fn detached_for_testing(
&self,
_f: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
Expand Down
2 changes: 1 addition & 1 deletion turbopack/crates/turbo-tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ futures = { workspace = true }
indexmap = { workspace = true, features = ["serde"] }
mopa = "0.2.0"
once_cell = { workspace = true }
parking_lot = { workspace = true }
parking_lot = { workspace = true, features = ["serde"]}
pin-project-lite = { workspace = true }
regex = { workspace = true }
rustc-hash = { workspace = true }
Expand Down
15 changes: 15 additions & 0 deletions turbopack/crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,13 @@ pub trait Backend: Sync + Send {
fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>);
fn invalidate_tasks_set(&self, tasks: &TaskIdSet, turbo_tasks: &dyn TurboTasksBackendApi<Self>);

fn invalidate_serialization(
&self,
_task: TaskId,
_turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) {
}

fn get_task_description(&self, task: TaskId) -> String;

/// Task-local state that stored inside of [`TurboTasksBackendApi`]. Constructed with
Expand Down Expand Up @@ -623,6 +630,14 @@ pub trait Backend: Sync + Send {
// Do nothing by default
}

fn mark_own_task_as_dirty_when_persisted(
&self,
_task: TaskId,
_turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) {
// Do nothing by default
}

fn create_transient_task(
&self,
task_type: TransientTaskType,
Expand Down
12 changes: 7 additions & 5 deletions turbopack/crates/turbo-tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ mod raw_vc;
mod rcstr;
mod read_ref;
pub mod registry;
mod serialization_invalidation;
pub mod small_duration;
mod state;
pub mod task;
Expand Down Expand Up @@ -91,16 +92,17 @@ pub use invalidation::{
pub use join_iter_ext::{JoinIterExt, TryFlatJoinIterExt, TryJoinIterExt};
pub use magic_any::MagicAny;
pub use manager::{
dynamic_call, dynamic_this_call, emit, mark_finished, mark_stateful, prevent_gc, run_once,
run_once_with_reason, spawn_blocking, spawn_thread, trait_call, turbo_tasks, CurrentCellRef,
ReadConsistency, TaskPersistence, TurboTasks, TurboTasksApi, TurboTasksBackendApi,
TurboTasksBackendApiExt, TurboTasksCallApi, Unused, UpdateInfo,
dynamic_call, dynamic_this_call, emit, mark_dirty_when_persisted, mark_finished, mark_stateful,
prevent_gc, run_once, run_once_with_reason, spawn_blocking, spawn_thread, trait_call,
turbo_tasks, CurrentCellRef, ReadConsistency, TaskPersistence, TurboTasks, TurboTasksApi,
TurboTasksBackendApi, TurboTasksBackendApiExt, TurboTasksCallApi, Unused, UpdateInfo,
};
pub use native_function::{FunctionMeta, NativeFunction};
pub use raw_vc::{CellId, RawVc, ReadRawVcFuture, ResolveTypeError};
pub use read_ref::ReadRef;
use rustc_hash::FxHasher;
pub use state::State;
pub use serialization_invalidation::SerializationInvalidator;
pub use state::{State, TransientState};
pub use task::{task_input::TaskInput, SharedReference};
pub use trait_ref::{IntoTraitRef, TraitRef};
pub use turbo_tasks_macros::{function, value, value_impl, value_trait, TaskInput};
Expand Down
31 changes: 29 additions & 2 deletions turbopack/crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::{
magic_any::MagicAny,
raw_vc::{CellId, RawVc},
registry::{self, get_function},
serialization_invalidation::SerializationInvalidator,
task::shared_reference::TypedSharedReference,
trace::TraceRawVcs,
trait_helpers::get_trait_method,
Expand Down Expand Up @@ -115,6 +116,8 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
fn invalidate(&self, task: TaskId);
fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

fn invalidate_serialization(&self, task: TaskId);

/// Eagerly notifies all tasks that were scheduled for notifications via
/// `schedule_notify_tasks_set()`
fn notify_scheduled_tasks(&self);
Expand Down Expand Up @@ -180,6 +183,7 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent>;
fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent);
fn mark_own_task_as_finished(&self, task: TaskId);
fn mark_own_task_as_dirty_when_persisted(&self, task: TaskId);

fn connect_task(&self, task: TaskId);

Expand Down Expand Up @@ -1256,6 +1260,10 @@ impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
self.backend.invalidate_task(task, self);
}

fn invalidate_serialization(&self, task: TaskId) {
self.backend.invalidate_serialization(task, self);
}

fn notify_scheduled_tasks(&self) {
let _ = CURRENT_GLOBAL_TASK_STATE.try_with(|cell| {
let tasks = {
Expand Down Expand Up @@ -1395,6 +1403,11 @@ impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
self.backend.mark_own_task_as_finished(task, self);
}

fn mark_own_task_as_dirty_when_persisted(&self, task: TaskId) {
self.backend
.mark_own_task_as_dirty_when_persisted(task, self);
}

/// Creates a future that inherits the current task id and task state. The current global task
/// will wait for this future to be dropped before exiting.
fn detached_for_testing(
Expand Down Expand Up @@ -1677,6 +1690,15 @@ pub fn current_task_for_testing() -> TaskId {
CURRENT_GLOBAL_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
}

/// Marks the current task as dirty when restored from persistent cache.
pub fn mark_dirty_when_persisted() {
with_turbo_tasks(|tt| {
tt.mark_own_task_as_dirty_when_persisted(current_task(
"turbo_tasks::mark_dirty_when_persisted()",
))
});
}

/// Marks the current task as finished. This excludes it from waiting for
/// strongly consistency.
pub fn mark_finished() {
Expand All @@ -1687,10 +1709,15 @@ pub fn mark_finished() {

/// Marks the current task as stateful. This prevents the tasks from being
/// dropped without persisting the state.
pub fn mark_stateful() {
/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
/// serialization of the current task cells
pub fn mark_stateful() -> SerializationInvalidator {
CURRENT_GLOBAL_TASK_STATE.with(|cell| {
let CurrentGlobalTaskState { stateful, .. } = &mut *cell.write().unwrap();
let CurrentGlobalTaskState {
stateful, task_id, ..
} = &mut *cell.write().unwrap();
*stateful = true;
SerializationInvalidator::new(*task_id)
})
}

Expand Down
95 changes: 95 additions & 0 deletions turbopack/crates/turbo-tasks/src/serialization_invalidation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::{
hash::Hash,
sync::{Arc, Weak},
};

use serde::{de::Visitor, Deserialize, Serialize};
use tokio::runtime::Handle;

use crate::{manager::with_turbo_tasks, trace::TraceRawVcs, TaskId, TurboTasksApi};

pub struct SerializationInvalidator {
task: TaskId,
turbo_tasks: Weak<dyn TurboTasksApi>,
handle: Handle,
}

impl Hash for SerializationInvalidator {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.task.hash(state);
}
}

impl PartialEq for SerializationInvalidator {
fn eq(&self, other: &Self) -> bool {
self.task == other.task
}
}

impl Eq for SerializationInvalidator {}

impl SerializationInvalidator {
pub fn invalidate(&self) {
let SerializationInvalidator {
task,
turbo_tasks,
handle,
} = self;
let _ = handle.enter();
if let Some(turbo_tasks) = turbo_tasks.upgrade() {
turbo_tasks.invalidate_serialization(*task);
}
}

pub(crate) fn new(task_id: TaskId) -> Self {
Self {
task: task_id,
turbo_tasks: with_turbo_tasks(Arc::downgrade),
handle: Handle::current(),
}
}
}

impl TraceRawVcs for SerializationInvalidator {
fn trace_raw_vcs(&self, _context: &mut crate::trace::TraceRawVcsContext) {
// nothing here
}
}

impl Serialize for SerializationInvalidator {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_newtype_struct("SerializationInvalidator", &self.task)
}
}

impl<'de> Deserialize<'de> for SerializationInvalidator {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct V;

impl<'de> Visitor<'de> for V {
type Value = SerializationInvalidator;

fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "an SerializationInvalidator")
}

fn visit_newtype_struct<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: serde::Deserializer<'de>,
{
Ok(SerializationInvalidator {
task: TaskId::deserialize(deserializer)?,
turbo_tasks: with_turbo_tasks(Arc::downgrade),
handle: tokio::runtime::Handle::current(),
})
}
}
deserializer.deserialize_newtype_struct("SerializationInvalidator", V)
}
}
Loading

0 comments on commit a9bfc64

Please sign in to comment.