Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plat 692 s3 streams consistency #138

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 62 additions & 21 deletions service/s3.cc
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,19 @@ finishMultiPartUpload(const std::string & bucket,
}
}

void
S3Api::
abortMultiPartUpload(const std::string & bucket,
const std::string & resource,
const std::string & uploadId)
const
{
auto result = erase(bucket, resource, "uploadId=" + uploadId);
if (result.code_ != 204) {
throw ML::Exception("error aborting multipart upload: " + uploadId);
}
}

void
S3Api::MultiPartUploadPart::
fromXml(tinyxml2::XMLElement * element)
Expand Down Expand Up @@ -1965,8 +1978,8 @@ struct StreamingUploadSource {

struct Impl {
Impl()
: offset(0), chunkIndex(0), shutdown(false),
chunks(16)
: offset(0), chunkIndex(0), shutdown(false), chunks(16),
exceptionThrown(false), uploadAborted(false)
{
}

Expand Down Expand Up @@ -2062,6 +2075,8 @@ struct StreamingUploadSource {
std::mutex etagsLock;
std::vector<std::string> etags;
std::exception_ptr exc;
bool exceptionThrown;
bool uploadAborted;
ML::OnUriHandlerException onException;

void start()
Expand Down Expand Up @@ -2095,8 +2110,10 @@ struct StreamingUploadSource {

std::streamsize write(const char_type* s, std::streamsize n)
{
if (exc)
std::rethrow_exception(exc);
if (exc) {
handleException();
return 0;
}

size_t done = current.append(s, n);
offset += done;
Expand All @@ -2108,8 +2125,10 @@ struct StreamingUploadSource {
//cerr << "writing " << n << " characters returned "
// << done << endl;

if (exc)
std::rethrow_exception(exc);
if (exc) {
handleException();
return 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wsourdeau Hypothétiquement, on ne peut pas savoir si un fichier de taille zéro a fonctioné ou non.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pourquoi?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return done; -> done c'est le file size?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non, c'est le nombre de bytes ecrit lors de cet appel de fonction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Et dans le cas d'un stream vide est-ce que cette fonction est appelée quand même?

}

return done;
}
Expand All @@ -2129,8 +2148,11 @@ struct StreamingUploadSource {

void finish()
{
if (exc)
std::rethrow_exception(exc);
if (exc) {
handleException();
return;
}

// cerr << "pushing last chunk " << chunkIndex << endl;
flush();

Expand All @@ -2147,23 +2169,20 @@ struct StreamingUploadSource {

// Make sure that an exception in uploading the last chunk doesn't
// lead to a corrupt (truncated) file
if (exc)
std::rethrow_exception(exc);
if (exc) {
handleException();
return;
}

string etag;
try {
etag = owner->finishMultiPartUpload(bucket, "/" + object,
uploadId,
etags);
owner->finishMultiPartUpload(bucket, "/" + object, uploadId,
etags);
}
catch (...) {
owner->abortMultiPartUpload(bucket, "/" + object, uploadId);
onException();
throw;
}
//cerr << "final etag is " << etag << endl;

if (exc)
std::rethrow_exception(exc);

// double elapsed = Date::now().secondsSince(startDate);

Expand All @@ -2173,18 +2192,38 @@ struct StreamingUploadSource {
// << "MB/s" << " to " << etag << endl;
}

void handleException()
{
if (!uploadAborted) {
owner->abortMultiPartUpload(bucket, "/" + object, uploadId);
uploadAborted = true;
}
if (!exceptionThrown) {
exceptionThrown = true;
std::rethrow_exception(exc);
}
}

/* upload threads */
void runThread()
{
while (!shutdown) {
Chunk chunk;
if (chunks.tryPop(chunk, 0.01)) {
if (exc)
if (exc) {
while (chunks.tryPop(chunk));
return;
}
try {
//cerr << "got chunk " << chunk.index
// << " with " << chunk.size << " bytes at index "
// << chunk.index << endl;

if (chunk.index == metadata.throwChunk) {
throw ML::Exception("deterministic upload"
" exception at chunk %d",
chunk.index);
}
// Upload the data
string md5 = md5HashToHex(chunk.data, chunk.size);

Expand Down Expand Up @@ -2537,10 +2576,12 @@ struct RegisterS3Handler {
throw ML::Exception("unknown aws option " + name + "=" + value
+ " opening S3 object " + resource);
}
else if(name == "num-threads")
{
else if (name == "num-threads") {
md.numThreads = std::stoi(value);
}
else if (name == "throw-chunk") {
md.throwChunk = std::stoi(value);
}
else {
cerr << "warning: skipping unknown S3 option "
<< name << "=" << value << endl;
Expand Down
11 changes: 10 additions & 1 deletion service/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ struct S3Api : public AwsApi {
ObjectMetadata(const Redundancy & redundancy)
: redundancy(redundancy),
serverSideEncryption(SSE_NONE),
numThreads(8)
numThreads(8), throwChunk(-1)
{
}

Expand All @@ -259,6 +259,10 @@ struct S3Api : public AwsApi {
std::map<std::string, std::string> metadata;
std::string acl;
unsigned int numThreads;

/* Index of the chunk during a write operation after which to emulate
an HTTP exception. */
int throwChunk;
};

/** Signed request that can be executed. */
Expand Down Expand Up @@ -558,6 +562,11 @@ struct S3Api : public AwsApi {
const std::string & uploadId,
const std::vector<std::string> & etags) const;

void
abortMultiPartUpload(const std::string & bucket,
const std::string & resource,
const std::string & uploadId) const;

void uploadRecursive(std::string dirSrc,
std::string bucketDest,
bool includeDir);
Expand Down