diff --git a/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.cpp b/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.cpp new file mode 100644 index 000000000..7ff51096d --- /dev/null +++ b/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.cpp @@ -0,0 +1,52 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include +#include +#include +#include + +#include "fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h" + +namespace global_parameters { + +std::string serialize(const GlobalParameters& src) { + std::ostringstream s; + boost::archive::text_oarchive oa(s); + oa << src; + return s.str(); +} + +GlobalParameters deserialize(const std::string& src) { + GlobalParameters data; + std::istringstream s(src); + boost::archive::text_iarchive ia(s); + ia >> data; + return data; +} + +void writeToFile(const std::string& file, const GlobalParameters& gp) { + auto writer = std::make_unique( + std::make_unique(file)); + auto string = global_parameters::serialize(gp); + writer->writeString(string); + writer->close(); +} + +GlobalParameters readFromFile(const std::string& file) { + auto reader = std::make_unique( + std::make_unique(file)); + auto serializedGlobalParameters = reader->readLine(); + while (!reader->eof()) { + auto line = reader->readLine(); + serializedGlobalParameters += "\n" + line; + } + reader->close(); + return global_parameters::deserialize(serializedGlobalParameters); +} + +} // namespace global_parameters diff --git a/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h b/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h new file mode 100644 index 000000000..96847fd03 --- /dev/null +++ b/fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace global_parameters { + +inline const std::string KAdvRowCount = "Advertiser_Row_Count"; +inline const std::string KPubRowCount = "Publisher_Row_Count"; + +inline const std::string KAdvDataWidth = "Advertiser_Data_Width"; +inline const std::string KPubDataWidth = "Publisher_Data_Width"; + +inline const std::string KMatchedUserCount = "Matched_User_Count"; + +using GlobalParameterType = + boost::variant>; + +using GlobalParameters = std::unordered_map; + +std::string serialize(const GlobalParameters& src); + +GlobalParameters deserialize(const std::string& src); + +void writeToFile(const std::string& file, const GlobalParameters& gp); + +GlobalParameters readFromFile(const std::string& file); + +} // namespace global_parameters diff --git a/fbpcs/emp_games/data_processing/global_parameters/test/GlobalParametersTest.cpp b/fbpcs/emp_games/data_processing/global_parameters/test/GlobalParametersTest.cpp new file mode 100644 index 000000000..383592fc9 --- /dev/null +++ b/fbpcs/emp_games/data_processing/global_parameters/test/GlobalParametersTest.cpp @@ -0,0 +1,39 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h" +#include +#include "folly/Format.h" +#include "folly/Random.h" + +using namespace ::testing; + +namespace global_parameters { + +TEST(GlobalParametersSerialization, testSerializeAndDeserialize) { + const std::string file = + folly::sformat("./global_parameter_{}", folly::Random::rand32()); + + GlobalParameters gp; + gp.emplace("test1", 3); + gp.emplace( + "test2", std::unordered_map({{1, 2}, {3, 4}, {5, 6}})); + + writeToFile(file, gp); + auto gp1 = readFromFile(file); + std::remove(file.c_str()); + + EXPECT_EQ( + boost::get(gp.at("test1")), + boost::get(gp1.at("test1"))); + + EXPECT_EQ( + (boost::get>(gp.at("test2"))), + (boost::get>(gp1.at("test2")))); +} + +} // namespace global_parameters diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.cpp b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.cpp new file mode 100644 index 000000000..458cb1ce5 --- /dev/null +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.cpp @@ -0,0 +1,115 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h" +#include + +namespace unified_data_process { + +/** + * The idea here is to distribute the workload across more threads. This + * UdpEncryptor object will read in data in the main thread and buffer them. + * Once there are sufficient buffered data (defined by chunkSize_), the buffered + * data will be passed to the underlying udp encryption object to process in a + * background thread. + */ +void UdpEncryptor::processDataInBuffer() { + if (udpThreadForMySelf_ != nullptr) { + // this is not the first time of executing processingMyData + udpThreadForMySelf_->join(); + } else { + // this is the first time of executing processingMyData, need to call + // preparation first + udpEncryption_->prepareToProcessMyData( + bufferForMyDataInLoading_->at(0).size()); + } + if (bufferIndex_ < chunkSize_) { + bufferForMyDataInLoading_->resize(bufferIndex_); + } + std::swap(bufferForMyDataInLoading_, bufferForMyDataInProcessing_); + bufferIndex_ = 0; + udpThreadForMySelf_ = std::make_unique([this]() { + if (bufferForMyDataInProcessing_->size() == 0) { + return; + } + udpEncryption_->processMyData(*bufferForMyDataInProcessing_); + }); +} + +// load a line that is to be processed later. +void UdpEncryptor::pushOneLineFromMe( + std::vector&& serializedLine) { + bufferForMyDataInLoading_->at(bufferIndex_++) = std::move(serializedLine); + if (bufferIndex_ >= chunkSize_) { + processDataInBuffer(); + } +} + +// load multiple lines into the buffer. +void UdpEncryptor::pushLinesFromMe( + std::vector>&& serializedLines) { + size_t inputIndex = 0; + + while (inputIndex < serializedLines.size()) { + if (chunkSize_ - bufferIndex_ <= serializedLines.size() - inputIndex) { + std::copy( + serializedLines.begin() + inputIndex, + serializedLines.begin() + inputIndex + chunkSize_ - bufferIndex_, + bufferForMyDataInLoading_->begin() + bufferIndex_); + inputIndex += chunkSize_ - bufferIndex_; + // the buffer is full, the index should be changed to chunkSize_ + bufferIndex_ = chunkSize_; + processDataInBuffer(); + } else { + std::copy( + serializedLines.begin() + inputIndex, + serializedLines.end(), + bufferForMyDataInLoading_->begin() + bufferIndex_); + + bufferIndex_ += serializedLines.size() - inputIndex; + inputIndex = serializedLines.size(); + } + } +} + +// set the config for peer's data. +void UdpEncryptor::setPeerConfig( + size_t totalNumberOfPeerRows, + size_t peerDataWidth, + const std::vector& indexes) { + udpEncryption_->prepareToProcessPeerData(peerDataWidth, indexes); + auto loop = [this, totalNumberOfPeerRows]() { + size_t numberOfProcessedRow = 0; + while (numberOfProcessedRow < totalNumberOfPeerRows) { + udpEncryption_->processPeerData( + std::min(chunkSize_, totalNumberOfPeerRows - numberOfProcessedRow)); + numberOfProcessedRow += chunkSize_; + } + }; + udpThreadForPeer_ = std::make_unique(loop); +} + +UdpEncryptor::EncryptionResuts UdpEncryptor::getEncryptionResults() const { + if (udpThreadForPeer_ == nullptr) { + throw std::runtime_error("No thread to join for peer!"); + } + udpThreadForPeer_->join(); + + auto [ciphertexts, nonces, indexes] = udpEncryption_->getProcessedData(); + return EncryptionResuts{ciphertexts, nonces, indexes}; +} + +std::vector<__m128i> UdpEncryptor::getExpandedKey() { + processDataInBuffer(); + if (udpThreadForMySelf_ == nullptr) { + throw std::runtime_error("No thread to join for peer!"); + } + udpThreadForMySelf_->join(); + return udpEncryption_->getExpandedKey(); +} + +} // namespace unified_data_process diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h new file mode 100644 index 000000000..ee6380552 --- /dev/null +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h @@ -0,0 +1,68 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include "fbpcf/mpc_std_lib/unified_data_process/data_processor/IUdpEncryption.h" + +namespace unified_data_process { + +class UdpEncryptor { + using UdpEncryption = + fbpcf::mpc_std_lib::unified_data_process::data_processor::IUdpEncryption; + + public: + using EncryptionResuts = UdpEncryption::EncryptionResuts; + + UdpEncryptor(std::unique_ptr udpEncryption, size_t chunkSize) + : udpEncryption_(std::move(udpEncryption)), + udpThreadForMySelf_(nullptr), + udpThreadForPeer_(nullptr), + chunkSize_(chunkSize), + bufferIndex_(0), + bufferForMyDataInLoading_{ + std::make_unique>>( + chunkSize_)}, + bufferForMyDataInProcessing_( + std::make_unique>>( + chunkSize_)) {} + + // load a line that is to be processed later. + void pushOneLineFromMe(std::vector&& serializedLine); + + // load a number of lines that is to be processed later. + void pushLinesFromMe( + std::vector>&& serializedLines); + + // set the config for peer's data. + void setPeerConfig( + size_t totalNumberOfPeerRows, + size_t peerDataWidth, + const std::vector& indexes); + + EncryptionResuts getEncryptionResults() const; + + std::vector<__m128i> getExpandedKey(); + + private: + void processDataInBuffer(); + + std::unique_ptr udpEncryption_; + std::unique_ptr udpThreadForMySelf_; + std::unique_ptr udpThreadForPeer_; + size_t chunkSize_; + + size_t bufferIndex_; + std::unique_ptr>> + bufferForMyDataInLoading_; + std::unique_ptr>> + bufferForMyDataInProcessing_; +}; + +} // namespace unified_data_process diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.cpp b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.cpp new file mode 100644 index 000000000..284938ca3 --- /dev/null +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.cpp @@ -0,0 +1,131 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "fbpcf/mpc_std_lib/unified_data_process/data_processor/UdpUtil.h" +#include "fbpcs/emp_games/data_processing/global_parameters/GlobalParameters.h" +#include "folly/String.h" +#include "folly/executors/CPUThreadPoolExecutor.h" +#include "folly/experimental/coro/Collect.h" + +namespace unified_data_process { + +folly::coro::Task UdpEncryptorApp::invokeUdpEncryption( + const std::vector& indexFiles, + const std::vector& serializedDataFiles, + const std::string& globalParameters, + const std::string& dataFile, + const std::string& expandedKeyFile) { + auto executor = + std::make_shared(serializedDataFiles.size() + 1); + + auto task1 = + processPeerData(indexFiles, globalParameters).scheduleOn(executor.get()); + + std::vector>>> tasks; + + for (size_t i = 1; i < serializedDataFiles.size(); i++) { + tasks.push_back(readDataFile(serializedDataFiles[i]).scheduleOn(executor.get())); + } + + { + // process the first file in main thread. + auto reader = std::make_unique( + std::make_unique(serializedDataFiles.at(0))); + reader->readLine(); // header, useless + while (!reader->eof()) { + auto line = reader->readLine(); + + encryptor_->pushOneLineFromMe( + std::vector(line.begin(), line.end())); + } + reader->close(); + } + + auto data = co_await folly::coro::collectAllRange(std::move(tasks)); + for (auto& datum : data) { + encryptor_->pushLinesFromMe(std::move(datum)); + } + + fbpcf::mpc_std_lib::unified_data_process::data_processor:: + writeEncryptionResultsToFile( + encryptor_->getEncryptionResults(), dataFile); + fbpcf::mpc_std_lib::unified_data_process::data_processor:: + writeExpandedKeyToFile(encryptor_->getExpandedKey(), expandedKeyFile); +} + +folly::coro::Task> UdpEncryptorApp::readIndexFile( + const std::string& fileName) { + auto reader = std::make_unique( + std::make_unique(fileName)); + reader->readLine(); // header, useless + + std::vector rst; + while (!reader->eof()) { + std::vector data; + auto line = reader->readLine(); + folly::split(",", std::move(line), data); + rst.push_back(stoi(data.at(1))); + } + reader->close(); + co_return rst; +} + +folly::coro::Task>> +UdpEncryptorApp::readDataFile(const std::string& fileName) { + auto reader = std::make_unique( + std::make_unique(fileName)); + reader->readLine(); // header, useless + + std::vector> rst; + while (!reader->eof()) { + auto line = reader->readLine(); + rst.push_back(std::vector(line.begin(), line.end())); + } + reader->close(); + co_return rst; +} + +folly::coro::Task UdpEncryptorApp::processPeerData( + const std::vector& indexFiles, + const std::string& globalParameterFile) const { + auto executor = std::make_shared(indexFiles.size()); + + std::vector>> tasks; + for (auto& file : indexFiles) { + tasks.push_back(readIndexFile(file).scheduleOn(executor.get())); + } + + auto results = co_await folly::coro::collectAllRange(std::move(tasks)); + + auto globalParameters = global_parameters::readFromFile(globalParameterFile); + + std::vector indexes; + for (auto& indexInFile : results) { + indexes.insert(indexes.end(), indexInFile.begin(), indexInFile.end()); + } + auto totalNumberOfPeerRows = boost::get( + amIPublisher_ ? globalParameters.at(global_parameters::KAdvRowCount) + : globalParameters.at(global_parameters::KPubRowCount)); + auto peerDataWidth = boost::get( + amIPublisher_ ? globalParameters.at(global_parameters::KAdvDataWidth) + : globalParameters.at(global_parameters::KPubDataWidth)); + + encryptor_->setPeerConfig(totalNumberOfPeerRows, peerDataWidth, indexes); +} + +} // namespace unified_data_process diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.h b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.h new file mode 100644 index 000000000..a11f890e9 --- /dev/null +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptorApp.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include "fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h" + +namespace unified_data_process { + +class UdpEncryptorApp { + public: + ~UdpEncryptorApp() {} + + UdpEncryptorApp(std::unique_ptr encryptor, bool amIPublisher) + : encryptor_(std::move(encryptor)), amIPublisher_(amIPublisher) {} + + folly::coro::Task invokeUdpEncryption( + const std::vector& indexFiles, + const std::vector& serializedDataFiles, + const std::string& globalParameters, + const std::string& dataFile, + const std::string& expandedKeyFile); + + private: + static folly::coro::Task> readIndexFile( + const std::string& fileName); + static folly::coro::Task>> + readDataFile(const std::string& fileName); + + folly::coro::Task processPeerData( + const std::vector& indexFiles, + const std::string& globalParameterFile) const; + + std::unique_ptr encryptor_; + bool amIPublisher_; +}; + +} // namespace unified_data_process diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptionMock.h b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptionMock.h new file mode 100644 index 000000000..b767e806c --- /dev/null +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptionMock.h @@ -0,0 +1,41 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include "fbpcf/mpc_std_lib/unified_data_process/data_processor/IUdpEncryption.h" + +namespace unified_data_process { + +using namespace ::testing; + +class UdpEncryptionMock final + : public fbpcf::mpc_std_lib::unified_data_process::data_processor:: + IUdpEncryption { + public: + MOCK_METHOD(void, prepareToProcessMyData, (size_t)); + + MOCK_METHOD( + void, + processMyData, + (const std::vector>&)); + + MOCK_METHOD(std::vector<__m128i>, getExpandedKey, ()); + + MOCK_METHOD( + void, + prepareToProcessPeerData, + (size_t, const std::vector&)); + + MOCK_METHOD(void, processPeerData, (size_t)); + + MOCK_METHOD(EncryptionResuts, getProcessedData, ()); +}; + +} // namespace unified_data_process diff --git a/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptorTest.cpp b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptorTest.cpp new file mode 100644 index 000000000..bf7008780 --- /dev/null +++ b/fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptorTest.cpp @@ -0,0 +1,135 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/UdpEncryptor.h" +#include "fbpcs/emp_games/data_processing/unified_data_process/UdpEncryptor/test/UdpEncryptionMock.h" + +#include +#include + +using namespace ::testing; +namespace unified_data_process { + +TEST(UdpEncryptorTestWithMock, testProcessingPeerData) { + int chunkSize = 500; + int totalRow = 1200; + size_t dataWidth = 32; + std::vector indexes{3, 31, 6, 12, 5}; + auto mock = std::make_unique(); + + EXPECT_CALL(*mock, prepareToProcessPeerData(dataWidth, indexes)).Times(1); + EXPECT_CALL(*mock, processPeerData(chunkSize)).Times(totalRow / chunkSize); + if (totalRow % chunkSize != 0) { + EXPECT_CALL(*mock, processPeerData(totalRow % chunkSize)).Times(1); + } + EXPECT_CALL(*mock, getProcessedData()).Times(1); + + UdpEncryptor encryptor(std::move(mock), chunkSize); + encryptor.setPeerConfig(totalRow, dataWidth, indexes); + encryptor.getEncryptionResults(); +} + +TEST(UdpEncryptorTestWithMock, testProcessingMyData) { + size_t chunkSize = 200; + size_t sampleSize = 219; + size_t totalRow = 1200; + size_t width = 32; + + std::vector>> testData; + std::vector>> testData1; + for (size_t i = 0; i < totalRow; i += chunkSize) { + testData.push_back(std::vector>()); + testData.back().reserve(std::min(chunkSize, totalRow - i)); + for (size_t j = 0; (j < chunkSize) && (j + i < totalRow); j++) { + testData.back().push_back( + std::vector(width, (i + j) & 0xFF)); + if ((i + j) % sampleSize == 0) { + testData1.push_back(std::vector>()); + } + testData1.back().push_back( + std::vector(width, (i + j) & 0xFF)); + } + } + + auto mock = std::make_unique(); + + EXPECT_CALL(*mock, prepareToProcessMyData(width)).Times(1); + for (size_t i = 0; i < testData.size(); i++) { + EXPECT_CALL(*mock, processMyData(testData.at(i))).Times(1); + } + EXPECT_CALL(*mock, getExpandedKey()).Times(1); + + UdpEncryptor encryptor(std::move(mock), chunkSize); + for (size_t i = 0; i < testData1.size() / 2; i++) { + for (size_t j = 0; j < testData1.at(i).size(); j++) { + encryptor.pushOneLineFromMe(std::move(testData1.at(i).at(j))); + } + } + for (size_t i = testData1.size() / 2; i < testData1.size(); i++) { + encryptor.pushLinesFromMe(std::move(testData1.at(i))); + } + encryptor.getExpandedKey(); +} + +TEST(UdpEncryptorTestWithMock, testProcessingBothSidesData) { + size_t chunkSize = 200; + size_t sampleSize = 219; + size_t myTotalRow = 1200; + size_t peerTotalRow = 1500; + size_t myWidth = 32; + size_t peerWidth = 35; + std::vector indexes{3, 31, 6, 12, 5}; + + std::vector>> testData; + std::vector>> testData1; + + for (size_t i = 0; i < myTotalRow; i += chunkSize) { + testData.push_back(std::vector>()); + testData.back().reserve(std::min(chunkSize, myTotalRow - i)); + for (size_t j = 0; j < chunkSize && j + i < myTotalRow; j++) { + testData.back().push_back( + std::vector(myWidth, (i + j) & 0xFF)); + if ((i + j) % sampleSize == 0) { + testData1.push_back(std::vector>()); + } + testData1.back().push_back( + std::vector(myWidth, (i + j) & 0xFF)); + } + } + + auto mock = std::make_unique(); + + EXPECT_CALL(*mock, prepareToProcessMyData(myWidth)).Times(1); + for (size_t i = 0; i < testData.size(); i++) { + EXPECT_CALL(*mock, processMyData(testData.at(i))).Times(1); + } + EXPECT_CALL(*mock, getExpandedKey()).Times(1); + + EXPECT_CALL(*mock, prepareToProcessPeerData(peerWidth, indexes)).Times(1); + EXPECT_CALL(*mock, processPeerData(chunkSize)) + .Times(peerTotalRow / chunkSize); + if (peerTotalRow % chunkSize != 0) { + EXPECT_CALL(*mock, processPeerData(peerTotalRow % chunkSize)).Times(1); + } + EXPECT_CALL(*mock, getProcessedData()).Times(1); + + UdpEncryptor encryptor(std::move(mock), chunkSize); + encryptor.setPeerConfig(peerTotalRow, peerWidth, indexes); + + for (size_t i = 0; i < testData1.size() / 2; i++) { + for (size_t j = 0; j < testData1.at(i).size(); j++) { + encryptor.pushOneLineFromMe(std::move(testData1.at(i).at(j))); + } + } + for (size_t i = testData1.size() / 2; i < testData1.size(); i++) { + encryptor.pushLinesFromMe(std::move(testData1.at(i))); + } + encryptor.getExpandedKey(); + encryptor.getEncryptionResults(); +} + +} // namespace unified_data_process