Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Commit batcher #584

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions core/miner/address_selector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ namespace fc::mining {

/**
* SelectAddress takes the maximal possible transaction fee from configs and
* chooses one of control addresses with minimal balance that is more than good
* funds to make miner work as long as possible. If no suitble control address
* were found, function returns worker address.
* chooses one of control addresses with minimal balance that is more than
* good funds to make miner work as long as possible. If no suitble control
* address were found, function returns worker address.
*/
inline outcome::result<Address> SelectAddress(
const MinerInfo &miner_info,
const TokenAmount &good_funds,
const std::shared_ptr<FullNodeApi> &api) {
const std::shared_ptr<FullNodeApi>
&api) { // TODO update from lotus (Markuuu-s)
TokenAmount finder_balance;
auto finder = miner_info.control.end();
for (auto address = miner_info.control.begin();
Expand Down
1 change: 1 addition & 0 deletions core/miner/storage_fsm/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ target_link_libraries(events

add_library(batcher
impl/precommit_batcher_impl.cpp
impl/commit_batcher_impl.cpp
)
target_link_libraries(batcher
api
Expand Down
37 changes: 37 additions & 0 deletions core/miner/storage_fsm/commit_batcher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once
#include "api/full_node/node_api.hpp"
Comment on lines +6 to +7
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#pragma once
#include "api/full_node/node_api.hpp"
#pragma once
#include "api/full_node/node_api.hpp"

#include "miner/storage_fsm/types.hpp"
#include "primitives/sector/sector.hpp"

namespace fc::mining {
using CommitCallback = std::function<void(const outcome::result<CID> &)>;
using api::FullNodeApi;
using primitives::sector::AggregateSealVerifyInfo;
using primitives::sector::RegisteredSealProof;
using sector_storage::Proof;
using types::SectorInfo;

struct AggregateInput {
Proof proof;
AggregateSealVerifyInfo info;
RegisteredSealProof spt;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
RegisteredSealProof spt;
RegisteredSealProof seal_proof_type{RegisteredSealProof::kUndefined};

};

class CommitBatcher {
public:
virtual ~CommitBatcher() = default;

virtual outcome::result<void> addCommit(
const SectorInfo &sector_info,
const AggregateInput &aggregate_input,
const CommitCallback &callBack) = 0;

virtual void forceSend() = 0;
};

} // namespace fc::mining
246 changes: 246 additions & 0 deletions core/miner/storage_fsm/impl/commit_batcher_impl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#pragma once

Not in .cpp file.

#include "commit_batcher_impl.hpp"
Comment on lines +6 to +7
Copy link
Collaborator

@wer1st wer1st Feb 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#pragma once
#include "commit_batcher_impl.hpp"
#include "commit_batcher_impl.hpp"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use full path

#include <iostream>
#include <utility>
#include "vm/actor/builtin/v5/miner/miner_actor.hpp"
#include "vm/actor/builtin/v6/monies.hpp"

#include <string>
Comment on lines +8 to +13
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#include <iostream>
#include <utility>
#include "vm/actor/builtin/v5/miner/miner_actor.hpp"
#include "vm/actor/builtin/v6/monies.hpp"
#include <string>
#include <iostream>
#include <string>
#include <utility>
#include "vm/actor/builtin/v5/miner/miner_actor.hpp"
#include "vm/actor/builtin/v6/monies.hpp"


namespace fc::mining {
using api::kPushNoSpec;
using fc::BytesIn;
using fc::proofs::ProofEngine;
Comment on lines +17 to +18
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
using fc::BytesIn;
using fc::proofs::ProofEngine;
using proofs::ProofEngine;

using primitives::ActorId;
using primitives::go::bigdiv;
using primitives::sector::AggregateSealVerifyInfo;
using primitives::sector::AggregateSealVerifyProofAndInfos;
using primitives::tipset::TipsetCPtr;
using vm::actor::MethodParams;
using vm::actor::builtin::types::miner::kChainFinality;
using vm::actor::builtin::types::miner::SectorPreCommitOnChainInfo;
using vm::actor::builtin::v5::miner::ProveCommitAggregate;
using vm::actor::builtin::v6::miner::AggregateProveCommitNetworkFee;

CommitBatcherImpl::CommitBatcherImpl(const std::chrono::seconds &max_time,
std::shared_ptr<FullNodeApi> api,
Address miner_address,
std::shared_ptr<Scheduler> scheduler,
AddressSelector address_selector,
std::shared_ptr<FeeConfig> fee_config,
const size_t &max_size_callback,
std::shared_ptr<ProofEngine> proof)
: scheduler_(std::move(scheduler)),
max_delay_(max_time),
closest_cutoff_(max_time),
max_size_callback_(max_size_callback),
api_(std::move(api)),
miner_address_(std::move(miner_address)),
fee_config_(std::move(fee_config)),
proof_(std::move(proof)),
address_selector_(std::move(address_selector)) {
cutoff_start_ = std::chrono::system_clock::now();
Copy link
Contributor

@turuslan turuslan Mar 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe use scheduler

Suggested change
cutoff_start_ = std::chrono::system_clock::now();
cutoff_start_ = scheduler_->now();

reschedule(max_delay_);
}

outcome::result<void> CommitBatcherImpl::addCommit(
const SectorInfo &sector_info,
const AggregateInput &aggregate_input,
const CommitCallback &callback) {
std::unique_lock<std::mutex> locker(mutex_storage_);

const SectorNumber &sector_number = sector_info.sector_number;
OUTCOME_TRY(head, api_->ChainHead());

pair_storage_[sector_number] = PairStorage{aggregate_input, callback};

if (pair_storage_.size() >= max_size_callback_) {
locker.unlock();
forceSend();
return outcome::success();
}

setCommitCutoff(head->epoch(), sector_info);
return outcome::success();
}

void CommitBatcherImpl::forceSend() {
MapPairStorage pair_storage_for_send_;

std::unique_lock<std::mutex> locker(mutex_storage_);
pair_storage_for_send_ = std::move(pair_storage_);
locker.unlock();

const auto maybe_result = sendBatch(pair_storage_for_send_);

for (auto &[key, pair_storage] : pair_storage_for_send_) {
pair_storage.commit_callback(maybe_result);
}

cutoff_start_ = std::chrono::system_clock::now();
closest_cutoff_ = max_delay_;
reschedule(max_delay_);
Comment on lines +85 to +87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe better to store queued commits with cutoff, sorted by cutoff, and reschedule by minimal cutoff from queue

}

void CommitBatcherImpl::reschedule(std::chrono::milliseconds time) {
handle_ = scheduler_->scheduleWithHandle(
[&]() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be segfault

MapPairStorage pair_storage_for_send_;

std::unique_lock<std::mutex> locker(mutex_storage_);
pair_storage_for_send_ = std::move(pair_storage_);
locker.unlock();

const auto maybe_result = sendBatch(pair_storage_for_send_);

for (auto &[key, pair_storage] : pair_storage_for_send_) {
pair_storage.commit_callback(maybe_result);
}

cutoff_start_ = std::chrono::system_clock::now();
closest_cutoff_ = max_delay_;

handle_.reschedule(max_delay_).value();
},
time);
}

outcome::result<CID> CommitBatcherImpl::sendBatch(
const MapPairStorage &pair_storage_for_send) {
if (pair_storage_for_send.empty()) {
cutoff_start_ = std::chrono::system_clock::now();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove it, because after sendBatch call we set it to now

return ERROR_TEXT("Empty Batcher");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it error, maybe just outcome::success()?

}
OUTCOME_TRY(head, api_->ChainHead());

const size_t total = pair_storage_for_send.size();

ProveCommitAggregate::Params params;

std::vector<Proof> proofs;
proofs.reserve(total);

std::vector<AggregateSealVerifyInfo> infos;
infos.reserve(total);

BigInt collateral = 0;

for (const auto &[sector_number, pair_storage] : pair_storage_for_send) {
OUTCOME_TRY(sector_collateral,
getSectorCollateral(sector_number, head->key));
collateral = collateral + sector_collateral;

params.sectors.insert(sector_number);
infos.push_back(pair_storage.aggregate_input.info);
}

for (const auto &[sector_number, pair_storage] : pair_storage_for_send) {
proofs.push_back(pair_storage.aggregate_input.proof);
}

const ActorId mid = miner_address_.getId();
// TODO maybe long (AggregateSealProofs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// TODO maybe long (AggregateSealProofs)
// TODO(Markuuu-s) maybe long (AggregateSealProofs)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// TODO maybe long (AggregateSealProofs)

AggregateSealVerifyProofAndInfos aggregate_seal{
.miner = mid,
.seal_proof =
pair_storage_for_send.at(infos[0].number).aggregate_input.spt,
.aggregate_proof = arp_,
.infos = infos};

std::vector<BytesIn> proofsSpan;
proofsSpan.reserve(proofs.size());

for (const Proof &proof : proofs) {
proofsSpan.push_back(gsl::make_span(proof));
}
Comment on lines +158 to +160
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

proofs is redundant. You could fill proofsSpan directly


OUTCOME_TRY(proof_->aggregateSealProofs(aggregate_seal, proofsSpan));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
OUTCOME_TRY(proof_->aggregateSealProofs(aggregate_seal, proofsSpan));
// TODO(Markuuu-s) maybe long (AggregateSealProofs)
OUTCOME_TRY(proof_->aggregateSealProofs(aggregate_seal, proofsSpan));


params.proof = aggregate_seal.proof;
OUTCOME_TRY(encode, codec::cbor::encode(params));
OUTCOME_TRY(miner_info, api_->StateMinerInfo(miner_address_, head->key));

const TokenAmount max_fee =
fee_config_->max_commit_batch_gas_fee.FeeForSector(proofs.size());

OUTCOME_TRY(tipset, api_->ChainGetTipSet(head->key));
const BigInt base_fee = tipset->blks[0].parent_base_fee;

TokenAmount agg_fee_raw =
AggregateProveCommitNetworkFee(infos.size(), base_fee); // TODO change to aggregateNetworkFee

TokenAmount agg_fee = bigdiv(agg_fee_raw * agg_fee_num_, agg_fee_den_);
TokenAmount need_funds = collateral + agg_fee;
TokenAmount good_funds = max_fee + need_funds;
Comment on lines +177 to +179
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
TokenAmount agg_fee = bigdiv(agg_fee_raw * agg_fee_num_, agg_fee_den_);
TokenAmount need_funds = collateral + agg_fee;
TokenAmount good_funds = max_fee + need_funds;
const TokenAmount agg_fee = bigdiv(agg_fee_raw * agg_fee_num_, agg_fee_den_);
const TokenAmount need_funds = collateral + agg_fee;
const TokenAmount good_funds = max_fee + need_funds;


OUTCOME_TRY(address,
address_selector_(miner_info, good_funds, need_funds, api_));
OUTCOME_TRY(signed_messege,
api_->MpoolPushMessage(
vm::message::UnsignedMessage(miner_address_,
address,
0,
need_funds,
max_fee,
{},
ProveCommitAggregate::Number,
MethodParams{encode}),
kPushNoSpec));

cutoff_start_ = std::chrono::system_clock::now();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
cutoff_start_ = std::chrono::system_clock::now();

return signed_messege.getCid();
}

void CommitBatcherImpl::setCommitCutoff(const ChainEpoch &current_epoch,
const SectorInfo &sector_info) {
ChainEpoch cutoff_epoch =
sector_info.ticket_epoch
+ static_cast<int64_t>(kEpochsInDay + kChainFinality);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
+ static_cast<int64_t>(kEpochsInDay + kChainFinality);
+ static_cast<ChainEpoch>(kEpochsInDay + kChainFinality);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ChainEpoch start_epoch{};
for (const auto &piece : sector_info.pieces) {
if (!piece.deal_info) {
continue;
}
start_epoch = piece.deal_info->deal_schedule.start_epoch;
if (start_epoch < cutoff_epoch) {
cutoff_epoch = start_epoch;
}
}
if (cutoff_epoch <= current_epoch) {
forceSend();
} else {
const auto temp_cutoff = std::chrono::milliseconds(
(cutoff_epoch - current_epoch) * kEpochDurationSeconds);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(cutoff_epoch - current_epoch) * kEpochDurationSeconds);
(cutoff_epoch - current_epoch) * kBlockDelaySecs);

if ((closest_cutoff_
- std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - cutoff_start_)
> temp_cutoff)) {
cutoff_start_ = std::chrono::system_clock::now();
reschedule(temp_cutoff);
closest_cutoff_ = temp_cutoff;
}
}
}

outcome::result<TokenAmount> CommitBatcherImpl::getSectorCollateral(
const SectorNumber &sector_number, const TipsetKey &tip_set_key) {
OUTCOME_TRY(pci,
api_->StateSectorPreCommitInfo(
miner_address_, sector_number, tip_set_key));

OUTCOME_TRY(collateral,
api_->StateMinerInitialPledgeCollateral(
miner_address_, pci.info, tip_set_key));

collateral = collateral + pci.precommit_deposit;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
collateral = collateral + pci.precommit_deposit;
collateral = collateral - pci.precommit_deposit;

collateral = std::max(BigInt(0), collateral);

return collateral;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return collateral;
return std::move(collateral);

}

} // namespace fc::mining
83 changes: 83 additions & 0 deletions core/miner/storage_fsm/impl/commit_batcher_impl.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once
#include <libp2p/basic/scheduler.hpp>
Comment on lines +6 to +7
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#pragma once
#include <libp2p/basic/scheduler.hpp>
#pragma once
#include <libp2p/basic/scheduler.hpp>

#include "miner/storage_fsm/commit_batcher.hpp"

namespace fc::mining {
using api::MinerInfo;
using libp2p::basic::Scheduler;
using primitives::BigInt;
using primitives::SectorNumber;
using primitives::address::Address;
using primitives::sector::RegisteredAggregationProof;
using primitives::tipset::Tipset;
using primitives::tipset::TipsetKey;
using proofs::ProofEngine;
using types::FeeConfig;

using AddressSelector = std::function<outcome::result<Address>(
const MinerInfo &miner_info,
const TokenAmount &good_funds,
const TokenAmount &need_funds,
const std::shared_ptr<FullNodeApi> &api)>;

class CommitBatcherImpl : public CommitBatcher {
public:
struct PairStorage {
AggregateInput aggregate_input;
CommitCallback commit_callback;
};
using MapPairStorage = std::map<SectorNumber, PairStorage>;

CommitBatcherImpl(const std::chrono::seconds &max_time,
std::shared_ptr<FullNodeApi> api,
Address miner_address,
std::shared_ptr<Scheduler> scheduler,
AddressSelector address_selector,
std::shared_ptr<FeeConfig> fee_config,
const size_t &max_size_callback,
std::shared_ptr<ProofEngine> proof);

outcome::result<void> addCommit(const SectorInfo &sector_info,
const AggregateInput &aggregate_input,
const CommitCallback &callBack) override;

void forceSend() override;

void setCommitCutoff(const ChainEpoch &current_epoch,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we want to make it private

const SectorInfo &sector_info);

private:
std::shared_ptr<Scheduler> scheduler_;
Scheduler::Handle handle_;
std::chrono::milliseconds max_delay_;
std::chrono::milliseconds closest_cutoff_;
std::chrono::system_clock::time_point cutoff_start_;
size_t max_size_callback_;
MapPairStorage pair_storage_;
std::shared_ptr<FullNodeApi> api_;
Address miner_address_;
std::shared_ptr<FeeConfig> fee_config_;
std::shared_ptr<ProofEngine> proof_;
std::mutex mutex_storage_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::mutex mutex_storage_;
mutable std::mutex mutex_storage_;

AddressSelector address_selector_;
common::Logger logger_;

const BigInt agg_fee_num_ = BigInt(110);
const BigInt agg_fee_den_ = BigInt(100);
const RegisteredAggregationProof arp_ = RegisteredAggregationProof(0);
Comment on lines +70 to +72
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, use const naming for constants.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const RegisteredAggregationProof arp_ = RegisteredAggregationProof(0);
const RegisteredAggregationProof arp_ = RegisteredAggregationProof::SnarkPackV1;


// TODO (Markuu-s) Add processIndividually
void reschedule(std::chrono::milliseconds time);

outcome::result<CID> sendBatch(const MapPairStorage &pair_storage_for_send);

outcome::result<TokenAmount> getSectorCollateral(
const SectorNumber &sector_number, const TipsetKey &tip_set_key);
};

} // namespace fc::mining
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missed empty line

Loading