Skip to content

Commit

Permalink
Merge pull request #71 from Matts966/feature/proto
Browse files Browse the repository at this point in the history
Add proto for gRPC service and struct schema reader
  • Loading branch information
Matts966 authored Oct 17, 2021
2 parents f6fa717 + 9f5bc13 commit e5f7f9f
Show file tree
Hide file tree
Showing 25 changed files with 342 additions and 132 deletions.
11 changes: 11 additions & 0 deletions alphasql/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ package(
default_visibility = ["//:__subpackages__"],
)

proto_library(
name = "alphasql_service_proto",
srcs = ["proto/alphasql_service.proto"],
)

cc_proto_library(
name = "alphasql_service_cc_proto",
deps = [":alphasql_service_proto"],
)

cc_library(
name = "json_schema_reader",
hdrs = ["json_schema_reader.h"],
Expand All @@ -26,6 +36,7 @@ cc_library(
"@com_google_zetasql//zetasql/public:type",
"@boost//:property_tree",
"@com_google_absl//absl/strings",
":alphasql_service_cc_proto"
],
)

Expand Down
15 changes: 9 additions & 6 deletions alphasql/alphacheck.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string>
#include <utility>
#include <vector>
#include <algorithm>

#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
Expand Down Expand Up @@ -114,12 +115,12 @@ using namespace zetasql;

SimpleCatalog *ConstructCatalog(const google::protobuf::DescriptorPool *pool,
TypeFactory *type_factory) {
auto catalog = new SimpleCatalog("catalog", type_factory);
auto catalog = new zetasql::SimpleCatalog("catalog", type_factory);
catalog->AddZetaSQLFunctions();
catalog->SetDescriptorPool(pool);
const std::string json_schema_path = absl::GetFlag(FLAGS_json_schema_path);
if (!json_schema_path.empty()) {
zetasql::UpdateCatalogFromJSON(json_schema_path, catalog);
UpdateCatalogFromJSON(json_schema_path, catalog);
}
return catalog;
}
Expand Down Expand Up @@ -165,8 +166,7 @@ absl::Status check(const std::string &sql, const ASTStatement *statement,
create_table_stmt->column_definition_list()) {
std::unique_ptr<zetasql::SimpleColumn> column(new SimpleColumn(
table_name, column_definition->column().name_id().ToString(),
catalog->type_factory()->MakeSimpleType(
column_definition->column().type()->kind())));
column_definition->column().type()));
ZETASQL_RETURN_IF_ERROR(table->AddColumn(column.release(), false));
}
catalog->AddOwnedTable(table.release());
Expand Down Expand Up @@ -222,7 +222,7 @@ absl::Status Run(const std::string &sql_file_path,
std::vector<std::string> temp_table_names;

std::unique_ptr<ParserOutput> parser_output;
ZETASQL_RETURN_IF_ERROR(alphasql::ParseScript(sql, options.GetParserOptions(),
ZETASQL_RETURN_IF_ERROR(zetasql::ParseScript(sql, options.GetParserOptions(),
options.error_message_mode(),
&parser_output, file_path));

Expand Down Expand Up @@ -397,7 +397,10 @@ int main(int argc, char *argv[]) {
status, sql_file_path);
std::cout << "ERROR: " << status << std::endl;
std::cout << "catalog:" << std::endl;
for (const std::string &table_name : catalog->table_names()) {
// For deterministic output
auto table_names = catalog->table_names();
std::sort(table_names.begin(), table_names.end());
for (const std::string &table_name : table_names) {
std::cout << "\t" << table_name << std::endl;
}
return 1;
Expand Down
3 changes: 1 addition & 2 deletions alphasql/common_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
#include "zetasql/parser/parse_tree.h"
#include "zetasql/parser/parser.h"

namespace alphasql {
namespace zetasql {

using namespace zetasql;
using namespace zetasql::parser;
using zetasql::parser::BisonParser;
using zetasql::parser::BisonParserMode;
Expand Down
2 changes: 1 addition & 1 deletion alphasql/identifier_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ GetIdentifierInformation(const std::string &sql_file_path) {
std::ifstream file(file_path, std::ios::in);
std::string sql(std::istreambuf_iterator<char>(file), {});

ZETASQL_RETURN_IF_ERROR(alphasql::ParseScript(sql, options.GetParserOptions(),
ZETASQL_RETURN_IF_ERROR(zetasql::ParseScript(sql, options.GetParserOptions(),
options.error_message_mode(),
&parser_output, file_path));

Expand Down
123 changes: 84 additions & 39 deletions alphasql/json_schema_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,66 +15,108 @@
//

#include "absl/strings/ascii.h"
#include "alphasql/proto/alphasql_service.pb.h"
#include "zetasql/base/status.h"
#include "zetasql/base/status_macros.h"
#include "zetasql/base/statusor.h"
#include "zetasql/public/simple_catalog.h"
#include "zetasql/public/types/type_factory.h"
#include <boost/foreach.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <google/protobuf/util/json_util.h>
#include <iostream>
#include <string>
#include <tuple>
#include <vector>

namespace zetasql {
namespace alphasql {

std::map<std::string, TypeKind> FromBigQueryTypeToZetaSQLTypeMap = {
{"STRING", TYPE_STRING}, {"INT64", TYPE_INT64},
{"INTEGER", TYPE_INT64}, {"BOOL", TYPE_BOOL},
{"BOOLEAN", TYPE_BOOL}, {"FLOAT64", TYPE_FLOAT},
{"FLOAT", TYPE_FLOAT}, {"NUMERIC", TYPE_NUMERIC},
{"BYTES", TYPE_BYTES}, {"TIMESTAMP", TYPE_TIMESTAMP},
{"DATE", TYPE_DATE}, {"TIME", TYPE_TIME},
{"DATETIME", TYPE_DATETIME}, {"GEOGRAPHY", TYPE_GEOGRAPHY},
std::map<SupportedType, zetasql::TypeKind> FromBigQueryTypeToZetaSQLTypeMap = {
{STRING, zetasql::TYPE_STRING}, {INT64, zetasql::TYPE_INT64},
{INTEGER, zetasql::TYPE_INT64}, {BOOL, zetasql::TYPE_BOOL},
{BOOLEAN, zetasql::TYPE_BOOL}, {FLOAT64, zetasql::TYPE_FLOAT},
{FLOAT, zetasql::TYPE_FLOAT}, {NUMERIC, zetasql::TYPE_NUMERIC},
{BYTES, zetasql::TYPE_BYTES}, {TIMESTAMP, zetasql::TYPE_TIMESTAMP},
{DATE, zetasql::TYPE_DATE}, {TIME, zetasql::TYPE_TIME},
{DATETIME, zetasql::TYPE_DATETIME}, {GEOGRAPHY, zetasql::TYPE_GEOGRAPHY},
};

void AddColumnToTable(SimpleTable *table,
const boost::property_tree::ptree::value_type field) {
std::string mode = field.second.get<std::string>("mode");
std::string type_string = field.second.get<std::string>("type");
mode = absl::AsciiStrToUpper(mode);
type_string = absl::AsciiStrToUpper(type_string);
static zetasql::TypeFactory tf;

if (FromBigQueryTypeToZetaSQLTypeMap.count(type_string) == 0) {
std::cout << "ERROR: unsupported type " + type_string + "\n" << std::endl;
// TODO: Handle return statuses of type:: functions
void ConvertSupportedTypeToZetaSQLType(const zetasql::Type **zetasql_type,
const Column *column) {
if (column->mode() == REPEATED && column->type() != RECORD) {
// Array types
*zetasql_type = zetasql::types::ArrayTypeFromSimpleTypeKind(
FromBigQueryTypeToZetaSQLTypeMap[column->type()]);
return;
}
if (column->type() != RECORD) {
*zetasql_type = zetasql::types::TypeFromSimpleTypeKind(
FromBigQueryTypeToZetaSQLTypeMap[column->type()]);
return;
}
// Struct types
std::vector<zetasql::StructField> fields;
for (const auto &field : column->fields()) {
const zetasql::Type *field_type;
ConvertSupportedTypeToZetaSQLType(&field_type, &field);
fields.push_back(zetasql::StructField(field.name(), field_type));
}
if (column->mode() != REPEATED) {
const auto status = tf.MakeStructTypeFromVector(fields, zetasql_type);
if (!status.ok()) {
std::cout << "ERROR converting record " << column->name() << ": "
<< status << std::endl;
}
return;
}
const zetasql::Type *element_type;
auto status = tf.MakeStructTypeFromVector(fields, &element_type);
if (!status.ok()) {
std::cout << "ERROR converting repeated record " << column->name() << ": "
<< status << std::endl;
}

status = tf.MakeArrayType(element_type, zetasql_type);
if (!status.ok()) {
std::cout << "ERROR converting repeated record " << column->name() << ": "
<< status << std::endl;
}
}

void AddColumnToTable(zetasql::SimpleTable *table, const std::string field) {
Column column_msg;
google::protobuf::util::JsonParseOptions jsonParseOptions;
jsonParseOptions.ignore_unknown_fields = true;
const auto status = google::protobuf::util::JsonStringToMessage(
field, &column_msg, jsonParseOptions);
if (!status.ok()) {
std::cout << "ERROR: " << status << std::endl;
throw;
}

const zetasql::Type *zetasql_type;

// TODO(Matts966): Implement Struct types
if (mode == "REPEATED" && type_string != "RECORD") {
// Array types
zetasql_type = types::ArrayTypeFromSimpleTypeKind(
FromBigQueryTypeToZetaSQLTypeMap[type_string]);
} else {
zetasql_type = types::TypeFromSimpleTypeKind(
FromBigQueryTypeToZetaSQLTypeMap[type_string]);
}
ConvertSupportedTypeToZetaSQLType(&zetasql_type, &column_msg);

if (zetasql_type == nullptr) {
std::cout << "ERROR: unsupported type " + type_string + "\n" << std::endl;
std::string message;
google::protobuf::util::MessageToJsonString(column_msg, &message);
std::cout << "ERROR: invalid column " << message << std::endl;
throw;
}

std::unique_ptr<SimpleColumn> column(new SimpleColumn(
table->Name(), field.second.get<std::string>("name"), zetasql_type));
table->AddColumn(column.release(), true);
std::unique_ptr<zetasql::SimpleColumn> zetasql_column(
new zetasql::SimpleColumn(table->Name(), column_msg.name(),
zetasql_type));
table->AddColumn(zetasql_column.release(), true);
}

void UpdateCatalogFromJSON(const std::string &json_schema_path,
SimpleCatalog *catalog) {
if (!std::filesystem::is_regular_file(json_schema_path) &
zetasql::SimpleCatalog *catalog) {
if (!std::filesystem::is_regular_file(json_schema_path) &&
!std::filesystem::is_fifo(json_schema_path)) {
std::cout << "ERROR: not a json file path " << json_schema_path
<< std::endl;
Expand All @@ -85,20 +127,23 @@ void UpdateCatalogFromJSON(const std::string &json_schema_path,
property_tree::ptree pt;
property_tree::read_json(json_schema_path, pt);

std::string table_name;
std::vector<std::unique_ptr<zetasql::SimpleTable>> tables;
for (property_tree::ptree::const_iterator it = pt.begin(); it != pt.end();
++it) {
table_name = it->first;
const auto table_name = it->first;
const property_tree::ptree &schema = it->second;
std::unique_ptr<SimpleTable> table(new SimpleTable(table_name));
BOOST_FOREACH (const property_tree::ptree::value_type &field, schema) {
AddColumnToTable(table.get(), field);
std::unique_ptr<zetasql::SimpleTable> table(
new zetasql::SimpleTable(table_name));
for (property_tree::ptree::const_iterator it = schema.begin();
it != schema.end(); ++it) {
std::ostringstream oss;
property_tree::write_json(oss, it->second);
AddColumnToTable(table.get(), oss.str());
}

catalog->AddTable(table.release());
}

return;
}

} // namespace zetasql
} // namespace alphasql
81 changes: 81 additions & 0 deletions alphasql/proto/alphasql_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
syntax = "proto2";

message File {
required string name = 1;
required string content = 2;
}

message AlphaDAGRequest {
required bool warning_as_error = 1;
required bool with_tables = 2;
required bool with_functions = 3;
required bool side_effect_first = 4;
repeated File files = 5;
}

message AlphaDAGResponse {
repeated string external_required_tables = 1;
repeated string dag_dot_string = 2;
optional string error = 3;
}

enum SupportedType {
STRING = 0;
INTEGER = 1;
INT64 = 2;
BOOLEAN = 3;
BOOL = 4;
FLOAT64 = 5;
FLOAT = 6;
NUMERIC = 7;
BYTES = 8;
TIMESTAMP = 9;
TIME = 10;
DATETIME = 11;
DATE = 12;
GEOGRAPHY = 13;
RECORD = 14;
}

enum Mode {
REPEATED = 0;
NULLABLE = 1;
REQUIRED = 2;
}

/* import "google/protobuf/any.proto"; */

message Column {
required string name = 1;
required SupportedType type = 2;
required Mode mode = 3;
// For record types
repeated Column fields = 4;
/* optional string description = 6; */
/* optional google.protobuf.Any policyTags = 7; */
}

message TableSchema {
required string table_name = 1;
repeated Column columns = 2;
}

message AlphaCheckRequest {
repeated TableSchema external_required_tables_schema = 1;
required string dag_dot_string = 2;
repeated File files = 3;
}

message AlphaCheckResponse {
optional string error = 1;
}

service AlphaSQL {
// Extract DAG from SQL files
rpc AlphaDAG(AlphaDAGRequest)
returns (AlphaDAGResponse) {}

// Validate DAG
rpc AlphaCheck(AlphaCheckRequest)
returns (AlphaCheckResponse) {}
}
2 changes: 1 addition & 1 deletion alphasql/table_name_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,7 @@ absl::Status GetTables(const std::string &sql_file_path,
std::filesystem::path file_path(sql_file_path);
std::ifstream file(file_path, std::ios::in);
std::string sql(std::istreambuf_iterator<char>(file), {});
ZETASQL_RETURN_IF_ERROR(alphasql::ParseScript(
ZETASQL_RETURN_IF_ERROR(zetasql::ParseScript(
sql, analyzer_options.GetParserOptions(),
analyzer_options.error_message_mode(), &parser_output, file_path));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
INSERT INTO `dataset.main`
SELECT
1;
1, STRUCT("test", "", STRUCT(4));
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ SELECT
"this table is intended to be deleted by DROP TABLE IF EXISTS statements in SQL files that reference a table after this query finished" AS explanation;
INSERT INTO `dataset.main`
SELECT
1;
1, NULL;
2 changes: 1 addition & 1 deletion samples/sample-ci/alphacheck_stdout.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Analyzing "samples/sample-ci/sample/create_datawarehouse3.sql"
ERROR: INVALID_ARGUMENT: Table not found: `bigquery-public-data.samples.gsod` [at samples/sample-ci/sample/create_datawarehouse3.sql:5:3]
catalog:
dataset.main
tablename1
tablename2
dataset.main
Loading

0 comments on commit e5f7f9f

Please sign in to comment.