Skip to content

Commit

Permalink
Changes to presto native to add plan conversion endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
BryanCutler committed Sep 13, 2024
1 parent baf8253 commit 9375271
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 3 deletions.
48 changes: 48 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,15 @@ void PrestoServer::run() {
prestoServerOperations_->runOperation(message, downstream);
});

httpServer_->registerPost(
"/v1/velox/plan",
[server = this](
proxygen::HTTPMessage* message,
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream) {
server->convertToVeloxPlan(message, downstream, body);
});

PRESTO_STARTUP_LOG(INFO) << "Driver CPU executor '"
<< driverExecutor_->getName() << "' has "
<< driverExecutor_->numThreads() << " threads.";
Expand Down Expand Up @@ -1401,6 +1410,45 @@ static protocol::Duration getUptime(
return protocol::Duration(seconds, protocol::TimeUnit::SECONDS);
}

void PrestoServer::convertToVeloxPlan(
proxygen::HTTPMessage* message,
proxygen::ResponseHandler* downstream,
const std::vector<std::unique_ptr<folly::IOBuf>>& body) {
std::string error;
try {
auto headers = message->getHeaders();

std::ostringstream oss;
for (auto& buf : body) {
oss << std::string((const char*)buf->data(), buf->length());
}
std::string planFragmentJson = oss.str();
protocol::PlanFragment planFragment = json::parse(planFragmentJson);

auto queryCtx = core::QueryCtx::create();
VeloxInteractiveQueryPlanConverter converter(queryCtx.get(), pool_.get());

// Create static taskId and empty TableWriteInfo needed for plan conversion
protocol::TaskId taskId = "velox-plan-conversion.0.0.0";
auto tableWriteInfo = std::make_shared<protocol::TableWriteInfo>();

// Attempt to convert the plan fragment to a Velox plan
converter.toVeloxQueryPlan(planFragment, tableWriteInfo, taskId);

} catch (VeloxException& e) {
error = e.message();
} catch (std::exception& e) {
error = e.what();
}

// Return ok status if conversion succeeded or error if failed
if (error.empty()) {
http::sendOkResponse(downstream, json(R"({ "status": "ok" })"));
} else {
http::sendErrorResponse(downstream, json(R"({ "status": "error", "message": ")" + error + R"(")})"));
}
}

void PrestoServer::reportMemoryInfo(proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, json(**memoryInfo_.rlock()));
}
Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ class PrestoServer {

void addServerPeriodicTasks();

void convertToVeloxPlan(
proxygen::HTTPMessage* message,
proxygen::ResponseHandler* downstream,
const std::vector<std::unique_ptr<folly::IOBuf>>& body);

void reportMemoryInfo(proxygen::ResponseHandler* downstream);

void reportServerInfo(proxygen::ResponseHandler* downstream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1833,7 +1833,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan(
case protocol::SystemPartitioning::FIXED: {
switch (systemPartitioningHandle->function) {
case protocol::SystemPartitionFunction::ROUND_ROBIN: {
auto numPartitions = partitioningScheme.bucketToPartition->size();
auto numPartitions = partitioningScheme.bucketToPartition ? partitioningScheme.bucketToPartition->size() : 1;

if (numPartitions == 1) {
planFragment.planNode = core::PartitionedOutputNode::single(
Expand All @@ -1853,7 +1853,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan(
return planFragment;
}
case protocol::SystemPartitionFunction::HASH: {
auto numPartitions = partitioningScheme.bucketToPartition->size();
auto numPartitions = partitioningScheme.bucketToPartition ? partitioningScheme.bucketToPartition->size() : 1;

if (numPartitions == 1) {
planFragment.planNode = core::PartitionedOutputNode::single(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ add_executable(
TupleDomainTest.cpp
TypeErrorTest.cpp
VariableReferenceExpressionTest.cpp
PlanFragmentTest.cpp)
PlanFragmentTest.cpp
)
add_test(
NAME presto_protocol_test
COMMAND presto_protocol_test
Expand Down

0 comments on commit 9375271

Please sign in to comment.