Skip to content

Commit

Permalink
Add DeleteNode to presto_protocol serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
ghelmling committed Dec 18, 2024
1 parent 85cc36a commit 9600b84
Show file tree
Hide file tree
Showing 12 changed files with 444 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.facebook.presto.spi.plan.DistinctLimitNode;
import com.facebook.presto.spi.plan.EquiJoinClause;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.InputDistribution;
import com.facebook.presto.spi.plan.JoinDistributionType;
import com.facebook.presto.spi.plan.JoinNode;
import com.facebook.presto.spi.plan.LimitNode;
Expand Down Expand Up @@ -545,7 +546,7 @@ public PlanWithProperties visitDelete(DeleteNode node, PreferredProperties prefe
if (!node.getInputDistribution().isPresent()) {
return visitPlan(node, preferredProperties);
}
DeleteNode.InputDistribution inputDistribution = node.getInputDistribution().get();
InputDistribution inputDistribution = node.getInputDistribution().get();
List<LocalProperty<VariableReferenceExpression>> desiredProperties = new ArrayList<>();
if (!inputDistribution.getPartitionBy().isEmpty()) {
desiredProperties.add(new GroupingProperty<>(inputDistribution.getPartitionBy()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.facebook.presto.spi.plan.DeleteNode;
import com.facebook.presto.spi.plan.DistinctLimitNode;
import com.facebook.presto.spi.plan.EquiJoinClause;
import com.facebook.presto.spi.plan.InputDistribution;
import com.facebook.presto.spi.plan.JoinNode;
import com.facebook.presto.spi.plan.LimitNode;
import com.facebook.presto.spi.plan.MarkDistinctNode;
Expand Down Expand Up @@ -467,7 +468,7 @@ public PlanWithProperties visitDelete(DeleteNode node, StreamPreferredProperties
if (!node.getInputDistribution().isPresent()) {
return visitPlan(node, parentPreferences);
}
DeleteNode.InputDistribution inputDistribution = node.getInputDistribution().get();
InputDistribution inputDistribution = node.getInputDistribution().get();
StreamPreferredProperties childRequirements = parentPreferences
.constrainTo(node.getSource().getOutputVariables())
.withDefaultParallelism(session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,10 @@ void to_json(json& j, const std::shared_ptr<PlanNode>& p) {
j = *std::static_pointer_cast<GroupIdNode>(p);
return;
}
if (type == ".DeleteNode") {
j = *std::static_pointer_cast<DeleteNode>(p);
return;
}
if (type == ".DistinctLimitNode") {
j = *std::static_pointer_cast<DistinctLimitNode>(p);
return;
Expand Down Expand Up @@ -745,6 +749,12 @@ void from_json(const json& j, std::shared_ptr<PlanNode>& p) {
p = std::static_pointer_cast<PlanNode>(k);
return;
}
if (type == ".DeleteNode") {
std::shared_ptr<DeleteNode> k = std::make_shared<DeleteNode>();
j.get_to(*k);
p = std::static_pointer_cast<PlanNode>(k);
return;
}
if (type == ".DistinctLimitNode") {
std::shared_ptr<DistinctLimitNode> k =
std::make_shared<DistinctLimitNode>();
Expand Down Expand Up @@ -1210,6 +1220,62 @@ void from_json(const json& j, Assignments& p) {
}
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {
BaseInputDistribution::BaseInputDistribution() noexcept {
_type = ".BaseInputDistribution";
}

void to_json(json& j, const BaseInputDistribution& p) {
j = json::object();
j["@type"] = ".BaseInputDistribution";
to_json_key(
j,
"partitionBy",
p.partitionBy,
"BaseInputDistribution",
"List<VariableReferenceExpression>",
"partitionBy");
to_json_key(
j,
"orderingScheme",
p.orderingScheme,
"BaseInputDistribution",
"OrderingScheme",
"orderingScheme");
to_json_key(
j,
"inputVariables",
p.inputVariables,
"BaseInputDistribution",
"List<VariableReferenceExpression>",
"inputVariables");
}

void from_json(const json& j, BaseInputDistribution& p) {
p._type = j["@type"];
from_json_key(
j,
"partitionBy",
p.partitionBy,
"BaseInputDistribution",
"List<VariableReferenceExpression>",
"partitionBy");
from_json_key(
j,
"orderingScheme",
p.orderingScheme,
"BaseInputDistribution",
"OrderingScheme",
"orderingScheme");
from_json_key(
j,
"inputVariables",
p.inputVariables,
"BaseInputDistribution",
"List<VariableReferenceExpression>",
"inputVariables");
}
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {
// Loosly copied this here from NLOHMANN_JSON_SERIALIZE_ENUM()

// NOLINTNEXTLINE: cppcoreguidelines-avoid-c-arrays
Expand Down Expand Up @@ -3191,6 +3257,101 @@ void from_json(const json& j, DeleteHandle& p) {
}
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {
void to_json(json& j, const std::shared_ptr<InputDistribution>& p) {
if (p == nullptr) {
return;
}
String type = p->_type;

if (type == ".BaseInputDistribution") {
j = *std::static_pointer_cast<BaseInputDistribution>(p);
return;
}

throw TypeError(type + " no abstract type InputDistribution ");
}

void from_json(const json& j, std::shared_ptr<InputDistribution>& p) {
String type;
try {
type = p->getSubclassKey(j);
} catch (json::parse_error& e) {
throw ParseError(
std::string(e.what()) + " InputDistribution InputDistribution");
}

if (type == ".BaseInputDistribution") {
std::shared_ptr<BaseInputDistribution> k =
std::make_shared<BaseInputDistribution>();
j.get_to(*k);
p = std::static_pointer_cast<InputDistribution>(k);
return;
}

throw TypeError(type + " no abstract type InputDistribution ");
}
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {
DeleteNode::DeleteNode() noexcept {
_type = ".DeleteNode";
}

void to_json(json& j, const DeleteNode& p) {
j = json::object();
j["@type"] = ".DeleteNode";
to_json_key(j, "id", p.id, "DeleteNode", "PlanNodeId", "id");
to_json_key(j, "source", p.source, "DeleteNode", "PlanNode", "source");
to_json_key(
j,
"rowId",
p.rowId,
"DeleteNode",
"VariableReferenceExpression",
"rowId");
to_json_key(
j,
"outputVariables",
p.outputVariables,
"DeleteNode",
"List<VariableReferenceExpression>",
"outputVariables");
to_json_key(
j,
"inputDistribution",
p.inputDistribution,
"DeleteNode",
"InputDistribution",
"inputDistribution");
}

void from_json(const json& j, DeleteNode& p) {
p._type = j["@type"];
from_json_key(j, "id", p.id, "DeleteNode", "PlanNodeId", "id");
from_json_key(j, "source", p.source, "DeleteNode", "PlanNode", "source");
from_json_key(
j,
"rowId",
p.rowId,
"DeleteNode",
"VariableReferenceExpression",
"rowId");
from_json_key(
j,
"outputVariables",
p.outputVariables,
"DeleteNode",
"List<VariableReferenceExpression>",
"outputVariables");
from_json_key(
j,
"inputDistribution",
p.inputDistribution,
"DeleteNode",
"InputDistribution",
"inputDistribution");
}
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {
DistinctLimitNode::DistinctLimitNode() noexcept {
_type = ".DistinctLimitNode";
}
Expand Down Expand Up @@ -8690,6 +8851,13 @@ void to_json(json& j, const SortNode& p) {
"OrderingScheme",
"orderingScheme");
to_json_key(j, "isPartial", p.isPartial, "SortNode", "bool", "isPartial");
to_json_key(
j,
"partitionBy",
p.partitionBy,
"SortNode",
"List<VariableReferenceExpression>",
"partitionBy");
}

void from_json(const json& j, SortNode& p) {
Expand All @@ -8704,6 +8872,13 @@ void from_json(const json& j, SortNode& p) {
"OrderingScheme",
"orderingScheme");
from_json_key(j, "isPartial", p.isPartial, "SortNode", "bool", "isPartial");
from_json_key(
j,
"partitionBy",
p.partitionBy,
"SortNode",
"List<VariableReferenceExpression>",
"partitionBy");
}
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,21 @@ extern const char* const PRESTO_ABORT_TASK_URL_PARAM;
class Exception : public std::runtime_error {
public:
explicit Exception(const std::string& message)
: std::runtime_error(message){};
: std::runtime_error(message) {};
};

class TypeError : public Exception {
public:
explicit TypeError(const std::string& message) : Exception(message){};
explicit TypeError(const std::string& message) : Exception(message) {};
};

class OutOfRange : public Exception {
public:
explicit OutOfRange(const std::string& message) : Exception(message){};
explicit OutOfRange(const std::string& message) : Exception(message) {};
};
class ParseError : public Exception {
public:
explicit ParseError(const std::string& message) : Exception(message){};
explicit ParseError(const std::string& message) : Exception(message) {};
};

using String = std::string;
Expand Down Expand Up @@ -313,6 +313,11 @@ void to_json(json& j, const std::shared_ptr<ConnectorOutputTableHandle>& p);
void from_json(const json& j, std::shared_ptr<ConnectorOutputTableHandle>& p);
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {
struct InputDistribution : public JsonEncodedSubclass {};
void to_json(json& j, const std::shared_ptr<InputDistribution>& p);
void from_json(const json& j, std::shared_ptr<InputDistribution>& p);
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {
struct ValueSet : public JsonEncodedSubclass {};
void to_json(json& j, const std::shared_ptr<ValueSet>& p);
void from_json(const json& j, std::shared_ptr<ValueSet>& p);
Expand Down Expand Up @@ -524,6 +529,17 @@ void to_json(json& j, const Assignments& p);
void from_json(const json& j, Assignments& p);
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {
struct BaseInputDistribution : public InputDistribution {
List<VariableReferenceExpression> partitionBy = {};
std::shared_ptr<OrderingScheme> orderingScheme = {};
List<VariableReferenceExpression> inputVariables = {};

BaseInputDistribution() noexcept;
};
void to_json(json& j, const BaseInputDistribution& p);
void from_json(const json& j, BaseInputDistribution& p);
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {
enum class BufferType {
PARTITIONED,
BROADCAST,
Expand Down Expand Up @@ -1020,6 +1036,18 @@ void to_json(json& j, const DeleteHandle& p);
void from_json(const json& j, DeleteHandle& p);
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {
struct DeleteNode : public PlanNode {
std::shared_ptr<PlanNode> source = {};
VariableReferenceExpression rowId = {};
List<VariableReferenceExpression> outputVariables = {};
std::shared_ptr<InputDistribution> inputDistribution = {};

DeleteNode() noexcept;
};
void to_json(json& j, const DeleteNode& p);
void from_json(const json& j, DeleteNode& p);
} // namespace facebook::presto::protocol
namespace facebook::presto::protocol {
struct DistinctLimitNode : public PlanNode {
std::shared_ptr<PlanNode> source = {};
int64_t limit = {};
Expand Down Expand Up @@ -2025,6 +2053,7 @@ struct SortNode : public PlanNode {
std::shared_ptr<PlanNode> source = {};
OrderingScheme orderingScheme = {};
bool isPartial = {};
List<VariableReferenceExpression> partitionBy = {};

SortNode() noexcept;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,17 @@ AbstractClasses:
- { name: InsertHandle, key: InsertHandle }
- { name: DeleteHandle, key: DeleteHandle }

InputDistribution:
super: JsonEncodedSubclass
subclasses:
- { name: BaseInputDistribution, key: .BaseInputDistribution }

PlanNode:
super: JsonEncodedSubclass
subclasses:
- { name: AggregationNode, key: .AggregationNode }
- { name: GroupIdNode, key: com.facebook.presto.sql.planner.plan.GroupIdNode }
- { name: DeleteNode, key: .DeleteNode }
- { name: DistinctLimitNode, key: .DistinctLimitNode }
- { name: EnforceSingleRowNode, key: com.facebook.presto.sql.planner.plan.EnforceSingleRowNode }
- { name: ExchangeNode, key: com.facebook.presto.sql.planner.plan.ExchangeNode }
Expand Down Expand Up @@ -318,3 +324,5 @@ JavaClasses:
- presto-spi/src/main/java/com/facebook/presto/spi/plan/ExchangeEncoding.java
- presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/PlanConversionFailureInfo.java
- presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/PlanConversionResponse.java
- presto-spi/src/main/java/com/facebook/presto/spi/plan/DeleteNode.java
- presto-spi/src/main/java/com/facebook/presto/spi/plan/BaseInputDistribution.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,17 @@ AbstractClasses:
- { name: InsertHandle, key: InsertHandle }
- { name: DeleteHandle, key: DeleteHandle }

InputDistribution:
super: JsonEncodedSubclass
subclasses:
- { name: BaseInputDistribution, key: BaseInputDistribution }

PlanNode:
super: JsonEncodedSubclass
subclasses:
- { name: AggregationNode, key: .AggregationNode }
- { name: GroupIdNode, key: com.facebook.presto.sql.planner.plan.GroupIdNode }
- { name: DeleteNode, key: .DeleteNode }
- { name: DistinctLimitNode, key: .DistinctLimitNode }
- { name: EnforceSingleRowNode, key: com.facebook.presto.sql.planner.plan.EnforceSingleRowNode }
- { name: ExchangeNode, key: com.facebook.presto.sql.planner.plan.ExchangeNode }
Expand Down Expand Up @@ -365,3 +371,5 @@ JavaClasses:
- presto-main/src/main/java/com/facebook/presto/connector/system/SystemTransactionHandle.java
- presto-spi/src/main/java/com/facebook/presto/spi/function/AggregationFunctionMetadata.java
- presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/json/JsonBasedUdfFunctionMetadata.java
- presto-spi/src/main/java/com/facebook/presto/spi/plan/DeleteNode.java
- presto-spi/src/main/java/com/facebook/presto/spi/plan/BaseInputDistribution.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ add_executable(
CallExpressionTest.cpp
ConstantExpressionTest.cpp
DataSizeTest.cpp
DeleteTest.cpp
DomainTest.cpp
DurationTest.cpp
LifespanTest.cpp
Expand Down
Loading

0 comments on commit 9600b84

Please sign in to comment.