Merge pull request #20 from Matts966/alphasql

Add function_name_resolver

Matts966 authored Jul 31, 2020
2 parents fd9bfb8 + d7ba360 commit fb9e033
Showing 27 changed files with 398 additions and 89 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -10,6 +10,7 @@ osx:
.PHONY: sample
sample: osx
ls -d samples/*/ | while read sample; do \
echo ""; \
dag $$sample --output_path $$sample/dag.dot; \
dot -Tpng $$sample/dag.dot -o $$sample/dag.png; \
pipeline_type_checker $$sample/dag.dot \
54 changes: 27 additions & 27 deletions README.md
@@ -34,7 +34,7 @@ docker run --rm -v `pwd`:/home matts966/alphasql:latest [command]
like

```bash
docker run --rm -v `pwd`:/home matts966/alphasql:latest pipeline_type_checker ./samples/sample1/dag.dot
docker run --rm -v `pwd`:/home matts966/alphasql:latest pipeline_type_checker ./samples/sample/dag.dot
```

Commands are installed in the PATH of the image.
@@ -61,26 +61,26 @@ wget -P $temp https://github.com/Matts966/alphasql/releases/latest/download/alph

```bash
# To extract DAG from your SQL set
$ dag --output_path ./samples/sample1/dag.dot ./samples/sample1/
$ dag --output_path ./samples/sample/dag.dot ./samples/sample/

# Or you can check the output in stdout by
$ dag [paths]

# with graphviz
$ dot -Tpng samples/sample1/dag.dot -o samples/sample1/dag.png
$ dot -Tpng samples/sample/dag.dot -o samples/sample/dag.png
```

Note that the output sometimes contains cycles, and refactoring the SQL files or manually editing the dot file is then needed (see [this issue](https://github.com/Matts966/alphasql/issues/2)).

If there are cycles, a warning is emitted, the type checker reports an error, and bq_jobrunner raises an error before execution. You can see an example in [./samples/sample-cycle](./samples/sample-cycle).

If you want to execute some statements serially, you can write a SQL script that contains multiple statements. See [samples/sample1/create_interim1.sql](samples/sample1/create_interim1.sql) as an example.
If you want to execute some statements serially, you can write a SQL script that contains multiple statements. See [samples/sample/create_interim1.sql](samples/sample/create_interim1.sql) as an example.

### Sample DAG output

The image below is extracted from the SQL set in [./samples/sample1](./samples/sample1). You can write tests for created tables and run them in parallel simply by separating SQL files.
The image below is extracted from the SQL set in [./samples/sample](./samples/sample). You can write tests for created tables and run them in parallel simply by separating SQL files.

![dag.dot](samples/sample1/dag.png)
![dag.dot](samples/sample/dag.png)

## Parallel Execution

@@ -109,55 +109,55 @@ Note that you should run type_checker in the same path as in extracting DAG.

```bash
# to check type and schema of SQL set
$ pipeline_type_checker ./samples/sample1.dot
Analyzing "./samples/sample1/create_datawarehouse3.sql"
$ pipeline_type_checker ./samples/sample.dot
Analyzing "./samples/sample/create_datawarehouse3.sql"
DDL analyzed, adding table to catalog...
SUCCESS: analysis finished!
Analyzing "./samples/sample1/create_datawarehouse2.sql"
Analyzing "./samples/sample/create_datawarehouse2.sql"
DDL analyzed, adding table to catalog...
SUCCESS: analysis finished!
Analyzing "./samples/sample1/create_interim2.sql"
Analyzing "./samples/sample/create_interim2.sql"
DDL analyzed, adding table to catalog...
SUCCESS: analysis finished!
Analyzing "./samples/sample1/update_interim2.sql"
Analyzing "./samples/sample/update_interim2.sql"
SUCCESS: analysis finished!
Analyzing "./samples/sample1/create_datawarehouse1.sql"
Analyzing "./samples/sample/create_datawarehouse1.sql"
DDL analyzed, adding table to catalog...
SUCCESS: analysis finished!
Analyzing "./samples/sample1/create_interim3.sql"
Analyzing "./samples/sample/create_interim3.sql"
DDL analyzed, adding table to catalog...
SUCCESS: analysis finished!
Analyzing "./samples/sample1/create_interim1.sql"
Analyzing "./samples/sample/create_interim1.sql"
DDL analyzed, adding table to catalog...
SUCCESS: analysis finished!
Analyzing "./samples/sample1/update_interium1.sql"
Analyzing "./samples/sample/update_interium1.sql"
SUCCESS: analysis finished!
Analyzing "./samples/sample1/insert_into_interim1.sql"
Analyzing "./samples/sample/insert_into_interim1.sql"
SUCCESS: analysis finished!
Analyzing "./samples/sample1/create_mart.sql"
Analyzing "./samples/sample/create_mart.sql"
DDL analyzed, adding table to catalog...
SUCCESS: analysis finished!
Analyzing "./samples/sample1/test_mart1.sql"
Analyzing "./samples/sample/test_mart1.sql"
SUCCESS: analysis finished!
Analyzing "./samples/sample1/test_mart2.sql"
Analyzing "./samples/sample/test_mart2.sql"
SUCCESS: analysis finished!
Analyzing "./samples/sample1/test_mart3.sql"
Analyzing "./samples/sample/test_mart3.sql"
SUCCESS: analysis finished!
Successfully finished type check!
```
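
The log above suggests the type checker's core loop: analyze each file with ZetaSQL and, when a DDL statement succeeds, register the created table in a catalog so later files resolve against it. Below is a minimal sketch of that pattern using ZetaSQL's public API (`AnalyzeStatement`, `SimpleCatalog`); the helper name `AnalyzeAndRegister` and the surrounding wiring are assumptions, not AlphaSQL's actual implementation.

```cpp
#include <memory>
#include <string>

#include "absl/status/status.h"
#include "zetasql/public/analyzer.h"
#include "zetasql/public/simple_catalog.h"

// Hypothetical helper: analyze one statement against the running catalog.
// On success, the caller would register any table the statement creates
// (the "DDL analyzed, adding table to catalog..." lines above) so that
// downstream files type-check against it.
absl::Status AnalyzeAndRegister(const std::string& sql,
                                const zetasql::AnalyzerOptions& options,
                                zetasql::TypeFactory* type_factory,
                                zetasql::SimpleCatalog* catalog) {
  std::unique_ptr<const zetasql::AnalyzerOutput> output;
  absl::Status status =
      zetasql::AnalyzeStatement(sql, options, catalog, type_factory, &output);
  if (!status.ok()) return status;  // e.g. the UNION ALL mismatch shown below
  // Here one would inspect output->resolved_statement() for CREATE TABLE
  // and add the new schema to `catalog` via catalog->AddTable(...).
  return absl::OkStatus();
}
```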

If you change column `x`'s type in `./samples/sample1/create_datawarehouse3.sql` to `STRING`, the type checker reports an error.
If you change column `x`'s type in `./samples/sample/create_datawarehouse3.sql` to `STRING`, the type checker reports an error.

```bash
$ pipeline_type_checker ./samples/sample1/dag.dot
Analyzing "./samples/sample1/create_datawarehouse3.sql"
$ pipeline_type_checker ./samples/sample/dag.dot
Analyzing "./samples/sample/create_datawarehouse3.sql"
DDL analyzed, adding table to catalog...
SUCCESS: analysis finished!
Analyzing "./samples/sample1/create_datawarehouse2.sql"
Analyzing "./samples/sample/create_datawarehouse2.sql"
DDL analyzed, adding table to catalog...
SUCCESS: analysis finished!
Analyzing "./samples/sample1/create_interim2.sql"
ERROR: INVALID_ARGUMENT: Column 1 in UNION ALL has incompatible types: INT64, STRING [at ./samples/sample1/create_interim2.sql:7:1]
Analyzing "./samples/sample/create_interim2.sql"
ERROR: INVALID_ARGUMENT: Column 1 in UNION ALL has incompatible types: INT64, STRING [at ./samples/sample/create_interim2.sql:7:1]
catalog:
datawarehouse3
datawarehouse2
@@ -169,7 +169,7 @@ You can specify external schemata (not created by queries in SQL set) by passing

```bash
# with external schema
$ pipeline_type_checker --json_schema_path ./samples/sample-schema.json ./samples/sample1/dag.dot
$ pipeline_type_checker --json_schema_path ./samples/sample-schema.json ./samples/sample/dag.dot
```

You can extract required external tables by
24 changes: 17 additions & 7 deletions alphasql/BUILD
@@ -40,6 +40,22 @@ cc_library(
],
)

cc_library(
name = "function_name_resolver",
hdrs = ["function_name_resolver.h"],
srcs = ["function_name_resolver.cc"],
deps = [
"@com_google_zetasql//zetasql/public:simple_catalog",
"@com_google_zetasql//zetasql/public:type",
"@com_google_zetasql//zetasql/public:analyzer",
"@com_google_zetasql//zetasql/public:language_options",
"@com_google_zetasql//zetasql/parser:parser",
"@com_google_zetasql//zetasql/analyzer",
"@boost//:property_tree",
"@com_google_absl//absl/strings",
],
)

cc_binary(
name = "pipeline_type_checker",
srcs = [
@@ -80,13 +96,6 @@ cc_binary(
name = "dag",
srcs = ["dag.cc"],
deps = [
"@com_google_zetasql//zetasql/analyzer",
"@com_google_zetasql//zetasql/resolved_ast",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/flags:parse",
"@com_google_absl//absl/strings",
"@boost//:graph",
":identifier_resolver",
":dag_lib",
],
)
@@ -103,6 +112,7 @@ cc_library(
"@com_google_absl//absl/strings",
"@boost//:graph",
":identifier_resolver",
":function_name_resolver",
],
)

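The new `function_name_resolver` target's interface can be inferred from how `dag_lib.h` consumes it further down (`GetFunctionInformation`, returning `defined` and `called` name paths). Here is a hedged sketch of what the header plausibly declares; the struct name `FunctionInformation` and the exact container types are assumptions, not taken from this diff.

```cpp
#include <set>
#include <string>
#include <vector>

#include "zetasql/base/statusor.h"
#include "zetasql/public/analyzer.h"

namespace alphasql {
namespace function_name_resolver {

// Function names are identifier paths, e.g. {"dataset", "my_func"}; callers
// join them with "." (see the absl::StrJoin calls in dag_lib.h below).
struct FunctionInformation {  // name assumed, not shown in the diff
  std::set<std::vector<std::string>> defined;  // CREATE FUNCTION statements
  std::set<std::vector<std::string>> called;   // names referenced in calls
};

// Parses the SQL file and collects the function names it defines and calls.
zetasql_base::StatusOr<FunctionInformation> GetFunctionInformation(
    const std::string& sql_file_path,
    const zetasql::AnalyzerOptions& analyzer_options);

}  // namespace function_name_resolver
}  // namespace alphasql
```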
13 changes: 11 additions & 2 deletions alphasql/dag.cc
@@ -27,13 +27,15 @@ int main(int argc, char* argv[]) {
std::vector<char*> remaining_args(args.begin() + 1, args.end());

std::map<std::string, table_queries> table_queries_map;
std::map<std::string, function_queries> function_queries_map;
std::set<std::string> vertices;
std::cout << "Reading paths passed as a command line arguments..." << std::endl;
std::cout << "Only files that end with .sql or .bq are analyzed." << std::endl;
for (const auto& path : remaining_args) {
if (std::filesystem::is_regular_file(path)) {
std::filesystem::path file_path(path);
absl::Status status = alphasql::UpdateTableQueriesMapAndVertices(file_path, table_queries_map, vertices);
absl::Status status = alphasql::UpdateIdentifierQueriesMapsAndVertices(file_path, table_queries_map,
function_queries_map, vertices);
if (!status.ok()) {
std::cout << status << std::endl;
return 1;
@@ -47,7 +49,8 @@ int main(int argc, char* argv[]) {
if (err) {
std::cout << "WARNING: " << err << std::endl;
}
absl::Status status = alphasql::UpdateTableQueriesMapAndVertices(file_path->path(), table_queries_map, vertices);
absl::Status status = alphasql::UpdateIdentifierQueriesMapsAndVertices(file_path->path(), table_queries_map,
function_queries_map, vertices);
if (!status.ok()) {
std::cout << status << std::endl;
return 1;
@@ -66,6 +69,12 @@ int main(int argc, char* argv[]) {
}
}

for (auto const& [_, function_queries] : function_queries_map) {
alphasql::UpdateEdges(depends_on, function_queries.call, {
function_queries.create,
});
}

const int nedges = depends_on.size();

using namespace boost;
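The loop added above wires function calls into the DAG through `UpdateEdges`, whose body lies outside this diff. The following is a minimal sketch of the semantics the call site implies, assuming the same grouped-parents priority behavior used for table edges; treat it as an illustration, not the project's code.

```cpp
#include <string>
#include <utility>
#include <vector>

typedef std::pair<std::string, std::string> Edge;  // (dependent, parent)

// Assumed behavior: for every file in `dependents` (here, files calling a
// function), add an edge to each file in the first non-empty group of
// `parents_groups` (here, files defining it), skipping self-edges.
void UpdateEdges(std::vector<Edge>& depends_on,
                 const std::vector<std::string>& dependents,
                 const std::vector<std::vector<std::string>>& parents_groups) {
  if (dependents.empty()) return;
  for (const auto& parents : parents_groups) {
    if (parents.empty()) continue;
    for (const auto& parent : parents) {
      for (const auto& dependent : dependents) {
        if (dependent != parent) depends_on.push_back({dependent, parent});
      }
    }
    return;  // only the highest-priority non-empty group contributes edges
  }
}
```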
66 changes: 39 additions & 27 deletions alphasql/dag_lib.h
@@ -22,11 +22,12 @@
#include "zetasql/base/statusor.h"
#include "zetasql/public/analyzer.h"
#include "alphasql/identifier_resolver.h"
#include "alphasql/function_name_resolver.h"
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/str_join.h"
#include "boost/graph/graphviz.hpp"
#include "boost/graph/depth_first_search.hpp"
#include "absl/strings/str_join.h"

typedef std::pair<std::string, std::string> Edge;

@@ -40,50 +41,36 @@ struct table_queries {
std::vector<std::string> others;
};

struct function_queries {
std::vector<std::string> create;  // files that define the function
std::vector<std::string> call;    // files that call the function
};

namespace alphasql {

using namespace zetasql;

// Returns <options> if it already has all arenas initialized, or otherwise
// populates <copy> as a copy for <options>, creates arenas in <copy> and
// returns it. This avoids unnecessary duplication of AnalyzerOptions, which
// might be expensive.
const AnalyzerOptions& GetOptionsWithArenas(
const AnalyzerOptions* options, std::unique_ptr<AnalyzerOptions>* copy) {
if (options->AllArenasAreInitialized()) {
return *options;
}
*copy = absl::make_unique<AnalyzerOptions>(*options);
(*copy)->CreateDefaultArenasIfNotSet();
return **copy;
}

zetasql_base::StatusOr<std::map<ResolvedNodeKind, TableNamesSet>> ExtractTableNamesFromSQL(const std::string& sql_file_path,
const AnalyzerOptions& analyzer_options,
TableNamesSet* table_names) {
LanguageOptions language_options;
language_options.EnableMaximumLanguageFeaturesForDevelopment();
language_options.SetEnabledLanguageFeatures({FEATURE_V_1_3_ALLOW_DASHES_IN_TABLE_NAME});
language_options.SetSupportsAllStatementKinds();
AnalyzerOptions options(language_options);
options.mutable_language()->EnableMaximumLanguageFeaturesForDevelopment();
options.CreateDefaultArenasIfNotSet();

std::unique_ptr<AnalyzerOptions> copy;
const AnalyzerOptions& options = GetAnalyzerOptions();
return identifier_resolver::GetNodeKindToTableNamesMap(
sql_file_path, options, table_names);
}

absl::Status UpdateTableQueriesMapAndVertices(const std::filesystem::path& file_path,
absl::Status UpdateIdentifierQueriesMapsAndVertices(const std::filesystem::path& file_path,
std::map<std::string, table_queries>& table_queries_map,
std::map<std::string, function_queries>& function_queries_map,
std::set<std::string>& vertices) {
if (file_path.extension() != ".bq" && file_path.extension() != ".sql") {
// std::cout << "not a sql file " << file_path << "!" << std::endl;
// Skip if not SQL.
return absl::OkStatus();
}
std::cout << "Reading " << file_path << std::endl;
const AnalyzerOptions options = GetAnalyzerOptions();

TableNamesSet table_names;
auto node_kind_to_table_names_or_status = ExtractTableNamesFromSQL(file_path.string(), &table_names);
auto node_kind_to_table_names_or_status = ExtractTableNamesFromSQL(file_path.string(), options, &table_names);
if (!node_kind_to_table_names_or_status.ok()) {
return node_kind_to_table_names_or_status.status();
}
@@ -104,6 +91,31 @@ namespace alphasql {
table_queries_map[table_string].others.push_back(file_path);
}

// Resolve file dependencies: files calling a function depend on the file defining it.
auto function_information_or_status = function_name_resolver::GetFunctionInformation(file_path.string(), options);
if (!function_information_or_status.ok()) {
return function_information_or_status.status();
}
auto function_info = function_information_or_status.value();

// TODO(Matts966): Multiple definitions of the same function lead to cycles, as with tables;
// we should consider whether to raise an error here.
for (auto const& defined : function_info.defined) {
const std::string function_name = absl::StrJoin(defined, ".");

std::cout << function_name << " defined in " << file_path << std::endl;

function_queries_map[function_name].create.push_back(file_path);
}
for (auto const& called : function_info.called) {
const std::string function_name = absl::StrJoin(called, ".");

std::cout << function_name << " called in " << file_path << std::endl;

function_queries_map[function_name].call.push_back(file_path);
}

// Add the file as a vertex.
vertices.insert(file_path);

return absl::OkStatus();
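
As a concrete, hypothetical example (the file names are illustrative, not from this commit): if `define_udf.sql` defines `my_func` and `mart.sql` calls it, then `function_queries_map["my_func"].create` holds `define_udf.sql` while `.call` holds `mart.sql`, and the new loop in `dag.cc` emits the edge `mart.sql -> define_udf.sql`, so the definition is executed before its caller.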