Skip to content

Commit

Permalink
Implement BufferedWriter that wraps a child writer (#114)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #114

This diff adds a very simple buffered writer. Although this wasn't a requirement for supporting large PA/PL runs, it is good to have parity for buffered reader and buffered writer.

This class has a flush() function that writes the current buffer to the underlying file. When write is called, it first writes into the buffer and then flushes if there is no space

Reviewed By: elliottlawrence

Differential Revision: D34904877

fbshipit-source-id: 5a3dba3ef39de408c279ff6518f33f9e5bc6af7b
  • Loading branch information
adshastri authored and facebook-github-bot committed Mar 23, 2022
1 parent 6f72f8a commit d9676a0
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 5 deletions.
72 changes: 72 additions & 0 deletions fbpcf/io/api/BufferedWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include <algorithm>
#include <cstddef>
#include <vector>

#include "fbpcf/io/api/BufferedWriter.h"

namespace fbpcf::io {

int BufferedWriter::close() {
flush();
return baseWriter_.close();
}

size_t BufferedWriter::write(std::vector<char>& buf) {
size_t written = 0;
size_t remaining = buf.size();

while (written < buf.size()) {
// write what we can in the current chunk
size_t bytesThatWillFit = buffer_.size() - currentPosition_;
size_t bytesToWrite = std::min(bytesThatWillFit, remaining);
std::copy(
buf.begin() + written,
buf.begin() + written + bytesToWrite,
buffer_.begin() + currentPosition_);
currentPosition_ += bytesToWrite;
written += bytesToWrite;
remaining -= bytesToWrite;
if (remaining == 0) {
// if we were able to fit everything, break and return right here
break;
}

// flush appropriately sets the currentPosition_
flush();
}

return written;
}

BufferedWriter::~BufferedWriter() {
close();
}

void BufferedWriter::flush() {
if (currentPosition_ == 0) {
return;
}

std::vector<char> toWrite;
if (currentPosition_ < buffer_.size()) {
toWrite = std::vector<char>(currentPosition_);
std::copy(
buffer_.begin(), buffer_.begin() + currentPosition_, toWrite.begin());
} else {
toWrite = buffer_;
}
currentPosition_ = 0;
if (baseWriter_.write(toWrite) != toWrite.size()) {
throw std::runtime_error(
"Failed to flush contents of buffer. Terminating.");
}
}

} // namespace fbpcf::io
46 changes: 46 additions & 0 deletions fbpcf/io/api/BufferedWriter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <cstddef>
#include <memory>
#include <vector>

#include "fbpcf/io/api/IWriterCloser.h"

namespace fbpcf::io {

/*
This class is the API for buffered writer, which
provides the ability to specify a chunk size.
*/

constexpr size_t defaultChunkSize = 4096;

class BufferedWriter : public IWriterCloser {
public:
explicit BufferedWriter(
IWriterCloser& baseWriter,
const size_t chunkSize = defaultChunkSize)
: buffer_{std::vector<char>(chunkSize)},
currentPosition_{0},
baseWriter_{baseWriter} {}

int close() override;
size_t write(std::vector<char>& buf) override;
~BufferedWriter() override;

void flush();

private:
std::vector<char> buffer_;
size_t currentPosition_;
IWriterCloser& baseWriter_;
};

} // namespace fbpcf::io
70 changes: 70 additions & 0 deletions fbpcf/io/api/test/BufferedWriterTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include <gtest/gtest.h>
#include <stdio.h>
#include <filesystem>
#include <string>
#include "folly/logging/xlog.h"

#include "fbpcf/io/api/BufferedWriter.h"
#include "fbpcf/io/api/LocalFileWriter.h"
#include "fbpcf/io/api/test/utils/IOTestHelper.h"

namespace fbpcf::io {

TEST(BufferedWriterTest, testWritingToFile) {
std::string base_dir = IOTestHelper::getBaseDirFromPath(__FILE__);
std::string file_to_write_to =
base_dir + "data/buffered_writer_writer_test_file.txt";
auto writer = fbpcf::io::LocalFileWriter(file_to_write_to);
auto bufferedWriter = std::make_unique<BufferedWriter>(writer, 40);

std::string to_write = "this file tests the buffered writer\n";
auto buf =
std::vector<char>(to_write.c_str(), to_write.c_str() + to_write.size());
auto nBytes = bufferedWriter->write(buf);
EXPECT_EQ(nBytes, to_write.size());

std::vector<char> arbitraryBytes{
't', 'h', 'e', 's', 'e', ' ', 't', 'w', 'o', ' '};
nBytes = bufferedWriter->write(arbitraryBytes);
EXPECT_EQ(nBytes, arbitraryBytes.size());

to_write = "lines fit in one chunk\n";
auto buf2 =
std::vector<char>(to_write.c_str(), to_write.c_str() + to_write.size());
nBytes = bufferedWriter->write(buf2);
EXPECT_EQ(nBytes, to_write.size());

std::string longLine =
"but this next line is going to be much longer and will require multiple iterations of the loop to fit everything in the file\n";
auto buf3 =
std::vector<char>(longLine.c_str(), longLine.c_str() + longLine.size());
nBytes = bufferedWriter->write(buf3);
EXPECT_EQ(nBytes, longLine.size());
bufferedWriter->flush();

to_write = "this is tiny\n";
auto buf4 =
std::vector<char>(to_write.c_str(), to_write.c_str() + to_write.size());
nBytes = bufferedWriter->write(buf4);
EXPECT_EQ(nBytes, to_write.size());

bufferedWriter->close();

/*
Verify that file contents match the expected
*/
IOTestHelper::expectFileContentsMatch(
file_to_write_to,
base_dir + "data/expected_buffered_writer_test_file.txt");

IOTestHelper::cleanup(file_to_write_to);
}

} // namespace fbpcf::io
6 changes: 1 addition & 5 deletions fbpcf/io/api/test/LocalFileWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

namespace fbpcf::io {

inline void cleanup(std::string fileToDelete) {
remove(fileToDelete.c_str());
}

TEST(LocalFileWriterTest, testWritingToFile) {
std::string baseDir = IOTestHelper::getBaseDirFromPath(__FILE__);
std::random_device rd;
Expand Down Expand Up @@ -71,7 +67,7 @@ TEST(LocalFileWriterTest, testWritingToFile) {
IOTestHelper::expectFileContentsMatch(
fileToWriteTo, baseDir + "data/expected_local_file_writer_test_file.txt");

cleanup(fileToWriteTo);
IOTestHelper::cleanup(fileToWriteTo);
}

} // namespace fbpcf::io
4 changes: 4 additions & 0 deletions fbpcf/io/api/test/data/expected_buffered_writer_test_file.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
this file tests the buffered writer
these two lines fit in one chunk
but this next line is going to be much longer and will require multiple iterations of the loop to fit everything in the file
this is tiny
4 changes: 4 additions & 0 deletions fbpcf/io/api/test/utils/IOTestHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ class IOTestHelper {
expectedFile->close();
testFile->close();
}

static void cleanup(std::string file_to_delete) {
remove(file_to_delete.c_str());
}
};

} // namespace fbpcf::io

0 comments on commit d9676a0

Please sign in to comment.