Skip to content

Commit

Permalink
Merge pull request #663 from treasure-data/last-results-compat
Browse files Browse the repository at this point in the history
Making "store_last_results: true" consistent between td> and pg>
  • Loading branch information
frsyuki authored Oct 10, 2017
2 parents 7f5f836 + 6f1aef6 commit 14545bd
Show file tree
Hide file tree
Showing 15 changed files with 321 additions and 47 deletions.
16 changes: 12 additions & 4 deletions digdag-docs/src/operators/pg.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ _export:
```
+select_members:
pg>: select_members.sql
store_last_results: true
store_last_results: first
+send_email:
for_each>:
Expand Down Expand Up @@ -90,14 +90,22 @@ _export:
download_file: output.csv
```

* **store_last_results**: BOOLEAN
* **store_last_results**: first | all

Whether to store the query result. *Default:* `false`.
Whether to store the query results in the ``pg.last_results`` parameter. *Default:* `false`.

Setting ``first`` stores the first row to the parameter as an object (e.g. ``${pg.last_results.count}``).

Setting ``all`` stores all rows to the parameter as an array of objects (e.g. ``${pg.last_results[0].name}``). If the number of rows exceeds the limit, the task fails.

Examples:

```
store_last_results: true
store_last_results: first
```

```
store_last_results: all
```

* **database**: NAME
Expand Down
14 changes: 11 additions & 3 deletions digdag-docs/src/operators/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ _export:
```
+select_members:
redshift>: select_members.sql
store_last_results: true
store_last_results: first
+send_email:
for_each>:
Expand Down Expand Up @@ -93,12 +93,20 @@ _export:

* **store_last_results**: first | all

Whether to store the query result. *Default:* `false`.
Whether to store the query results in the ``redshift.last_results`` parameter. *Default:* `false`.

Setting ``first`` stores the first row to the parameter as an object (e.g. ``${redshift.last_results.count}``).

Setting ``all`` stores all rows to the parameter as an array of objects (e.g. ``${redshift.last_results[0].name}``). If the number of rows exceeds the limit, the task fails.

Examples:

```
store_last_results: true
store_last_results: first
```

```
store_last_results: all
```

* **database**: NAME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigElement;
import io.digdag.client.config.ConfigException;
Expand Down Expand Up @@ -71,15 +72,15 @@ protected TaskResult run(Config params, Config state, C connectionConfig)
throw new ConfigException("Can't use download_file with insert_into or create_table");
}

boolean storeResult = params.get("store_last_results", boolean.class, false);
if (storeResult && queryModifier > 0) {
StoreLastResultsOption storeResultsOption = params.get("store_last_results", StoreLastResultsOption.class, StoreLastResultsOption.FALSE);
if (storeResultsOption.isEnabled() && queryModifier > 0) {
throw new ConfigException("Can't use store_last_results with insert_into or create_table");
}
if (downloadFile.isPresent() && storeResult) {
if (downloadFile.isPresent() && storeResultsOption.isEnabled()) {
throw new ConfigException("Can't use both download_file and store_last_results at once");
}

boolean readOnlyMode = downloadFile.isPresent() || storeResult;
boolean readOnlyMode = downloadFile.isPresent() || storeResultsOption.isEnabled();

boolean strictTransaction = strictTransaction(params);

Expand Down Expand Up @@ -111,8 +112,8 @@ protected TaskResult run(Config params, Config state, C connectionConfig)
if (downloadFile.isPresent()) {
connection.executeReadOnlyQuery(query, (results) -> downloadResultsToFile(results, downloadFile.get()));
}
else if (storeResult) {
connection.executeReadOnlyQuery(query, (results) -> storeResultInTaskResult(builder, results));
else if (storeResultsOption.isEnabled()) {
connection.executeReadOnlyQuery(query, (results) -> storeResultsInTaskResult(results, storeResultsOption, builder));
}
else {
connection.executeReadOnlyQuery(query, (results) -> skipResults(results));
Expand Down Expand Up @@ -228,13 +229,35 @@ private void skipResults(JdbcResultSet results)
;
}

private void storeResultInTaskResult(ImmutableTaskResult.Builder builder, JdbcResultSet jdbcResultSet)
private void storeResultsInTaskResult(JdbcResultSet jdbcResultSet, StoreLastResultsOption option, ImmutableTaskResult.Builder builder)
{
List<String> columnNames = jdbcResultSet.getColumnNames();
if (columnNames.size() > maxStoreLastResultsColumns) {
throw new TaskExecutionException("The number of result columns exceeded the limit: " + columnNames.size() + " > " + maxStoreLastResultsColumns);
int columnsCount = jdbcResultSet.getColumnNames().size();
if (columnsCount > maxStoreLastResultsColumns) {
throw new TaskExecutionException("The number of result columns exceeded the limit: " + columnsCount + " > " + maxStoreLastResultsColumns);
}

Object lastResults;
switch (option) {
case ALL:
lastResults = collectAllResults(jdbcResultSet);
break;
case FIRST:
lastResults = collectFirstResults(jdbcResultSet);
break;
default:
throw new AssertionError("Unexpected StoreLastResultsOption: " + option);
}
List<Map<String, Object>> lastResult = new ArrayList<>();

Config storeParams = request.getConfig().getFactory().create();
storeParams.getNestedOrSetEmpty(type())
.set("last_results", lastResults);
builder.storeParams(storeParams);
}

private List<Map<String, Object>> collectAllResults(JdbcResultSet jdbcResultSet)
{
List<String> columnNames = jdbcResultSet.getColumnNames();
ImmutableList.Builder<Map<String, Object>> lastResults = ImmutableList.builder();

long rows = 0;
while (true) {
Expand All @@ -248,25 +271,41 @@ private void storeResultInTaskResult(ImmutableTaskResult.Builder builder, JdbcRe
throw new TaskExecutionException("The number of result rows exceeded the limit: " + rows + " > " + maxStoreLastResultsRows);
}

HashMap<String, Object> map = new HashMap<>();
for (int i = 0; i < columnNames.size(); i++) {
Object v = values.get(i);
if (v instanceof String) {
String s = (String) v;
if (s.length() > maxStoreLastResultsValueSize) {
throw new TaskExecutionException("The size of result value exceeded the limit: " + s.length() + " > " + maxStoreLastResultsValueSize);
}
}
map.put(columnNames.get(i), v);
}
lastResult.add(map);
lastResults.add(buildResultsMap(columnNames, values));
}

return lastResults.build();
}

private Map<String, Object> collectFirstResults(JdbcResultSet jdbcResultSet)
{
List<Object> values = jdbcResultSet.next();
if (values == null) {
return new HashMap<>();
}

ConfigFactory cf = request.getConfig().getFactory();
Config result = cf.create();
Config taskState = result.getNestedOrSetEmpty(type());
taskState.set("last_results", lastResult);
Map<String, Object> lastResults = buildResultsMap(jdbcResultSet.getColumnNames(), values);

// consume all results
while (jdbcResultSet.skip())
;

return lastResults;
}

builder.storeParams(result);
/**
 * Converts one result row into a map keyed by column name.
 *
 * @param columnNames names of the result columns, in column order
 * @param values values of the row, positionally matching {@code columnNames}
 * @return a new map from column name to the value at the same position
 * @throws TaskExecutionException if a String value is longer than
 *         {@code maxStoreLastResultsValueSize}
 */
private Map<String, Object> buildResultsMap(List<String> columnNames, List<Object> values)
{
    HashMap<String, Object> row = new HashMap<>();
    int position = 0;
    for (String columnName : columnNames) {
        Object value = values.get(position);
        position++;
        // Only String values are subject to the size limit; other types are stored as-is.
        if (value instanceof String) {
            int length = ((String) value).length();
            if (length > maxStoreLastResultsValueSize) {
                throw new TaskExecutionException("The size of result value exceeded the limit: " + length + " > " + maxStoreLastResultsValueSize);
            }
        }
        row.put(columnName, value);
    }
    return row;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ public List<Object> next()
}
}

/**
 * Advances the underlying result set by one row without reading its values.
 *
 * @return the value reported by {@code resultSet.next()}
 * @throws DatabaseException if advancing the result set raises a {@link SQLException}
 */
@Override
public boolean skip()
{
    final boolean advanced;
    try {
        advanced = resultSet.next();
    }
    catch (SQLException cause) {
        throw new DatabaseException("Failed to fetch next rows", cause);
    }
    return advanced;
}

private List<Object> getObjects() throws SQLException
{
List<Object> results = new ArrayList<>(columnNames.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ public interface JdbcResultSet
List<String> getColumnNames();

List<Object> next();

boolean skip();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.digdag.standards.operator.jdbc;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import io.digdag.client.config.ConfigException;
import static java.util.Locale.ENGLISH;

/**
 * Value of the {@code store_last_results} operator parameter.
 *
 * <ul>
 *   <li>{@link #FALSE}: storing results is disabled.</li>
 *   <li>{@link #FIRST}: storing is enabled; selects the "first" mode.</li>
 *   <li>{@link #ALL}: storing is enabled; selects the "all" mode.</li>
 * </ul>
 */
public enum StoreLastResultsOption
{
    FALSE(false),
    ALL(true),
    FIRST(true);

    private final boolean enabled;

    private StoreLastResultsOption(boolean enabled)
    {
        this.enabled = enabled;
    }

    /**
     * @return {@code true} for every option except {@link #FALSE}
     */
    public boolean isEnabled()
    {
        return enabled;
    }

    /**
     * Parses the textual form of the option.
     *
     * <p>{@code "true"} is accepted as an alias of {@code "first"}, and
     * {@code "false"} disables storing. Matching is case-sensitive.
     *
     * @param text the configured value
     * @return the matching option
     * @throws ConfigException if {@code text} is not one of the accepted values
     */
    @JsonCreator
    public static StoreLastResultsOption parse(String text)
    {
        switch (text) {
        case "false":
            return FALSE;
        case "true":
        case "first":
            return FIRST;
        case "all":
            return ALL;
        default:
            // Name the actual parameter (store_last_results) so the user can find the bad setting.
            throw new ConfigException("store_last_results must be either of \"first\" or \"all\": " + text);
        }
    }

    /**
     * @return the lower-case constant name ({@code "false"}, {@code "all"} or {@code "first"}),
     *         which is also the serialized form and is accepted back by {@link #parse(String)}
     */
    @Override
    @JsonValue
    public String toString()
    {
        return name().toLowerCase(ENGLISH);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ private TdOperator(OperatorContext context)
throw new ConfigException("result_settings is valid only if result_connection is set");
}

// TODO store_last_results should be io.digdag.standards.operator.jdbc.StoreLastResultsOption
// instead of boolean to be consistent with pg> and redshift> operators but not implemented yet.
this.storeLastResults = params.get("store_last_results", boolean.class, false);

this.preview = params.get("preview", boolean.class, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ private TdRunOperator(OperatorContext context)
this.command = params.get("_command", JsonNode.class);
this.sessionTime = params.get("session_time", Instant.class);
this.downloadFile = params.getOptional("download_file", String.class);
// TODO store_last_results should be io.digdag.standards.operator.jdbc.StoreLastResultsOption
// instead of boolean to be consistent with pg> and redshift> operators but not implemented yet.
this.storeLastResults = params.get("store_last_results", boolean.class, false);
this.preview = params.get("preview", boolean.class, false);
}
Expand Down
Loading

0 comments on commit 14545bd

Please sign in to comment.