Skip to content

Commit

Permalink
Merge pull request #279 from songweijia/master
Browse files Browse the repository at this point in the history
Removing a copy on appending log to Persistent<T>
  • Loading branch information
songweijia authored May 9, 2024
2 parents 7109b15 + 774fc18 commit 323f068
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 66 deletions.
35 changes: 22 additions & 13 deletions include/derecho/persistent/Persistent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,22 +339,13 @@ class PersistentRegistry : public mutils::RemoteDeserializationContext {
// the log entries are applied in order. TODO: use checkpointing to accelerate it!
//
// There are three methods included in this interface:
// - 'finalizeCurrentDelta' This method is called when Persistent<T> wants to
// make a version. Its argument is a DeltaFinalizer function; T should invoke this
// function to give Persistent<T> the Delta data.
// - 'currentDeltaToBytes' This method is called when Persistent<T> wants to
// make a version. It serialize the current Delta to the given buffer.
// - 'applyDelta' This method is called on object construction from the disk. Its argument
// is a single Delta data buffer that should be applied.
// - 'create' This static method is used to create an empty object from a deserialization
// manager.

/**
* Type of a function that receives a Delta data buffer from an object with Delta support,
* for the purpose of writing the Delta to a persistent log.
* @param arg1 a pointer to the buffer
* @param arg2 the buffer's size
*/
using DeltaFinalizer = std::function<void(uint8_t const* const, std::size_t)>;

template <typename DeltaObjectType>
class IDeltaObjectFactory {
public:
Expand All @@ -366,8 +357,26 @@ class IDeltaObjectFactory {
template <typename ObjectType>
class IDeltaSupport : public IDeltaObjectFactory<ObjectType> {
public:
virtual void finalizeCurrentDelta(const DeltaFinalizer&) = 0;
virtual void applyDelta(uint8_t const* const) = 0;
/**
* @fn size_t currentDeltaToBytes(uint8_t* const)
* @brief serialize the current delta to buffer.
* @param[in] buf buffer
* @param[in] buf_size size of the given buffer
* @return number of size used.
*/
virtual size_t currentDeltaToBytes(uint8_t* const buf, size_t buf_size) = 0;
/**
* @fn size_t currentDeltaSize()
* @brief get the serialized size of the current delta.
* @return size.
*/
virtual size_t currentDeltaSize() = 0;
/**
* @fn applyDelta(uint8_t const* const)
* @brief apply the delta to current state.
* @param[in] buf The buffer for serialized data.
*/
virtual void applyDelta(uint8_t const* const buf) = 0;
};

// _NameMaker is a tool makeing the name for the log corresponding to a
Expand Down
3 changes: 3 additions & 0 deletions include/derecho/persistent/detail/FilePersistLog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ class FilePersistLog : public PersistLog {
virtual void append(const void* pdata,
uint64_t size, version_t ver,
const HLC& mhlc) override;
virtual void append(const std::function<void(void*,uint64_t)>& blob_generator,
uint64_t size, version_t ver,
const HLC& mhlc) override;
virtual void advanceVersion(int64_t ver) override;
virtual int64_t getLength() override;
virtual int64_t getEarliestIndex() override;
Expand Down
37 changes: 27 additions & 10 deletions include/derecho/persistent/detail/PersistLog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,38 @@ class PersistLog {
*/
PersistLog(const std::string& name, bool enable_signatures);
virtual ~PersistLog() noexcept(true);
/** Persistent Append
* @param pdata - serialized data to be append
* @param size - length of the data
* @param ver - version of the data, the implementation is responsible for
* making sure it grows monotonically.
* @param mhlc - the hlc clock of the data, the implementation is
* responsible for making sure it grows monotonically.
/**
* @fn void append(const void*, uint64_t, version_t, const HLC&)
* @brief append to persistent log.
* Note that the entry appended can only become persistent till the persist()
* is called on that entry.
*
* @param[in] pdata Serialized data to be append
* @param[in] size Length of the data
* @param[in] ver Version of the data, the implementation is responsible for
* making sure it grows monotonically.
* @param[in] mhlc The hlc clock of the data, the implementation is
* responsible for making sure it grows monotonically.
*/
virtual void append(const void* pdata,
uint64_t size, version_t ver,
const HLC& mhlc)
= 0;

const HLC& mhlc) = 0;
/**
* @fn void append(const std::function<void(void*)>&,uint64_t,version_t,const HLC&)
* @brief append to persistent log in a zero-copy fashion.
* Note that the entry appended can only become persistent till the persist() is
* called on that entry.
*
* @param[in] blob_generator The lambda that generator the data.
* @param[in] size The size of the data being generated.
* @param[in] ver Version of the data, the implementation is responsible
* for making sure it grows monotonically.
* @param[in] mhlc The hlc clock of the data, the implementation is
* responsible for making sure it grows monotonically.
*/
virtual void append(const std::function<void(void*,uint64_t)>& blob_generator,
uint64_t size, version_t ver,
const HLC& mhlc) = 0;
/**
* Advance the version number without appending a log. This is useful
* to create gap between versions.
Expand Down
27 changes: 12 additions & 15 deletions include/derecho/persistent/detail/Persistent_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,23 +540,20 @@ template <typename ObjectType,
void Persistent<ObjectType, storageType>::set(ObjectType& v, version_t ver, const HLC& mhlc) {
dbg_trace(m_logger, "append to log with ver({}),hlc({},{})", ver, mhlc.m_rtc_us, mhlc.m_logic);
if constexpr(std::is_base_of<IDeltaSupport<ObjectType>, ObjectType>::value) {
v.finalizeCurrentDelta([&](uint8_t const* const buf, size_t len) {
// Don't create a log entry for versions without data change.
if (len > 0) {
this->m_pLog->append((const void* const)buf, len, ver, mhlc);
} else {
// Advance the log's version so it still reports the correct "current version"
this->m_pLog->advanceVersion(ver);
}
});
if (v.currentDeltaSize() > 0) {
this->m_pLog->append([&v](void* buf, uint64_t buf_size){
v.currentDeltaToBytes(static_cast<uint8_t*>(buf),static_cast<size_t>(buf_size));
},v.currentDeltaSize(),ver,mhlc);
} else {
this->m_pLog->advanceVersion(ver);
}
} else {
// ObjectType does not support Delta, logging the whole current state.
auto size = mutils::bytes_size(v);
uint8_t* buf = new uint8_t[size];
memset(buf, 0, size);
mutils::to_bytes(v, buf);
this->m_pLog->append((void*)buf, size, ver, mhlc);
delete[] buf;
this->m_pLog->append([&v](void* buf,uint64_t buf_size){
if (mutils::bytes_size(v) <= buf_size) {
mutils::to_bytes(v,static_cast<uint8_t*>(buf));
}
},mutils::bytes_size(v),ver,mhlc);
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/applications/demos/simple_replicated_objects_overlap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,18 @@ int main(int argc, char** argv) {
std::vector<node_id_t> bar_members = group.get_subgroup_members<Bar>(0)[0];
auto find_in_foo_results = std::find(foo_members.begin(), foo_members.end(), my_id);
if(find_in_foo_results != foo_members.end()) {
#ifndef NOLOG
uint32_t rank_in_foo = std::distance(foo_members.begin(), find_in_foo_results);
#endif//NOLOG
// Replicated<Foo>& foo_rpc_handle = group.get_subgroup<Foo>();
dbg_default_crit("Here is FOO {}!", rank_in_foo);
dbg_default_crit("I see members of my shard: {}", foo_members);
}
auto find_in_bar_results = std::find(bar_members.begin(), bar_members.end(), my_id);
if(find_in_bar_results != bar_members.end()) {
#ifndef NOLOG
uint32_t rank_in_bar = derecho::index_of(bar_members, my_id);
#endif//NOLOG
// Replicated<Bar>& bar_rpc_handle = group.get_subgroup<Bar>();
dbg_default_crit("Here is BAR {}!", rank_in_bar);
dbg_default_crit("I see members of my shard: {}", bar_members);
Expand Down
23 changes: 15 additions & 8 deletions src/applications/tests/unit_tests/mixed_persistence_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,23 @@ class IntegerWithDelta : public mutils::ByteRepresentable, persistent::IDeltaSup
}
// Unlike the one in test.cpp, this finalizeCurrentDelta only writes any data if delta is nonzero
// This simulates a more advanced object like CascadeStoreCore
virtual void finalizeCurrentDelta(const persistent::DeltaFinalizer& finalizer) {
if(delta != 0) {
finalizer(reinterpret_cast<const uint8_t*>(&delta), sizeof(delta));
virtual size_t currentDeltaToBytes(uint8_t * const buf, size_t buf_size) override {
if (delta == 0) {
return 0;
} else {
finalizer(nullptr, 0);
if (buf_size < sizeof(delta)) {
dbg_default_error("{} failed because buffer ({}) is smaller than needed ({}).\n",
__func__,buf_size,sizeof(delta));
return 0;
}
memcpy(static_cast<void*>(buf),static_cast<void*>(&delta),sizeof(delta));
return sizeof(delta);
}
// clear delta after writing
this->delta = 0;
}
virtual void applyDelta(uint8_t const* const pdat) {
virtual size_t currentDeltaSize() override {
return delta==0?0:sizeof(delta);
}
virtual void applyDelta(uint8_t const* const pdat) override {
this->value += *reinterpret_cast<const int*>(pdat);
}
static std::unique_ptr<IntegerWithDelta> create(mutils::DeserializationManager* dm) {
Expand Down Expand Up @@ -143,4 +150,4 @@ int main(int argc, char** argv) {
std::cin.get();
group.barrier_sync();
group.leave(true);
}
}
28 changes: 18 additions & 10 deletions src/applications/tests/unit_tests/signed_log_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,26 @@ std::string StringWithDelta::get_current_state() const {
return current_state;
}

void StringWithDelta::finalizeCurrentDelta(const persistent::DeltaFinalizer& finalizer) {
if(delta.size() == 0) {
dbg_default_trace("StringWithDelta: Calling finalizer with null buffer");
finalizer(nullptr, 0);
size_t StringWithDelta::currentDeltaSize() {
if (delta.size()==0) {
return 0;
} else {
// Serialize the string to a byte buffer and give that buffer to the DeltaFinalizer
// (this will create an unnecessary extra copy of the string, but efficiency isn't important here)
std::vector<uint8_t> delta_buffer(mutils::bytes_size(delta));
dbg_default_trace("StringWithDelta: Serializing delta string to a buffer of size {}", delta_buffer.size());
mutils::to_bytes(delta, delta_buffer.data());
finalizer(delta_buffer.data(), delta_buffer.size());
return mutils::bytes_size(delta);
}
}

size_t StringWithDelta::currentDeltaToBytes(uint8_t * const buf, size_t buf_size) {
if (delta.size() == 0) {
dbg_default_trace("StringWithDelta: Calling currentDeltaToBytes with null buffer\n");
return 0;
} else if (buf_size < mutils::bytes_size(delta)) {
dbg_default_error("{} failed because the buffer({}) given is smaller than needed({}).\n",
__func__,buf_size,delta.size());
return 0;
} else {
size_t nbytes = mutils::to_bytes(delta,buf);
delta.clear();
return nbytes;
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/applications/tests/unit_tests/signed_log_test.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ struct TestState : public derecho::DeserializationContext {

/**
* A simple Delta-supporting object that stores a string and has a method that
* appends to the string. All data appended since the last call to finalizeCurrentDelta
* appends to the string. All data appended since the last call to currentDeltaToBytes
* is stored in the delta, and applyDelta appends the delta to the current string.
* (There's no way to delete or truncate the string since this is just for test
* purposes). Note that if append() has not been called since the last call to
* finalizeCurrentDelta, the delta entry will be empty.
* currentDeltaToBytes, the delta entry will be empty.
*/
class StringWithDelta : public mutils::ByteRepresentable,
public persistent::IDeltaSupport<StringWithDelta> {
Expand All @@ -53,8 +53,9 @@ class StringWithDelta : public mutils::ByteRepresentable,
StringWithDelta(const std::string& init_string);
void append(const std::string& str_val);
std::string get_current_state() const;
virtual void finalizeCurrentDelta(const persistent::DeltaFinalizer& finalizer);
virtual void applyDelta(uint8_t const* const data);
virtual size_t currentDeltaSize() override;
virtual size_t currentDeltaToBytes(uint8_t * const buf, size_t buf_size) override;
virtual void applyDelta(uint8_t const* const data) override;
static std::unique_ptr<StringWithDelta> create(mutils::DeserializationManager* dm);
DEFAULT_SERIALIZATION_SUPPORT(StringWithDelta, current_state);
};
Expand Down
11 changes: 9 additions & 2 deletions src/persistent/FilePersistLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,13 @@ inline void FilePersistLog::do_append_validation(const uint64_t size, const int6
}

void FilePersistLog::append(const void* pdat, uint64_t size, version_t ver, const HLC& mhlc) {
this->append([pdat,size](void* buf,uint64_t bfsz) {
memcpy(buf,pdat,std::min(size,bfsz));
},size,ver,mhlc);
}

void FilePersistLog::append(const std::function<void(void*,uint64_t)>& blob_generator,
uint64_t size, version_t ver, const HLC& mhlc) {
dbg_trace(m_logger, "{0} append event ({1},{2})", this->m_sName, mhlc.m_rtc_us, mhlc.m_logic);
FPL_RDLOCK;

Expand All @@ -258,9 +265,9 @@ void FilePersistLog::append(const void* pdat, uint64_t size, version_t ver, cons
do_append_validation(size, ver);
dbg_trace(m_logger, "{0} append:validate check2 Finished.", this->m_sName);

// copy data
// generate data.
// we reserve the first 'signature_size' bytes at the beginning of NEXT_DATA.
memcpy(reinterpret_cast<void*>(reinterpret_cast<uint64_t>(NEXT_DATA) + signature_size), pdat, size);
blob_generator(reinterpret_cast<void*>(reinterpret_cast<uint64_t>(NEXT_DATA) + signature_size), size);
dbg_trace(m_logger, "{0} append:data ({1} bytes) is copied to log.", this->m_sName, size);

// fill the log entry
Expand Down
16 changes: 12 additions & 4 deletions src/persistent/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <derecho/persistent/Persistent.hpp>
#include <derecho/openssl/signature.hpp>
#include <derecho/persistent/detail/util.hpp>
#include <derecho/core/derecho_exception.hpp>
#include <iostream>
#include <signal.h>
#include <spdlog/spdlog.h>
Expand Down Expand Up @@ -102,11 +103,18 @@ class IntegerWithDelta : public ByteRepresentable, IDeltaSupport<IntegerWithDelt
this->delta -= op;
return this->value;
}
virtual void finalizeCurrentDelta(const DeltaFinalizer& dp) {
// finalize current delta
dp((uint8_t const* const) & (this->delta), sizeof(this->delta));
// clear delta
virtual size_t currentDeltaToBytes(uint8_t* const buf, size_t buf_size) override {
if (buf_size < sizeof(delta)) {
dbg_default_warn("currentDeltaToBytes received a buffer {} smaller than needed: {}.\n",
buf_size,sizeof(delta));
return 0;
}
memcpy(static_cast<void*>(buf),static_cast<void*>(&delta),sizeof(delta));
this->delta = 0;
return sizeof(delta);
}
virtual size_t currentDeltaSize() override {
return (this->delta == 0)?0:sizeof(delta);
}
virtual void applyDelta(uint8_t const* const pdat) {
// apply delta
Expand Down

0 comments on commit 323f068

Please sign in to comment.