Skip to content

Commit

Permalink
Update SetMultimap to use Map2 internally and match conventions.
Browse files Browse the repository at this point in the history
  • Loading branch information
anorth committed Feb 29, 2024
1 parent ca68aa2 commit 649f0a5
Show file tree
Hide file tree
Showing 16 changed files with 316 additions and 324 deletions.
6 changes: 3 additions & 3 deletions actors/market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use fil_actors_runtime::runtime::builtins::Type;
use fil_actors_runtime::runtime::{ActorCode, Policy, Runtime};
use fil_actors_runtime::{
actor_dispatch, actor_error, deserialize_block, ActorContext, ActorDowncast, ActorError,
AsActorError, Set, BURNT_FUNDS_ACTOR_ADDR, CRON_ACTOR_ADDR, DATACAP_TOKEN_ACTOR_ADDR,
AsActorError, BURNT_FUNDS_ACTOR_ADDR, CRON_ACTOR_ADDR, DATACAP_TOKEN_ACTOR_ADDR,
REWARD_ACTOR_ADDR, STORAGE_POWER_ACTOR_ADDR, SYSTEM_ACTOR_ADDR, VERIFIED_REGISTRY_ACTOR_ADDR,
};
use fil_actors_runtime::{extract_send_result, BatchReturnGen, FIRST_ACTOR_SPECIFIC_EXIT_CODE};
Expand Down Expand Up @@ -1425,7 +1425,7 @@ fn preactivate_deal<BS: Blockstore>(
deal_id: DealID,
proposals: &DealArray<BS>,
states: &DealMetaArray<BS>,
pending_proposals: &Set<BS>,
pending_proposals: &PendingProposalsSet<&BS>,
provider: &Address,
sector_commitment: ChainEpoch,
curr_epoch: ChainEpoch,
Expand Down Expand Up @@ -1456,7 +1456,7 @@ fn preactivate_deal<BS: Blockstore>(
// The pending deals set exists to prevent duplicate proposals.
// It should be impossible to have a proposal, no deal state, and not be in pending deals.
let deal_cid = deal_cid(rt, &proposal)?;
if !has_pending_deal(pending_proposals, &deal_cid)? {
if !pending_proposals.has(&deal_cid)? {
return Ok(Err(actor_error!(illegal_state, "deal {} is not in pending set", deal_cid)));
}

Expand Down
136 changes: 55 additions & 81 deletions actors/market/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use num_traits::Zero;

use fil_actors_runtime::{
actor_error, ActorContext, ActorError, Array, AsActorError, Config, Map2, Set, SetMultimap,
DEFAULT_HAMT_CONFIG,
SetMultimapConfig, DEFAULT_HAMT_CONFIG,
};

use crate::balance_table::BalanceTable;
Expand Down Expand Up @@ -49,6 +49,7 @@ pub struct State {

/// PendingProposals tracks dealProposals that have not yet reached their deal start date.
/// We track them here to ensure that miners can't publish the same deal proposal twice
/// Set<CID>
pub pending_proposals: Cid,

/// Total amount held in escrow, indexed by actor address (including both locked and unlocked amounts).
Expand Down Expand Up @@ -87,6 +88,13 @@ pub struct State {
pub provider_sectors: Cid,
}

pub type PendingProposalsSet<BS> = Set<BS, Cid>;
pub const PENDING_PROPOSALS_CONFIG: Config = DEFAULT_HAMT_CONFIG;

pub type DealOpsByEpoch<BS> = SetMultimap<BS, ChainEpoch, DealID>;
pub const DEAL_OPS_BY_EPOCH_CONFIG: SetMultimapConfig =
SetMultimapConfig { outer: DEFAULT_HAMT_CONFIG, inner: DEFAULT_HAMT_CONFIG };

pub type PendingDealAllocationsMap<BS> = Map2<BS, DealID, AllocationID>;
pub const PENDING_ALLOCATIONS_CONFIG: Config =
Config { bit_width: HAMT_BIT_WIDTH, ..DEFAULT_HAMT_CONFIG };
Expand All @@ -112,16 +120,12 @@ impl State {
.flush()
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to create empty states array")?;

let empty_pending_proposals_map = Set::new(store).root().context_code(
ExitCode::USR_ILLEGAL_STATE,
"failed to create empty pending proposals map state",
)?;

let empty_pending_proposals =
PendingProposalsSet::empty(store, PENDING_PROPOSALS_CONFIG, "pending proposals")
.flush()?;
let empty_balance_table = BalanceTable::new(store, "balance table").root()?;

let empty_deal_ops_hamt = SetMultimap::new(store)
.root()
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to create empty multiset")?;
let empty_deal_ops =
DealOpsByEpoch::empty(store, DEAL_OPS_BY_EPOCH_CONFIG, "deal ops").flush()?;

let empty_pending_deal_allocation_map = PendingDealAllocationsMap::empty(
store,
Expand All @@ -136,11 +140,11 @@ impl State {
Ok(Self {
proposals: empty_proposals_array,
states: empty_states_array,
pending_proposals: empty_pending_proposals_map,
pending_proposals: empty_pending_proposals,
escrow_table: empty_balance_table,
locked_table: empty_balance_table,
next_id: 0,
deal_ops_by_epoch: empty_deal_ops_hamt,
deal_ops_by_epoch: empty_deal_ops,
last_cron: EPOCH_UNDEFINED,

total_client_locked_collateral: TokenAmount::default(),
Expand Down Expand Up @@ -381,6 +385,16 @@ impl State {
Ok(maybe_alloc_id)
}

pub fn load_deal_ops<BS>(
&self,
store: BS,
) -> Result<SetMultimap<BS, ChainEpoch, DealID>, ActorError>
where
BS: Blockstore + Clone,
{
DealOpsByEpoch::load(store, &self.deal_ops_by_epoch, DEAL_OPS_BY_EPOCH_CONFIG, "deal ops")
}

pub fn put_deals_by_epoch<BS>(
&mut self,
store: &BS,
Expand All @@ -389,20 +403,13 @@ impl State {
where
BS: Blockstore,
{
let mut deals_by_epoch = SetMultimap::from_root(store, &self.deal_ops_by_epoch)
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to load deals by epoch")?;

let mut deals_by_epoch = self.load_deal_ops(store)?;
new_deals_by_epoch.iter().try_for_each(|(epoch, id)| -> Result<(), ActorError> {
deals_by_epoch
.put(*epoch, *id)
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to set deal")?;
deals_by_epoch.put(epoch, *id)?;
Ok(())
})?;

self.deal_ops_by_epoch = deals_by_epoch
.root()
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to flush deals by epoch")?;

self.deal_ops_by_epoch = deals_by_epoch.flush()?;
Ok(())
}

Expand All @@ -414,22 +421,13 @@ impl State {
where
BS: Blockstore,
{
let mut deals_by_epoch = SetMultimap::from_root(store, &self.deal_ops_by_epoch)
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to load deals by epoch")?;

let mut deals_by_epoch = self.load_deal_ops(store)?;
new_deals_by_epoch.iter().try_for_each(|(epoch, deals)| -> Result<(), ActorError> {
deals_by_epoch
.put_many(*epoch, deals)
.with_context_code(ExitCode::USR_ILLEGAL_STATE, || {
format!("failed to reinsert deal IDs for epoch {}", epoch)
})?;
deals_by_epoch.put_many(epoch, deals)?;
Ok(())
})?;

self.deal_ops_by_epoch = deals_by_epoch
.root()
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to flush deals by epoch")?;

self.deal_ops_by_epoch = deals_by_epoch.flush()?;
Ok(())
}

Expand All @@ -442,16 +440,11 @@ impl State {
BS: Blockstore,
{
let mut deal_ids = Vec::new();

let deals_by_epoch = SetMultimap::from_root(store, &self.deal_ops_by_epoch)
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to load deals by epoch")?;

deals_by_epoch
.for_each(key, |deal_id| {
deal_ids.push(deal_id);
Ok(())
})
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to set deal state")?;
let deals_by_epoch = self.load_deal_ops(store)?;
deals_by_epoch.for_each_in(&key, |deal_id| {
deal_ids.push(deal_id);
Ok(())
})?;

Ok(deal_ids)
}
Expand All @@ -464,22 +457,13 @@ impl State {
where
BS: Blockstore,
{
let mut deals_by_epoch = SetMultimap::from_root(store, &self.deal_ops_by_epoch)
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to load deals by epoch")?;

let mut deals_by_epoch = self.load_deal_ops(store)?;
epochs_to_remove.iter().try_for_each(|epoch| -> Result<(), ActorError> {
deals_by_epoch
.remove_all(*epoch)
.with_context_code(ExitCode::USR_ILLEGAL_STATE, || {
format!("failed to delete deal ops for epoch {}", epoch)
})?;
deals_by_epoch.remove_all(epoch)?;
Ok(())
})?;

self.deal_ops_by_epoch = deals_by_epoch
.root()
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to flush deals by epoch")?;

self.deal_ops_by_epoch = deals_by_epoch.flush()?;
Ok(())
}

Expand Down Expand Up @@ -517,21 +501,26 @@ impl State {
Ok(ex)
}

pub fn load_pending_deals<'bs, BS>(&self, store: &'bs BS) -> Result<Set<'bs, BS>, ActorError>
pub fn load_pending_deals<BS>(&self, store: BS) -> Result<PendingProposalsSet<BS>, ActorError>
where
BS: Blockstore,
{
Set::from_root(store, &self.pending_proposals)
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to get pending deals")
PendingProposalsSet::load(
store,
&self.pending_proposals,
PENDING_PROPOSALS_CONFIG,
"pending proposals",
)
}

fn save_pending_deals<BS>(&mut self, pending_deals: &mut Set<BS>) -> Result<(), ActorError>
fn save_pending_deals<BS>(
&mut self,
pending_deals: &mut PendingProposalsSet<BS>,
) -> Result<(), ActorError>
where
BS: Blockstore,
{
self.pending_proposals = pending_deals
.root()
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to flush pending deals")?;
self.pending_proposals = pending_deals.flush()?;
Ok(())
}

Expand All @@ -540,7 +529,7 @@ impl State {
BS: Blockstore,
{
let pending_deals = self.load_pending_deals(store)?;
has_pending_deal(&pending_deals, key)
pending_deals.has(key)
}

pub fn put_pending_deals<BS>(
Expand All @@ -553,9 +542,7 @@ impl State {
{
let mut pending_deals = self.load_pending_deals(store)?;
new_pending_deals.iter().try_for_each(|key: &Cid| -> Result<(), ActorError> {
pending_deals
.put(key.to_bytes().into())
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to set deal")?;
pending_deals.put(key)?;
Ok(())
})?;

Expand All @@ -571,11 +558,7 @@ impl State {
BS: Blockstore,
{
let mut pending_deals = self.load_pending_deals(store)?;
let removed = pending_deals
.delete(&pending_deal_key.to_bytes())
.with_context_code(ExitCode::USR_ILLEGAL_STATE, || {
format!("failed to delete pending proposal {}", pending_deal_key)
})?;
let removed = pending_deals.delete(&pending_deal_key)?;

self.save_pending_deals(&mut pending_deals)?;
Ok(removed)
Expand Down Expand Up @@ -1264,15 +1247,6 @@ where
Ok(state.cloned())
}

pub fn has_pending_deal<BS>(pending: &Set<BS>, key: &Cid) -> Result<bool, ActorError>
where
BS: Blockstore,
{
pending
.has(&key.to_bytes())
.context_code(ExitCode::USR_ILLEGAL_STATE, "failed to lookup pending deal")
}

pub fn load_provider_sector_deals<BS>(
store: BS,
provider_sectors: &ProviderSectorsMap<BS>,
Expand Down
48 changes: 24 additions & 24 deletions actors/market/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use cid::multihash::{Code, MultihashDigest};
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::DAG_CBOR;
use fvm_shared::error::ExitCode;
use fvm_shared::sector::SectorNumber;
use fvm_shared::{
address::{Address, Protocol},
Expand All @@ -22,13 +23,14 @@ use num_traits::Zero;
use fil_actors_runtime::builtin::HAMT_BIT_WIDTH;
use fil_actors_runtime::cbor::serialize;
use fil_actors_runtime::{
make_map_with_root_and_bitwidth, parse_uint_key, ActorError, MessageAccumulator, SetMultimap,
make_map_with_root_and_bitwidth, ActorError, AsActorError, MessageAccumulator,
};

use crate::ext::verifreg::AllocationID;
use crate::{
balance_table::BalanceTable, DealArray, DealMetaArray, DealProposal, ProviderSectorsMap,
SectorDealsMap, State, PROPOSALS_AMT_BITWIDTH, PROVIDER_SECTORS_CONFIG, SECTOR_DEALS_CONFIG,
balance_table::BalanceTable, DealArray, DealMetaArray, DealOpsByEpoch, DealProposal,
PendingProposalsSet, ProviderSectorsMap, SectorDealsMap, State, DEAL_OPS_BY_EPOCH_CONFIG,
PENDING_PROPOSALS_CONFIG, PROVIDER_SECTORS_CONFIG, SECTOR_DEALS_CONFIG,
};

#[derive(Clone)]
Expand Down Expand Up @@ -292,15 +294,16 @@ pub fn check_state_invariants<BS: Blockstore>(

// pending proposals
let mut pending_proposal_count = 0;
match make_map_with_root_and_bitwidth::<_, ()>(
&state.pending_proposals,
match PendingProposalsSet::load(
store,
PROPOSALS_AMT_BITWIDTH,
&state.pending_proposals,
PENDING_PROPOSALS_CONFIG,
"pending proposals",
) {
Ok(pending_proposals) => {
let ret = pending_proposals.for_each(|key, _| {
let proposal_cid = Cid::try_from(key.0.to_owned())?;

let ret = pending_proposals.for_each(|key| {
let proposal_cid = Cid::try_from(key.to_owned())
.context_code(ExitCode::USR_ILLEGAL_STATE, "not a CID")?;
acc.require(
proposal_cids.contains(&proposal_cid),
format!("pending proposal with cid {proposal_cid} not found within proposals"),
Expand Down Expand Up @@ -364,23 +367,20 @@ pub fn check_state_invariants<BS: Blockstore>(

// deals ops by epoch
let (mut deal_op_epoch_count, mut deal_op_count) = (0, 0);
match SetMultimap::from_root(store, &state.deal_ops_by_epoch) {
match DealOpsByEpoch::load(
store,
&state.deal_ops_by_epoch,
DEAL_OPS_BY_EPOCH_CONFIG,
"deal ops",
) {
Ok(deal_ops) => {
// get into internals just to iterate through full data structure
let ret = deal_ops.0.for_each(|key, _| {
let epoch = parse_uint_key(key)? as i64;

let ret = deal_ops.for_each(|epoch: ChainEpoch, _| {
deal_op_epoch_count += 1;

deal_ops
.for_each(epoch, |deal_id| {
expected_deal_ops.remove(&deal_id);
deal_op_count += 1;
Ok(())
})
.map_err(|e| {
anyhow::anyhow!("error iterating deal ops for epoch {}: {}", epoch, e)
})
deal_ops.for_each_in(&epoch, |deal_id: DealID| {
expected_deal_ops.remove(&deal_id);
deal_op_count += 1;
Ok(())
})
});
acc.require_no_error(ret, "error iterating all deal ops");
}
Expand Down
Loading

0 comments on commit 649f0a5

Please sign in to comment.