Skip to content

Commit

Permalink
[HOPSWORKS-1900] epipe mishandling of large xattr when logged xattr s…
Browse files Browse the repository at this point in the history
…ize differs from current xattr size (#64)

* [HOPSWORKS-1900] fix handling of multi xattr operation in log having different sizes than current

* fix
  • Loading branch information
o-alex authored Sep 16, 2020
1 parent f446950 commit 826989c
Showing 1 changed file with 144 additions and 59 deletions.
203 changes: 144 additions & 59 deletions include/tables/XAttrTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,35 @@
#include "FsMutationsLogTable.h"
#include "MetadataLogTable.h"

struct XAttrPartKey {
Int64 mInodeId;
Int8 mNamespace;
std::string mName;
Int16 mIndex;

XAttrPartKey(Int64 inodeId, Int8 ns, std::string name, Int16 index) {
mInodeId = inodeId;
mNamespace = ns;
mName = name;
mIndex = index;
}

bool operator==(const XAttrPartKey &pk) const {
return pk.mInodeId == mInodeId && pk.mNamespace == mNamespace && pk.mName == mName && pk.mIndex == mIndex;
}

AnyMap getAnyKey() {
AnyMap pk;
pk[0] = mInodeId;
pk[1] = mNamespace;
pk[2] = mName;
pk[3] = mIndex;
return pk;
}
};

typedef std::vector<XAttrPartKey> XAttrPartKeyVec;

struct XAttrRowPart{
Int64 mInodeId;
Int8 mNamespace;
Expand Down Expand Up @@ -261,24 +290,6 @@ struct XAttrRow {
}
};

struct XAttrPK {
Int64 mInodeId;
Int8 mNamespace;
std::string mName;

XAttrPK(Int64 inodeId, Int8 ns, std::string name) {
mInodeId = inodeId;
mNamespace = ns;
mName = name;
}

std::string to_string() {
std::stringstream out;
out << mInodeId << "-" << std::to_string(mNamespace) << "-" << mName;
return out.str();
}
};

typedef std::vector<XAttrRow> XAttrVec;
typedef boost::unordered_map<std::string, XAttrVec> XAttrMap;
typedef boost::unordered_map<std::string, XAttrPartVec> XAttrPartMap;
Expand Down Expand Up @@ -336,44 +347,42 @@ class XAttrTable : public DBTable<XAttrRowPart> {
}

XAttrMap get(Ndb* connection, Fmq* data_batch) {
AnyVec anyVec;
Fmq batchedMutations;
Fmq addAllXattrs;

for (Fmq::iterator it = data_batch->begin();
it != data_batch->end(); ++it) {
AnyVec anyVec;
XAttrPartKeyVec xattrPartKeyVec;
for (Fmq::iterator it = data_batch->begin(); it != data_batch->end(); ++it) {
FsMutationRow row = *it;
if (!row.requiresReadingXAttr() || !row.isXAttrOperation()) {
continue;
}

if(row.mOperation == XAttrAddAll){
addAllXattrs.push_back(row);
continue;
}

LOG_DEBUG("doRead batch for XAttr [ " + row.getXAttrName() + " ] to get its " << row.getNumParts() << " parts ");
for(Int16 index=0; index < row.getNumParts(); index++){
AnyMap pk;
pk[0] = row.mInodeId;
pk[1] = row.getNamespace();
pk[2] = row.getXAttrName();
pk[3] = index;
anyVec.push_back(pk);
}
addRetryKeys(row.mInodeId, row.getNamespace(), row.getXAttrName(), row.getNumParts(), anyVec, xattrPartKeyVec);
batchedMutations.push_back(row);
}
std::pair<AnyVec, XAttrPartKeyVec> xAttrKeys = std::make_pair(anyVec, xattrPartKeyVec);
XAttrMap xattrs;
int retry = 0;
while(!xAttrKeys.first.empty()) {
if(retry > 5) {
LOG_ERROR("xattr are changing to fast - epipe cannot get a consistent read");
}
XAttrPartVec xattrsParts = doRead(connection, xAttrKeys.first);
xAttrKeys = combineBatch(xattrsParts, xattrs, batchedMutations, xAttrKeys);
retry++;
}

XAttrPartVec xattrsParts = doRead(connection, anyVec);
XAttrMap results = combine(xattrsParts, batchedMutations);

for(Fmq::iterator it = addAllXattrs.begin(); it != addAllXattrs.end();
++it){
for(Fmq::iterator it = addAllXattrs.begin(); it != addAllXattrs.end(); ++it){
FsMutationRow mr = *it;
results[mr.getPKStr()] = getByInodeId(connection, mr.mInodeId);
xattrs[mr.getPKStr()] = getByInodeId(connection, mr.mInodeId);
}

return results;
return xattrs;
}

XAttrVec getByInodeId(Ndb* connection, Int64 inodeId){
Expand All @@ -383,7 +392,7 @@ class XAttrTable : public DBTable<XAttrRowPart> {
return combine(xattrsParts);
}

boost::optional<XAttrRow> get(Ndb* connection, XAttrPK key) {
boost::optional<XAttrRow> get(Ndb* connection, XAttrPartKey key) {
XAttrRow row = get(connection, key.mInodeId, key.mNamespace, key.mName);
if(readCheckExists(key, row)) {
return row;
Expand All @@ -393,37 +402,113 @@ class XAttrTable : public DBTable<XAttrRowPart> {
}

private:
inline static bool readCheckExists(XAttrPK key, XAttrRow row) {
inline static bool readCheckExists(XAttrPartKey key, XAttrRow row) {
return key.mInodeId == row.mInodeId && key.mNamespace == row.mNamespace && key.mName == row.mName;
}

inline static bool readCheckExists(XAttrPartKey key, XAttrRowPart row) {
return key.mInodeId == row.mInodeId && key.mNamespace == row.mNamespace && key.mName == row.mName && key.mIndex == row.mIndex;
}

/** This handles a pruned index read, so entries in partVec are well formed */
XAttrVec combine(XAttrPartVec& partVec){
XAttrPartMap xAttrsByName;
convert(partVec, xAttrsByName);
XAttrVec results;
for(auto& e : xAttrsByName){
results.push_back(XAttrRow(e.second));
}
return results;
XAttrPartMap xAttrsByName;
for(auto& part : partVec) {
std::string id = part.getXAttrUniqueId();
if(xAttrsByName.find(id) == xAttrsByName.end()){
xAttrsByName[id] = XAttrPartVec();
}
xAttrsByName[id].push_back(part);
}
XAttrVec results;
for(auto& e :xAttrsByName){
auto& vec = e.second;
std::sort(vec.begin(), vec.end(), [](XAttrRowPart a, XAttrRowPart b){
return a.mIndex < b.mIndex;
});
results.push_back(XAttrRow(e.second));
}
return results;
}

void addRetryKeys(Int64 inodeId, Int8 ns, std::string name, Int16 numParts, AnyVec& anyVec, XAttrPartKeyVec& keys) {
LOG_DEBUG("doRead batch for XAttr [ " + name + " ] to get its " << numParts << " parts ");
XAttrPartKey key(inodeId, ns, name, 0);
/** There might be multiple operations using the same XAttr. No use getting it multiple times from the db. */
if(std::find(std::begin(keys), std::end(keys), key) == std::end(keys)) {
for (Int16 index = 0; index < numParts; index++) {
XAttrPartKey pkey(inodeId, ns, name, index);
keys.push_back(pkey);
anyVec.push_back(pkey.getAnyKey());
}
}
}

XAttrMap combine(XAttrPartVec& partVec, Fmq& xAttrMutations){
/** This handles a key batch read, so entries in partVec might have junk - filter them */
std::pair<AnyVec, XAttrPartKeyVec> combineBatch(XAttrPartVec& partVec, XAttrMap& xattrs, Fmq& xAttrMutations, std::pair<AnyVec, XAttrPartKeyVec> keys){
XAttrPartMap xAttrsByName;
convert(partVec, xAttrsByName);
XAttrMap results;
for(auto& m : xAttrMutations){
convertBatch(partVec, xAttrsByName, keys.second);
XAttrPartKeyVec retryPartKeyVec;
AnyVec anyVec;
for(auto& m : xAttrMutations) {
XAttrPartKey key(m.mInodeId, m.getNamespace(), m.getXAttrName(), 0);
if(std::find(std::begin(keys.second), std::end(keys.second), key) == std::end(keys.second)) {
continue;
}
std::string id = XAttrRowPart::getXAttrUniqueId(m);
auto& vec = xAttrsByName[id];
XAttrVec xvec;
xvec.push_back(XAttrRow(vec));
results[m.getPKStr()] = xvec;
if(xAttrsByName.find(id) == xAttrsByName.end()){
/** xattr was removed */
continue;
}
auto &vec = xAttrsByName[id];
unsigned long actualSize = vec[0].mNumParts;
if (actualSize == vec.size()) {
XAttrVec xvec;
xvec.push_back(XAttrRow(vec));
xattrs[m.getPKStr()] = xvec;
} else if (actualSize < vec.size()) {
XAttrPartVec actualVec(vec.begin(), vec.begin() + actualSize);
XAttrVec xvec;
xvec.push_back(XAttrRow(actualVec));
xattrs[m.getPKStr()] = xvec;
} else {
addRetryKeys(vec[0].mInodeId, vec[0].mNamespace, vec[0].mName, vec[0].mNumParts, anyVec, retryPartKeyVec);
}
}
std::pair<AnyVec, XAttrPartKeyVec> retryKeys = std::make_pair(anyVec, retryPartKeyVec);
return retryKeys;
}

return results;
/**
* the read was done as a batch of keys - we need to check key vs result to make sure the result exists
*/
void convertBatch(XAttrPartVec& parts, XAttrPartMap& xAttrsByName, XAttrPartKeyVec keys){
for(unsigned i = 0; i < keys.size(); ++i) {
XAttrPartKey key = keys[i];
XAttrRowPart part = parts[i];
/** we check each part against the rowKey */
if(readCheckExists(key, part)) {
std::string id = part.getXAttrUniqueId();
if(xAttrsByName.find(id) == xAttrsByName.end()){
xAttrsByName[id] = XAttrPartVec();
}
xAttrsByName[id].push_back(part);
}
}

for(auto& e :xAttrsByName){
auto& vec = e.second;
std::sort(vec.begin(), vec.end(), [](XAttrRowPart a, XAttrRowPart b){
return a.mIndex < b.mIndex;
});
}
}

void convert(XAttrPartVec& partVec, XAttrPartMap& xAttrsByName){
for(auto& part : partVec){

/**
* the read was done as a pruned index scan, only existing result returned. No need to check for sanity of results.
*/
void convert(XAttrPartVec& parts, XAttrPartMap& xAttrsByName){
for(auto& part : parts) {
std::string id = part.getXAttrUniqueId();
if(xAttrsByName.find(id) == xAttrsByName.end()){
xAttrsByName[id] = XAttrPartVec();
Expand Down

0 comments on commit 826989c

Please sign in to comment.