Skip to content

Commit

Permalink
[HOPSWORKS-1770] reindex featurestore index (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
o-alex authored May 16, 2020
1 parent f312b88 commit f52794e
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 73 deletions.
2 changes: 2 additions & 0 deletions config.ini.template
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# ePipe Configurations

reindex = false
# possible values [project, featurestore, all]
reindex_of = all

# general configuration

Expand Down
38 changes: 38 additions & 0 deletions include/FeaturestoreReindexer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* This file is part of ePipe
* Copyright (C) 2020, Logical Clocks AB. All rights reserved
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#ifndef EPIPE_FEATURESTOREREINDEXER_H
#define EPIPE_FEATURESTOREREINDEXER_H
#include "ClusterConnectionBase.h"
#include "ProjectsElasticSearch.h"

// One-shot job that (re)builds the Elasticsearch featurestore index from the
// metadata stored in the NDB cluster. Connects via ClusterConnectionBase and
// pushes featuregroup/trainingdataset documents through ProjectsElasticSearch.
class FeaturestoreReindexer : public ClusterConnectionBase {
public:
// connection_string:        NDB cluster connection string
// database_name:            HopsFS database name
// meta_database_name:       Hopsworks metadata database name
// hive_meta_database_name:  Hive metastore database name
// elastic_client_config:    HTTP(S) settings for the Elasticsearch client
// featurestore_index:       name of the target Elasticsearch index
// elastic_batch_size:       max docs per bulk request
// elastic_issue_time:       flush interval for the bulk sender
// lru_cap:                  capacity for the table row caches
FeaturestoreReindexer(const char* connection_string, const char* database_name,
const char* meta_database_name, const char* hive_meta_database_name,
const HttpClientConfig elastic_client_config, const std::string featurestore_index,
int elastic_batch_size, int elastic_issue_time, int lru_cap);
virtual ~FeaturestoreReindexer();

// Scans all datasets, builds featurestore docs (plus their xattrs) and
// bulk-indexes them; blocks until indexing has finished.
void run();
private:
// Elasticsearch bulk client, heap-allocated in the constructor (owned).
ProjectsElasticSearch* mElasticSearch;
// Target Elasticsearch index name.
std::string mFeaturestoreIndex;
// Capacity passed to the LRU-cached NDB table readers.
int mLRUCap;
};
#endif //EPIPE_FEATURESTOREREINDEXER_H
67 changes: 61 additions & 6 deletions include/FsMutationsDataReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,66 @@
#include "tables/XAttrTable.h"
#include "FileProvenanceConstants.h"

/*
 * Builds the Elasticsearch bulk-API payload (action header line + document
 * line) for featurestore documents, i.e. featuregroups and trainingdatasets.
 * Shared by the live mutations reader and the featurestore reindexer.
 */
class FSMutationsJSONBuilder {
public:
  /**
   * Produce a two-line bulk-update entry:
   *   {"update":{"_id":<inodeId>,"_index":<featurestoreIndex>}}
   *   {"doc":{...},"doc_as_upsert":true}
   *
   * @param featurestoreIndex target Elasticsearch index name
   * @param docType           document type, e.g. featuregroup/trainingdataset
   * @param inodeId           inode id of the entity, used as the document _id
   * @param name              entity name (without the version suffix)
   * @param version           entity version parsed from the inode name
   * @param projectId         owning project id
   * @param projectName       owning project name
   * @param datasetIId        inode id of the enclosing dataset
   * @return newline-terminated bulk entry ready to append to a bulk request
   */
  static std::string featurestoreDoc(const std::string& featurestoreIndex, const std::string& docType, Int64 inodeId,
      const std::string& name, int version, int projectId, const std::string& projectName, Int64 datasetIId) {
    std::stringstream out;
    out << FSMutationsJSONBuilder::docHeader(featurestoreIndex, inodeId) << std::endl;

    rapidjson::StringBuffer sbDoc;
    rapidjson::Writer<rapidjson::StringBuffer> docWriter(sbDoc);

    docWriter.StartObject();
    docWriter.String("doc");
    docWriter.StartObject();

    docWriter.String("doc_type");
    docWriter.String(docType.c_str());
    docWriter.String("name");
    docWriter.String(name.c_str());
    docWriter.String("version");
    docWriter.Int(version);

    docWriter.String("project_id");
    docWriter.Int(projectId);
    docWriter.String("project_name");
    docWriter.String(projectName.c_str());
    docWriter.String("dataset_iid");
    // datasetIId is an Int64; writing it with Int() would silently truncate
    // large inode ids, so use the 64-bit writer.
    docWriter.Int64(datasetIId);

    docWriter.EndObject();

    docWriter.String("doc_as_upsert");
    docWriter.Bool(true);

    docWriter.EndObject();

    out << sbDoc.GetString() << std::endl;
    return out.str();
  }
private:
  // Build the bulk "update" action header selecting document `id` in `index`.
  static std::string docHeader(const std::string& index, Int64 id) {
    rapidjson::StringBuffer sbOp;
    rapidjson::Writer<rapidjson::StringBuffer> opWriter(sbOp);

    opWriter.StartObject();
    opWriter.String("update");
    opWriter.StartObject();

    opWriter.String("_id");
    opWriter.Int64(id);
    opWriter.String("_index");
    opWriter.String(index.c_str());

    opWriter.EndObject();
    opWriter.EndObject();

    return sbOp.GetString();
  }
};

class FsMutationsDataReader : public NdbDataReader<FsMutationRow, MConn> {
public:
FsMutationsDataReader(MConn connection, const bool hopsworks, const int lru_cap,
Expand All @@ -45,8 +105,6 @@ class FsMutationsDataReader : public NdbDataReader<FsMutationRow, MConn> {
virtual void processAddedandDeleted(Fmq* data_batch, eBulk& bulk);

void createJSON(Fmq* pending, INodeMap& inodes, XAttrMap& xattrs, eBulk& bulk);
std::string docHeader(std::string index, Int64 id);
std::string featurestoreDoc(std::string docType, Int64 inodeId, std::string name, int version, int projectId, std::string projectName, Int64 datasetIId);
};

class FsMutationsDataReaders : public NdbDataReaders<FsMutationRow, MConn>{
Expand All @@ -61,7 +119,4 @@ class FsMutationsDataReaders : public NdbDataReaders<FsMutationRow, MConn>{
}
}
};


#endif /* FSMUTATIONSDATAREADER_H */

#endif /* FSMUTATIONSDATAREADER_H */
104 changes: 104 additions & 0 deletions src/FeaturestoreReindexer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* This file is part of ePipe
* Copyright (C) 2020, Logical Clocks AB. All rights reserved
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#include "FeaturestoreReindexer.h"
#include "FileProvenanceConstants.h"
#include "FsMutationsDataReader.h"
#include "tables/ProjectTable.h"
#include "tables/INodeTable.h"
#include "tables/DatasetTable.h"
#include "tables/XAttrTable.h"
#include "tables/FsMutationsLogTable.h"

// Sets up the cluster connections via the base class and creates the
// Elasticsearch bulk client used by run().
FeaturestoreReindexer::FeaturestoreReindexer(const char* connection_string, const char* database_name,
const char* meta_database_name, const char* hive_meta_database_name,
const HttpClientConfig elastic_client_config, const std::string featurestore_index,
int elastic_batch_size, int elastic_issue_time, int lru_cap)
: ClusterConnectionBase(connection_string, database_name, meta_database_name, hive_meta_database_name),
mFeaturestoreIndex(featurestore_index), mLRUCap(lru_cap) {
// Heap-allocated and owned by this object.
// NOTE(review): ensure the destructor releases mElasticSearch.
mElasticSearch = new ProjectsElasticSearch(elastic_client_config, elastic_issue_time, elastic_batch_size, false, MConn());
}
// Full reindex pass: walk every dataset, detect featurestore datasets,
// and bulk-index one document per featuregroup/trainingdataset inode
// (plus an xattr upsert per extended attribute). Blocks until the
// Elasticsearch bulk sender has drained.
void FeaturestoreReindexer::run() {
ptime start = Utils::getCurrentTime();
mElasticSearch->start();

// Two NDB sessions: metadata tables vs. HopsFS tables.
// NOTE(review): these Ndb connections do not appear to be released before
// returning — confirm whether create_ndb_connection transfers ownership.
Ndb *metaConn = create_ndb_connection(mMetaDatabaseName);
Ndb *conn = create_ndb_connection(mDatabaseName);

ProjectTable projectsTable(mLRUCap);
INodeTable inodesTable(mLRUCap);
DatasetTable datasetsTable(mLRUCap);
XAttrTable xAttrTable;

// Cache of project id -> project inode name to avoid repeated lookups.
boost::unordered_map<int, std::string> projectNames;
int numXAttrs = 0;
int nonExistentXAttrs = 0;
int featurestoreDocs = 0;

datasetsTable.getAll(metaConn);
while (datasetsTable.next()) {
DatasetRow dataset = datasetsTable.currRow();
// Resolve (and memoize) the owning project's name.
if (projectNames.find(dataset.mProjectId) == projectNames.end()) {
ProjectRow pRow = projectsTable.get(metaConn, dataset.mProjectId);
projectNames[dataset.mProjectId] = pRow.mInodeName;
}
std::string projectName = projectNames[dataset.mProjectId];

// Classify the dataset; returns the doc type it holds or DONT_EXIST_STR().
std::string docType = FileProvenanceConstants::isPartOfFeaturestore(dataset.mInodeId, dataset.mInodeId, projectName, dataset.mInodeName);
if (docType != DONT_EXIST_STR()) {
LOG_INFO("dataset:" << dataset.mInodeName << " has " << docType << "s");
//this is a featurestore doc - featuregroup or trainingdataset
INodeVec inodes = inodesTable.getByParentId(conn, dataset.mInodeId);
for (INodeVec::iterator it = inodes.begin(); it != inodes.end(); ++it) {
INodeRow inode = *it;
// Entities are named "<name>_<version>"; skip inodes that don't parse.
boost::optional<std::pair<std::string, int>> nameParts = FileProvenanceConstants::splitNameVersion(inode.mName);
if (nameParts) {
// One bulk batch per entity: the base doc plus all its xattr upserts.
eBulk bulk;
LOG_INFO("featurestore type:" << docType << " name:" << nameParts.get().first << " version:" << std::to_string(nameParts.get().second));
std::string featurestoreDoc = FSMutationsJSONBuilder::featurestoreDoc(mFeaturestoreIndex, docType, inode.mId,
nameParts.get().first, nameParts.get().second, dataset.mProjectId, projectName, dataset.mInodeId);
bulk.push(Utils::getCurrentTime(), featurestoreDoc);
featurestoreDocs++;

if (inode.has_xattrs()) {
XAttrVec xattrs = xAttrTable.getByInodeId(conn, inode.mId);
for (XAttrVec::iterator xit = xattrs.begin(); xit != xattrs.end(); ++xit) {
XAttrRow xAttrRow = *xit;
LOG_INFO("xattr:" << xAttrRow.mName);
// Guard against rows returned for a different inode id.
if (xAttrRow.mInodeId == inode.mId) {
bulk.push(Utils::getCurrentTime(), xAttrRow.to_upsert_json(mFeaturestoreIndex, FsOpType::XAttrUpdate));
} else {
LOG_WARN("XAttrs doesn't exists for [" << inode.mId << "] - " << xAttrRow.to_string());
nonExistentXAttrs++;
}
numXAttrs++;
}
}
mElasticSearch->addData(bulk);
}
}
}
}
// Flush remaining batches and wait for the sender thread to finish.
mElasticSearch->shutdown();
mElasticSearch->waitToFinish();
LOG_INFO((numXAttrs - nonExistentXAttrs) << " XAttrs added, " << nonExistentXAttrs << " doesn't exists");
LOG_INFO((featurestoreDocs) << " documents were added to featurestore index ");
LOG_INFO("Reindexing done in " << Utils::getTimeDiffInMilliseconds(start, Utils::getCurrentTime()) << " msec");
}
// Releases the Elasticsearch client allocated in the constructor.
// (The destructor was previously empty, leaking mElasticSearch.)
FeaturestoreReindexer::~FeaturestoreReindexer() {
  delete mElasticSearch;
}
60 changes: 2 additions & 58 deletions src/FsMutationsDataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ void FsMutationsDataReader::createJSON(Fmq* pending, INodeMap& inodes,
if(nameParts) {
LOG_DEBUG("featurestore type:" << docType << "name:" << nameParts.get().first << " version:" << std::to_string(nameParts.get().second));
bulk.push(nullptr, row.mEventCreationTime,
featurestoreDoc(docType, inode.mId, nameParts.get().first, nameParts.get().second, projectId, projectName, datasetINodeId));
FSMutationsJSONBuilder::featurestoreDoc(mFeaturestoreIndex, docType, inode.mId, nameParts.get().first,
nameParts.get().second, projectId, projectName, datasetINodeId));
}
}
}
Expand Down Expand Up @@ -165,63 +166,6 @@ void FsMutationsDataReader::createJSON(Fmq* pending, INodeMap& inodes,
}
}

std::string FsMutationsDataReader::docHeader(std::string index, Int64 id) {
std::stringstream out;
rapidjson::StringBuffer sbOp;
rapidjson::Writer<rapidjson::StringBuffer> opWriter(sbOp);

opWriter.StartObject();
opWriter.String("update");
opWriter.StartObject();

opWriter.String("_id");
opWriter.Int64(id);
opWriter.String("_index");
opWriter.String(index.c_str());

opWriter.EndObject();
opWriter.EndObject();

return sbOp.GetString();
}

std::string FsMutationsDataReader::featurestoreDoc(std::string docType, Int64 inodeId, std::string name, int version,
int projectId, std::string projectName, Int64 datasetIId) {
std::stringstream out;
out << docHeader(mFeaturestoreIndex, inodeId) << std::endl;

rapidjson::StringBuffer sbDoc;
rapidjson::Writer<rapidjson::StringBuffer> docWriter(sbDoc);

docWriter.StartObject();
docWriter.String("doc");
docWriter.StartObject();

docWriter.String("doc_type");
docWriter.String(docType.c_str());
docWriter.String("name");
docWriter.String(name.c_str());
docWriter.String("version");
docWriter.Int(version);

docWriter.String("project_id");
docWriter.Int(projectId);
docWriter.String("project_name");
docWriter.String(projectName.c_str());
docWriter.String("dataset_iid");
docWriter.Int(datasetIId);

docWriter.EndObject();

docWriter.String("doc_as_upsert");
docWriter.Bool(true);

docWriter.EndObject();

out << sbDoc.GetString() << std::endl;
return out.str();
}

FsMutationsDataReader::~FsMutationsDataReader() {

}
40 changes: 31 additions & 9 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <boost/program_options.hpp>
#include "Version.h"
#include "Reindexer.h"
#include "FeaturestoreReindexer.h"

namespace po = boost::program_options;

Expand Down Expand Up @@ -55,6 +56,7 @@ int main(int argc, char** argv) {
Barrier barrier = EPOCH;

bool reindex = false;
std::string reindex_of = "all";

bool hiveCleaner = true;

Expand Down Expand Up @@ -133,6 +135,8 @@ int main(int argc, char** argv) {
"Table tailer barrier type. EPOCH=0, GCI=1")
("reindex", po::value<bool>(&reindex)->default_value(reindex),
"initialize an empty index with all metadata")
("reindex_of", po::value<std::string>(&reindex_of)->default_value(reindex_of),
"define index to initialize [project, featurestore]")
("ssl_enabled", po::value<bool>(&sslEnabled)->default_value(sslEnabled),
"ssl enabled or disabled for elastic communication")
("ca_path", po::value<std::string>(&caPath)->default_value(caPath),
Expand Down Expand Up @@ -212,15 +216,33 @@ int main(int argc, char** argv) {
HttpClientConfig config = {elastic_addr, sslEnabled, caPath, username,
password};
if (reindex) {
LOG_INFO("Create Elasticsearch index at " << elastic_index);
Reindexer *reindexer = new Reindexer(connection_string.c_str(),
database_name.c_str(),
meta_database_name.c_str(),
hive_meta_database_name.c_str(),
config, elastic_index, elastic_batch_size,
elastic_issue_time, lru_cap);

reindexer->run();
if(reindex_of == "project") {
LOG_INFO("Create Elasticsearch index at " << elastic_index);
Reindexer *reindexer = new Reindexer(connection_string.c_str(),
database_name.c_str(),
meta_database_name.c_str(),
hive_meta_database_name.c_str(),
config, elastic_index, elastic_batch_size,
elastic_issue_time, lru_cap);
reindexer->run();
} else if(reindex_of == "featurestore") {
LOG_INFO("Create Elasticsearch index at " << elastic_featurestore_index);
FeaturestoreReindexer *reindexer = new FeaturestoreReindexer(connection_string.c_str(),
database_name.c_str(), meta_database_name.c_str(), hive_meta_database_name.c_str(), config,
elastic_featurestore_index, elastic_batch_size, elastic_issue_time, lru_cap);
reindexer->run();
} else if(reindex_of == "all") {
LOG_INFO("Create Elasticsearch index at " << elastic_index);
Reindexer *projectReindexer = new Reindexer(connection_string.c_str(),
database_name.c_str(), meta_database_name.c_str(), hive_meta_database_name.c_str(),
config, elastic_index, elastic_batch_size, elastic_issue_time, lru_cap);
projectReindexer->run();
LOG_INFO("Create Elasticsearch index at " << elastic_featurestore_index);
FeaturestoreReindexer *featurestoreReindexer = new FeaturestoreReindexer(connection_string.c_str(),
database_name.c_str(), meta_database_name.c_str(), hive_meta_database_name.c_str(), config,
elastic_featurestore_index, elastic_batch_size, elastic_issue_time, lru_cap);
featurestoreReindexer->run();
}
} else {
Notifier *notifer = new Notifier(connection_string.c_str(),
database_name.c_str(),
Expand Down

0 comments on commit f52794e

Please sign in to comment.