Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Turbopack] handle state serialization #69670

Merged
merged 4 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 14 additions & 2 deletions turbopack/crates/turbo-tasks-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ use tokio::{
};
use tracing::Instrument;
use turbo_tasks::{
mark_stateful, trace::TraceRawVcs, Completion, Invalidator, RcStr, ReadRef, ValueToString, Vc,
mark_stateful, trace::TraceRawVcs, Completion, Invalidator, RcStr, ReadRef,
SerializationInvalidator, ValueToString, Vc,
};
use turbo_tasks_hash::{hash_xxh3_hash64, DeterministicHash, DeterministicHasher};
use util::{extract_disk_access, join_path, normalize_path, sys_to_unix, unix_to_sys};
Expand Down Expand Up @@ -106,6 +107,8 @@ pub struct DiskFileSystem {
invalidator_map: Arc<InvalidatorMap>,
#[turbo_tasks(debug_ignore, trace_ignore)]
dir_invalidator_map: Arc<InvalidatorMap>,
#[turbo_tasks(debug_ignore, trace_ignore)]
serialization_invalidator: SerializationInvalidator,
/// Lock that makes invalidation atomic. It will keep a write lock during
/// watcher invalidation and a read lock during other operations.
#[turbo_tasks(debug_ignore, trace_ignore)]
Expand All @@ -126,6 +129,7 @@ impl DiskFileSystem {
fn register_invalidator(&self, path: &Path) -> Result<()> {
let invalidator = turbo_tasks::get_invalidator();
self.invalidator_map.insert(path_to_key(path), invalidator);
self.serialization_invalidator.invalidate();
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
if let Some(dir) = path.parent() {
self.watcher.ensure_watching(dir, self.root_path())?;
Expand All @@ -140,6 +144,8 @@ impl DiskFileSystem {
let invalidator = turbo_tasks::get_invalidator();
let mut invalidator_map = self.invalidator_map.lock().unwrap();
let old_invalidators = invalidator_map.insert(path_to_key(path), [invalidator].into());
drop(invalidator_map);
self.serialization_invalidator.invalidate();
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
if let Some(dir) = path.parent() {
self.watcher.ensure_watching(dir, self.root_path())?;
Expand All @@ -153,6 +159,7 @@ impl DiskFileSystem {
let invalidator = turbo_tasks::get_invalidator();
self.dir_invalidator_map
.insert(path_to_key(path), invalidator);
self.serialization_invalidator.invalidate();
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
self.watcher.ensure_watching(path, self.root_path())?;
Ok(())
Expand All @@ -172,6 +179,7 @@ impl DiskFileSystem {
for (_, invalidators) in take(&mut *self.dir_invalidator_map.lock().unwrap()).into_iter() {
invalidators.into_iter().for_each(|i| i.invalidate());
}
self.serialization_invalidator.invalidate();
}

pub fn invalidate_with_reason(&self) {
Expand All @@ -189,6 +197,7 @@ impl DiskFileSystem {
.into_iter()
.for_each(|i| i.invalidate_with_reason(reason.clone()));
}
self.serialization_invalidator.invalidate();
}

pub fn start_watching(&self) -> Result<()> {
Expand Down Expand Up @@ -217,6 +226,7 @@ impl DiskFileSystem {
invalidator_map,
dir_invalidator_map,
)?;
self.serialization_invalidator.invalidate();

Ok(())
}
Expand Down Expand Up @@ -293,7 +303,7 @@ impl DiskFileSystem {
/// ignore specific subpaths from each.
#[turbo_tasks::function]
pub async fn new(name: RcStr, root: RcStr, ignored_subpaths: Vec<RcStr>) -> Result<Vc<Self>> {
mark_stateful();
let serialization_invalidator = mark_stateful();
// create the directory for the filesystem on disk, if it doesn't exist
fs::create_dir_all(&root).await?;

Expand All @@ -304,6 +314,7 @@ impl DiskFileSystem {
invalidation_lock: Default::default(),
invalidator_map: Arc::new(InvalidatorMap::new()),
dir_invalidator_map: Arc::new(InvalidatorMap::new()),
serialization_invalidator,
watcher: Arc::new(DiskWatcher::new(
ignored_subpaths.into_iter().map(PathBuf::from).collect(),
)),
Expand Down Expand Up @@ -561,6 +572,7 @@ impl FileSystem for DiskFileSystem {
for i in old_invalidators {
self.invalidator_map.insert(key.clone(), i);
}
self.serialization_invalidator.invalidate();
}
return Ok(Completion::unchanged());
}
Expand Down
8 changes: 8 additions & 0 deletions turbopack/crates/turbo-tasks-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ impl TurboTasksApi for VcStorage {
unreachable!()
}

fn invalidate_serialization(&self, _task: TaskId) {
// ingore
}

fn notify_scheduled_tasks(&self) {
// ignore
}
Expand Down Expand Up @@ -309,6 +313,10 @@ impl TurboTasksApi for VcStorage {
// no-op
}

fn mark_own_task_as_dirty_when_persisted(&self, _task: TaskId) {
// no-op
}

fn detached_for_testing(
&self,
_f: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
Expand Down
2 changes: 1 addition & 1 deletion turbopack/crates/turbo-tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ futures = { workspace = true }
indexmap = { workspace = true, features = ["serde"] }
mopa = "0.2.0"
once_cell = { workspace = true }
parking_lot = { workspace = true }
parking_lot = { workspace = true, features = ["serde"]}
pin-project-lite = { workspace = true }
regex = { workspace = true }
rustc-hash = { workspace = true }
Expand Down
15 changes: 15 additions & 0 deletions turbopack/crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,13 @@ pub trait Backend: Sync + Send {
fn invalidate_tasks(&self, tasks: &[TaskId], turbo_tasks: &dyn TurboTasksBackendApi<Self>);
fn invalidate_tasks_set(&self, tasks: &TaskIdSet, turbo_tasks: &dyn TurboTasksBackendApi<Self>);

fn invalidate_serialization(
&self,
_task: TaskId,
_turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) {
}

fn get_task_description(&self, task: TaskId) -> String;

/// Task-local state that stored inside of [`TurboTasksBackendApi`]. Constructed with
Expand Down Expand Up @@ -623,6 +630,14 @@ pub trait Backend: Sync + Send {
// Do nothing by default
}

fn mark_own_task_as_dirty_when_persisted(
&self,
_task: TaskId,
_turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) {
// Do nothing by default
}

fn create_transient_task(
&self,
task_type: TransientTaskType,
Expand Down
12 changes: 7 additions & 5 deletions turbopack/crates/turbo-tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ mod raw_vc;
mod rcstr;
mod read_ref;
pub mod registry;
mod serialization_invalidation;
pub mod small_duration;
mod state;
pub mod task;
Expand Down Expand Up @@ -91,16 +92,17 @@ pub use invalidation::{
pub use join_iter_ext::{JoinIterExt, TryFlatJoinIterExt, TryJoinIterExt};
pub use magic_any::MagicAny;
pub use manager::{
dynamic_call, dynamic_this_call, emit, mark_finished, mark_stateful, prevent_gc, run_once,
run_once_with_reason, spawn_blocking, spawn_thread, trait_call, turbo_tasks, CurrentCellRef,
ReadConsistency, TaskPersistence, TurboTasks, TurboTasksApi, TurboTasksBackendApi,
TurboTasksBackendApiExt, TurboTasksCallApi, Unused, UpdateInfo,
dynamic_call, dynamic_this_call, emit, mark_dirty_when_persisted, mark_finished, mark_stateful,
prevent_gc, run_once, run_once_with_reason, spawn_blocking, spawn_thread, trait_call,
turbo_tasks, CurrentCellRef, ReadConsistency, TaskPersistence, TurboTasks, TurboTasksApi,
TurboTasksBackendApi, TurboTasksBackendApiExt, TurboTasksCallApi, Unused, UpdateInfo,
};
pub use native_function::{FunctionMeta, NativeFunction};
pub use raw_vc::{CellId, RawVc, ReadRawVcFuture, ResolveTypeError};
pub use read_ref::ReadRef;
use rustc_hash::FxHasher;
pub use state::State;
pub use serialization_invalidation::SerializationInvalidator;
pub use state::{State, TransientState};
pub use task::{task_input::TaskInput, SharedReference};
pub use trait_ref::{IntoTraitRef, TraitRef};
pub use turbo_tasks_macros::{function, value, value_impl, value_trait, TaskInput};
Expand Down
31 changes: 29 additions & 2 deletions turbopack/crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::{
magic_any::MagicAny,
raw_vc::{CellId, RawVc},
registry::{self, get_function},
serialization_invalidation::SerializationInvalidator,
task::shared_reference::TypedSharedReference,
trace::TraceRawVcs,
trait_helpers::get_trait_method,
Expand Down Expand Up @@ -115,6 +116,8 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
fn invalidate(&self, task: TaskId);
fn invalidate_with_reason(&self, task: TaskId, reason: StaticOrArc<dyn InvalidationReason>);

fn invalidate_serialization(&self, task: TaskId);

/// Eagerly notifies all tasks that were scheduled for notifications via
/// `schedule_notify_tasks_set()`
fn notify_scheduled_tasks(&self);
Expand Down Expand Up @@ -180,6 +183,7 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send {
fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent>;
fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent);
fn mark_own_task_as_finished(&self, task: TaskId);
fn mark_own_task_as_dirty_when_persisted(&self, task: TaskId);

fn connect_task(&self, task: TaskId);

Expand Down Expand Up @@ -1256,6 +1260,10 @@ impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
self.backend.invalidate_task(task, self);
}

fn invalidate_serialization(&self, task: TaskId) {
self.backend.invalidate_serialization(task, self);
}

fn notify_scheduled_tasks(&self) {
let _ = CURRENT_GLOBAL_TASK_STATE.try_with(|cell| {
let tasks = {
Expand Down Expand Up @@ -1395,6 +1403,11 @@ impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
self.backend.mark_own_task_as_finished(task, self);
}

fn mark_own_task_as_dirty_when_persisted(&self, task: TaskId) {
self.backend
.mark_own_task_as_dirty_when_persisted(task, self);
}

/// Creates a future that inherits the current task id and task state. The current global task
/// will wait for this future to be dropped before exiting.
fn detached_for_testing(
Expand Down Expand Up @@ -1677,6 +1690,15 @@ pub fn current_task_for_testing() -> TaskId {
CURRENT_GLOBAL_TASK_STATE.with(|ts| ts.read().unwrap().task_id)
}

/// Marks the current task as dirty when restored from persistent cache.
pub fn mark_dirty_when_persisted() {
with_turbo_tasks(|tt| {
tt.mark_own_task_as_dirty_when_persisted(current_task(
"turbo_tasks::mark_dirty_when_persisted()",
))
});
}

/// Marks the current task as finished. This excludes it from waiting for
/// strongly consistency.
pub fn mark_finished() {
Expand All @@ -1687,10 +1709,15 @@ pub fn mark_finished() {

/// Marks the current task as stateful. This prevents the tasks from being
/// dropped without persisting the state.
pub fn mark_stateful() {
/// Returns a [`SerializationInvalidator`] that can be used to invalidate the
/// serialization of the current task cells
pub fn mark_stateful() -> SerializationInvalidator {
CURRENT_GLOBAL_TASK_STATE.with(|cell| {
let CurrentGlobalTaskState { stateful, .. } = &mut *cell.write().unwrap();
let CurrentGlobalTaskState {
stateful, task_id, ..
} = &mut *cell.write().unwrap();
*stateful = true;
SerializationInvalidator::new(*task_id)
})
}

Expand Down
95 changes: 95 additions & 0 deletions turbopack/crates/turbo-tasks/src/serialization_invalidation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::{
hash::Hash,
sync::{Arc, Weak},
};

use serde::{de::Visitor, Deserialize, Serialize};
use tokio::runtime::Handle;

use crate::{manager::with_turbo_tasks, trace::TraceRawVcs, TaskId, TurboTasksApi};

pub struct SerializationInvalidator {
task: TaskId,
turbo_tasks: Weak<dyn TurboTasksApi>,
handle: Handle,
}

impl Hash for SerializationInvalidator {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.task.hash(state);
}
}

impl PartialEq for SerializationInvalidator {
fn eq(&self, other: &Self) -> bool {
self.task == other.task
}
}

impl Eq for SerializationInvalidator {}

impl SerializationInvalidator {
pub fn invalidate(&self) {
let SerializationInvalidator {
task,
turbo_tasks,
handle,
} = self;
let _ = handle.enter();
if let Some(turbo_tasks) = turbo_tasks.upgrade() {
turbo_tasks.invalidate_serialization(*task);
}
}

pub(crate) fn new(task_id: TaskId) -> Self {
Self {
task: task_id,
turbo_tasks: with_turbo_tasks(Arc::downgrade),
handle: Handle::current(),
}
}
}

impl TraceRawVcs for SerializationInvalidator {
fn trace_raw_vcs(&self, _context: &mut crate::trace::TraceRawVcsContext) {
// nothing here
}
}

impl Serialize for SerializationInvalidator {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_newtype_struct("SerializationInvalidator", &self.task)
}
}

impl<'de> Deserialize<'de> for SerializationInvalidator {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct V;

impl<'de> Visitor<'de> for V {
type Value = SerializationInvalidator;

fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "an SerializationInvalidator")
}

fn visit_newtype_struct<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: serde::Deserializer<'de>,
{
Ok(SerializationInvalidator {
task: TaskId::deserialize(deserializer)?,
turbo_tasks: with_turbo_tasks(Arc::downgrade),
handle: tokio::runtime::Handle::current(),
})
}
}
deserializer.deserialize_newtype_struct("SerializationInvalidator", V)
}
}
Loading
Loading