Skip to content

Commit

Permalink
enhance: batch ops support batch_size
Browse files Browse the repository at this point in the history
  • Loading branch information
kingwingfly committed Jun 13, 2024
1 parent 76c2551 commit af8336e
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ repository = "https://github.com/kingwingfly/fav"
documentation = ""

[workspace.dependencies]
fav_core = { path = "fav_core", version = "0.1.1" }
fav_core = { path = "fav_core", version = "0.1.2" }
fav_derive = { path = "fav_derive", version = "0.0.1" }
fav_utils = { path = "fav_utils", version = "0.0.11" }
fav_cli = { path = "fav_cli", version = "0.2.28" }
Expand Down
10 changes: 5 additions & 5 deletions fav_cli/src/bili/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ pub(super) async fn fetch(sets: &mut BiliSets) -> FavCoreResult<()> {
let bili = Bili::read()?;
bili.fetch_sets(sets).await?;
let mut sub = sets.subset(|s| s.check_status(StatusFlags::TRACK));
bili.batch_fetch_set(&mut sub).await?;
bili.batch_fetch_set(&mut sub, 8).await?;
for set in sub.iter_mut() {
let mut sub = set.subset(|r| {
r.check_status(StatusFlags::TRACK)
& !r.check_status(StatusFlags::FETCHED)
& !r.check_status(StatusFlags::EXPIRED)
});
bili.batch_fetch_res(&mut sub).await?;
bili.batch_fetch_res(&mut sub, 8).await?;
}
Ok(())
}
Expand Down Expand Up @@ -154,7 +154,7 @@ pub(super) async fn pull_all(sets: &mut BiliSets) -> FavCoreResult<()> {
& !r.check_status(StatusFlags::EXPIRED)
& r.check_status(StatusFlags::TRACK | StatusFlags::FETCHED)
});
bili.batch_pull_res(&mut sub).await?;
bili.batch_pull_res(&mut sub, 8).await?;
}
Ok(())
}
Expand All @@ -170,7 +170,7 @@ pub(super) async fn pull(sets: &mut BiliSets, ids: Vec<String>) -> FavCoreResult
& !r.check_status(StatusFlags::EXPIRED)
& r.check_status(StatusFlags::TRACK | StatusFlags::FETCHED)
});
bili.batch_pull_res(&mut sub).await?;
bili.batch_pull_res(&mut sub, 8).await?;
}
}
let mut sub = sets.subset(|s| s.check_status(StatusFlags::TRACK));
Expand All @@ -181,7 +181,7 @@ pub(super) async fn pull(sets: &mut BiliSets, ids: Vec<String>) -> FavCoreResult
& r.check_status(StatusFlags::TRACK | StatusFlags::FETCHED)
& ids.contains(&r.id())
});
bili.batch_pull_res(&mut sub).await?;
bili.batch_pull_res(&mut sub, 8).await?;
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion fav_core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fav_core"
version = "0.1.1"
version = "0.1.2"
authors.workspace = true
description = "Fav's core crate; A collection of traits."
license.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion fav_core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,5 @@ An example can be found in [fav](https://github.com/kingwingfly/fav) repo.

# CHANGELOG

- 0.0.X -> 0.1.X: `Ops` related traits' methods need `F: Fn() -> Future<...>`, if Future is ready, one can cleanup, shutdown gracefully and return `FavCoreError::Cancel`. And `OpsExt` methods handle SIGINT based on this, keeps things reliable.
- 0.1.1 -> 0.1.2: `XXOpsExt` needs `batch_size` passed so that users can define the number of jobs concurrently.
- 0.0.X -> 0.1.X: `Ops` related traits' methods need `F: Fn() -> Future<...>`, if Future is ready, one can cleanup, shutdown gracefully and return `FavCoreError::Cancel`. And `OpsExt` methods handle SIGINT based on this, keeps things reliable.
46 changes: 33 additions & 13 deletions fav_core/src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,24 +134,32 @@ pub trait LocalResOps: Net + HttpConfig {
/// let app = App::default();
/// let mut sets = TestSets::default();
/// let mut sub = sets.subset(|r| r.check_status(StatusFlags::TRACK));
/// app.batch_fetch_set(&mut sub);
/// app.batch_fetch_set(&mut sub, 8);
/// # };
/// ```
pub trait SetOpsExt: SetOps {
/// **Asynchronously** fetch sets in sets using [`SetOps::fetch_set`].
fn batch_fetch_set<'a, SS>(&self, sets: &'a mut SS) -> impl Future<Output = FavCoreResult<()>>
fn batch_fetch_set<'a, SS>(
&self,
sets: &'a mut SS,
batch_size: usize,
) -> impl Future<Output = FavCoreResult<()>>
where
SS: Sets<Set = Self::Set>,
{
batch_op_set(sets, |s, fut| self.fetch_set(s, fut))
batch_op_set(sets, |s, fut| self.fetch_set(s, fut), batch_size)
}
}

/// A helper function to batch do operation on sets.
/// You can use it like [`batch_op_set`]
/// However, it's better to use [`Sets::subset`] and [`SetOpsExt`] instead.
/// See [`SetOpsExt`] for more information.
pub async fn batch_op_set<'a, SS, F, T>(sets: &'a mut SS, mut f: F) -> FavCoreResult<()>
pub async fn batch_op_set<'a, SS, F, T>(
sets: &'a mut SS,
mut f: F,
batch_size: usize,
) -> FavCoreResult<()>
where
SS: Sets + 'a,
F: FnMut(&'a mut SS::Set, WaitForCancellationFutureOwned) -> T,
Expand All @@ -171,7 +179,7 @@ where
}
}
})
.buffer_unordered(8);
.buffer_unordered(batch_size);
let mut result = Ok(());
tokio::select! {
_ = async {
Expand Down Expand Up @@ -208,24 +216,32 @@ impl<T> SetOpsExt for T where T: SetOps {}
/// let app = App::default();
/// let mut set = TestSet::default();
/// let mut sub = set.subset(|r| r.check_status(StatusFlags::TRACK));
/// app.batch_fetch_res(&mut sub);
/// app.batch_fetch_res(&mut sub, 8);
/// # };
/// ```
pub trait ResOpsExt: ResOps {
/// **Asynchronously** fetch resourses in set using [`ResOps::fetch_res`].
fn batch_fetch_res<'a, S>(&self, set: &'a mut S) -> impl Future<Output = FavCoreResult<()>>
fn batch_fetch_res<'a, S>(
&self,
set: &'a mut S,
batch_size: usize,
) -> impl Future<Output = FavCoreResult<()>>
where
S: Set<Res = Self::Res>,
{
batch_op_res(set, |r, fut| self.fetch_res(r, fut))
batch_op_res(set, |r, fut| self.fetch_res(r, fut), batch_size)
}

/// **Asynchronously** pull resourses in set using [`ResOps::pull_res`].
fn batch_pull_res<'a, S>(&self, set: &'a mut S) -> impl Future<Output = FavCoreResult<()>>
fn batch_pull_res<'a, S>(
&self,
set: &'a mut S,
batch_size: usize,
) -> impl Future<Output = FavCoreResult<()>>
where
S: Set<Res = Self::Res>,
{
batch_op_res(set, |r, fut| self.pull_res(r, fut))
batch_op_res(set, |r, fut| self.pull_res(r, fut), batch_size)
}
}

Expand Down Expand Up @@ -258,12 +274,16 @@ pub trait ResOpsExt: ResOps {
/// set: &mut set,
/// f: |r| r.check_status(StatusFlags::TRACK)
/// };
/// batch_op_res(&mut sub, |r, fut| app.fetch_res(r, fut)).await.unwrap();
/// batch_op_res(&mut sub, |r, fut| app.fetch_res(r, fut), 8).await.unwrap();
/// # };
/// ```
/// However, it's better to use [`Set::subset`] and [`ResOpsExt`] instead.
/// See [`ResOpsExt`] for more information.
pub async fn batch_op_res<'a, S, F, T>(set: &'a mut S, mut f: F) -> FavCoreResult<()>
pub async fn batch_op_res<'a, S, F, T>(
set: &'a mut S,
mut f: F,
batch_size: usize,
) -> FavCoreResult<()>
where
S: Set + 'a,
F: FnMut(&'a mut S::Res, WaitForCancellationFutureOwned) -> T,
Expand All @@ -283,7 +303,7 @@ where
}
}
})
.buffer_unordered(8);
.buffer_unordered(batch_size);
let mut result = Ok(());
tokio::select! {
_ = async {
Expand Down
4 changes: 2 additions & 2 deletions fav_core/src/res.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub trait Set {
/// let app = App::default();
/// let mut set = TestSet::default();
/// let mut sub = set.subset(|r| r.check_status(StatusFlags::TRACK));
/// app.batch_fetch_res(&mut sub);
/// app.batch_fetch_res(&mut sub, 8);
/// # };
/// ```
fn subset<F>(&mut self, filter: F) -> SubSet<Self, F>
Expand Down Expand Up @@ -57,7 +57,7 @@ pub trait Sets {
/// let app = App::default();
/// let mut sets = TestSets::default();
/// let mut sub = sets.subset(|r| r.check_status(StatusFlags::TRACK));
/// app.batch_fetch_set(&mut sub);
/// app.batch_fetch_set(&mut sub, 8);
/// # };
/// ```
fn subset<F>(&mut self, filter: F) -> SubSets<Self, F>
Expand Down
6 changes: 3 additions & 3 deletions fav_utils/src/bili/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ mod tests {
bili.fetch_sets(&mut sets).await.unwrap();
let set = sets.iter_mut().min_by_key(|s| s.media_count).unwrap();
bili.fetch_set(set, tokio::signal::ctrl_c()).await.unwrap();
bili.batch_fetch_res(set).await.unwrap();
bili.batch_pull_res(set).await.unwrap();
bili.batch_fetch_res(set, 8).await.unwrap();
bili.batch_pull_res(set, 8).await.unwrap();
sets.write().unwrap();
}

Expand All @@ -289,6 +289,6 @@ mod tests {
bili.fetch_set(set, tokio::signal::ctrl_c()).await.unwrap();
set.on_res_status(StatusFlags::TRACK);
let mut sub = set.subset(|r| r.check_status(StatusFlags::TRACK));
bili.batch_fetch_res(&mut sub).await.unwrap();
bili.batch_fetch_res(&mut sub, 8).await.unwrap();
}
}

0 comments on commit af8336e

Please sign in to comment.