From 6f1aef645ffe2da7a2e2b04266c3828184404a3e Mon Sep 17 00:00:00 2001
From: Sadayuki Furuhashi
Date: Fri, 6 Oct 2017 14:20:25 -0700
Subject: [PATCH] Making "store_last_results: true" consistent between td> and pg>

The `td>` and `td_run>` operators support a `store_last_results: BOOLEAN`
option that stores the first row of the query results into the
`${td.last_results}` parameter. But the `pg>` and `redshift>` operators
store all rows, and throw an exception if the number of rows exceeds a
limit. This change makes the behavior consistent, and thus less surprising.

With this change, the `pg>` and `redshift>` operators accept `all`, `first`,
and `true` as the option value. `all` stores all rows and fails the task if
the number of rows exceeds the limit. `first` stores only the first row.
`true` is an alias of `first` for consistency with the `td>` operator.

The `td>` operator should also support `all` and `first`, but that is not
implemented yet.
---
 digdag-docs/src/operators/pg.md               | 16 ++-
 digdag-docs/src/operators/redshift.md         | 14 ++-
 .../jdbc/AbstractJdbcJobOperator.java         | 95 ++++++++++++------
 .../operator/jdbc/AbstractJdbcResultSet.java  | 11 +++
 .../operator/jdbc/JdbcResultSet.java          |  2 +
 .../operator/jdbc/StoreLastResultsOption.java | 47 +++++++++
 .../operator/td/TdOperatorFactory.java        |  2 +
 .../operator/td/TdRunOperatorFactory.java     |  2 +
 .../jdbc/AbstractJdbcJobOperatorTest.java     | 98 +++++++++++++++++--
 .../src/test/java/acceptance/PgIT.java        | 27 +++++
 .../test/java/acceptance/td/RedshiftIT.java   | 28 ++++++
 .../pg/select_store_last_results.dig          |  2 +-
 .../pg/select_store_last_results_first.dig    | 11 +++
 .../redshift/select_store_last_results.dig    |  2 +-
 .../select_store_last_results_first.dig       | 11 +++
 15 files changed, 321 insertions(+), 47 deletions(-)
 create mode 100644 digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/StoreLastResultsOption.java
 create mode 100644 digdag-tests/src/test/resources/acceptance/pg/select_store_last_results_first.dig
 create mode 100644 digdag-tests/src/test/resources/acceptance/redshift/select_store_last_results_first.dig

diff --git a/digdag-docs/src/operators/pg.md b/digdag-docs/src/operators/pg.md
index 082fe3904e..20613d3542 100644
--- a/digdag-docs/src/operators/pg.md
+++ b/digdag-docs/src/operators/pg.md
@@ -28,7 +28,7 @@ _export:
 ```
 +select_members:
   pg>: select_members.sql
-  store_last_results: true
+  store_last_results: first
 
 +send_email:
   for_each>:
@@ -90,14 +90,22 @@ _export:
   download_file: output.csv
   ```
 
-* **store_last_results**: BOOLEAN
+* **store_last_results**: first | all
 
-  Whether to store the query result. *Default:* `false`.
+  Whether to store the query results to the ``pg.last_results`` parameter. *Default:* `false`.
+
+  Setting ``first`` stores the first row to the parameter as an object (e.g. ``${pg.last_results.count}``).
+
+  Setting ``all`` stores all rows to the parameter as an array of objects (e.g. ``${pg.last_results[0].name}``). If the number of rows exceeds the limit, the task fails.
 
   Examples:
 
   ```
-  store_last_results: true
+  store_last_results: first
+  ```
+
+  ```
+  store_last_results: all
   ```
 
 * **database**: NAME
diff --git a/digdag-docs/src/operators/redshift.md b/digdag-docs/src/operators/redshift.md
index 233389568e..4373ccd2b1 100644
--- a/digdag-docs/src/operators/redshift.md
+++ b/digdag-docs/src/operators/redshift.md
@@ -28,7 +28,7 @@ _export:
 ```
 +select_members:
   redshift>: select_members.sql
-  store_last_results: true
+  store_last_results: first
 
 +send_email:
   for_each>:
@@ -93,12 +93,20 @@ _export:
-* **store_last_results**: BOOLEAN
+* **store_last_results**: first | all
 
-  Whether to store the query result. *Default:* `false`.
+  Whether to store the query results to the ``redshift.last_results`` parameter. *Default:* `false`.
+
+  Setting ``first`` stores the first row to the parameter as an object (e.g. ``${redshift.last_results.count}``).
+
+  Setting ``all`` stores all rows to the parameter as an array of objects (e.g. ``${redshift.last_results[0].name}``). If the number of rows exceeds the limit, the task fails.
 
   Examples:
 
   ```
-  store_last_results: true
+  store_last_results: first
+  ```
+
+  ```
+  store_last_results: all
   ```
 
 * **database**: NAME
diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/AbstractJdbcJobOperator.java b/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/AbstractJdbcJobOperator.java
index 1d7a71bdbc..efbedd4197 100644
--- a/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/AbstractJdbcJobOperator.java
+++ b/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/AbstractJdbcJobOperator.java
@@ -2,6 +2,7 @@
 
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import io.digdag.client.config.Config;
 import io.digdag.client.config.ConfigElement;
 import io.digdag.client.config.ConfigException;
@@ -71,15 +72,15 @@ protected TaskResult run(Config params, Config state, C connectionConfig)
             throw new ConfigException("Can't use download_file with insert_into or create_table");
         }
 
-        boolean storeResult = params.get("store_last_results", boolean.class, false);
-        if (storeResult && queryModifier > 0) {
+        StoreLastResultsOption storeResultsOption = params.get("store_last_results", StoreLastResultsOption.class, StoreLastResultsOption.FALSE);
+        if (storeResultsOption.isEnabled() && queryModifier > 0) {
             throw new ConfigException("Can't use store_last_results with insert_into or create_table");
         }
-        if (downloadFile.isPresent() && storeResult) {
+        if (downloadFile.isPresent() && storeResultsOption.isEnabled()) {
             throw new ConfigException("Can't use both download_file and store_last_results at once");
         }
 
-        boolean readOnlyMode = downloadFile.isPresent() || storeResult;
+        boolean readOnlyMode = downloadFile.isPresent() || storeResultsOption.isEnabled();
 
         boolean strictTransaction = strictTransaction(params);
 
@@ -111,8 +112,8 @@ protected TaskResult run(Config params, Config state, C connectionConfig)
                 if (downloadFile.isPresent()) {
                     connection.executeReadOnlyQuery(query, (results) -> downloadResultsToFile(results, downloadFile.get()));
                 }
-                else if (storeResult) {
-                    connection.executeReadOnlyQuery(query, (results) -> storeResultInTaskResult(builder, results));
+                else if (storeResultsOption.isEnabled()) {
+                    connection.executeReadOnlyQuery(query, (results) -> storeResultsInTaskResult(results, storeResultsOption, builder));
                 }
                 else {
                     connection.executeReadOnlyQuery(query, (results) -> skipResults(results));
@@ -228,13 +229,35 @@ private void skipResults(JdbcResultSet results)
         ;
     }
 
-    private void storeResultInTaskResult(ImmutableTaskResult.Builder builder, JdbcResultSet jdbcResultSet)
+    private void storeResultsInTaskResult(JdbcResultSet jdbcResultSet, StoreLastResultsOption option, ImmutableTaskResult.Builder builder)
     {
-        List<String> columnNames = jdbcResultSet.getColumnNames();
-        if (columnNames.size() > maxStoreLastResultsColumns) {
-            throw new TaskExecutionException("The number of result columns exceeded the limit: " + columnNames.size() + " > " + maxStoreLastResultsColumns);
+        int columnsCount = jdbcResultSet.getColumnNames().size();
+        if (columnsCount > maxStoreLastResultsColumns) {
+            throw new TaskExecutionException("The number of result columns exceeded the limit: " + columnsCount + " > " + maxStoreLastResultsColumns);
+        }
+
+        Object lastResults;
+        switch (option) {
+        case ALL:
+            lastResults = collectAllResults(jdbcResultSet);
+            break;
+        case FIRST:
+            lastResults = collectFirstResults(jdbcResultSet);
+            break;
+        default:
+            throw new AssertionError("Unexpected StoreLastResultsOption: " + option);
         }
-        List<Map<String, Object>> lastResult = new ArrayList<>();
+
+        Config storeParams = request.getConfig().getFactory().create();
+        storeParams.getNestedOrSetEmpty(type())
+                .set("last_results", lastResults);
+        builder.storeParams(storeParams);
+    }
+
+    private List<Map<String, Object>> collectAllResults(JdbcResultSet jdbcResultSet)
+    {
+        List<String> columnNames = jdbcResultSet.getColumnNames();
+        ImmutableList.Builder<Map<String, Object>> lastResults = ImmutableList.builder();
 
         long rows = 0;
         while (true) {
@@ -248,25 +271,41 @@ private void storeResultInTaskResult(ImmutableTaskResult.Builder builder, JdbcRe
                 throw new TaskExecutionException("The number of result rows exceeded the limit: " + rows + " > " + maxStoreLastResultsRows);
             }
 
-            HashMap<String, Object> map = new HashMap<>();
-            for (int i = 0; i < columnNames.size(); i++) {
-                Object v = values.get(i);
-                if (v instanceof String) {
-                    String s = (String) v;
-                    if (s.length() > maxStoreLastResultsValueSize) {
-                        throw new TaskExecutionException("The size of result value exceeded the limit: " + s.length() + " > " + maxStoreLastResultsValueSize);
-                    }
-                }
-                map.put(columnNames.get(i), v);
-            }
-            lastResult.add(map);
+            lastResults.add(buildResultsMap(columnNames, values));
+        }
+
+        return lastResults.build();
+    }
+
+    private Map<String, Object> collectFirstResults(JdbcResultSet jdbcResultSet)
+    {
+        List<Object> values = jdbcResultSet.next();
+        if (values == null) {
+            return new HashMap<>();
         }
 
-        ConfigFactory cf = request.getConfig().getFactory();
-        Config result = cf.create();
-        Config taskState = result.getNestedOrSetEmpty(type());
-        taskState.set("last_results", lastResult);
+        Map<String, Object> lastResults = buildResultsMap(jdbcResultSet.getColumnNames(), values);
+
+        // consume all results
+        while (jdbcResultSet.skip())
+            ;
+
+        return lastResults;
+    }
 
-        builder.storeParams(result);
+    private Map<String, Object> buildResultsMap(List<String> columnNames, List<Object> values)
+    {
+        HashMap<String, Object> map = new HashMap<>();
+        for (int i = 0; i < columnNames.size(); i++) {
+            Object v = values.get(i);
+            if (v instanceof String) {
+                String s = (String) v;
+                if (s.length() > maxStoreLastResultsValueSize) {
+                    throw new TaskExecutionException("The size of result value exceeded the limit: " + s.length() + " > " + maxStoreLastResultsValueSize);
+                }
+            }
+            map.put(columnNames.get(i), v);
+        }
+        return map;
     }
 }
diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/AbstractJdbcResultSet.java b/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/AbstractJdbcResultSet.java
index f5f05671dc..da1219beca 100644
--- a/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/AbstractJdbcResultSet.java
+++ b/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/AbstractJdbcResultSet.java
@@ -51,6 +51,17 @@ public List<Object> next()
         }
     }
 
+    @Override
+    public boolean skip()
+    {
+        try {
+            return resultSet.next();
+        }
+        catch (SQLException ex) {
+            throw new DatabaseException("Failed to fetch next rows", ex);
+        }
+    }
+
     private List<Object> getObjects()
             throws SQLException
     {
         List<Object> results = new ArrayList<>(columnNames.size());
diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/JdbcResultSet.java b/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/JdbcResultSet.java
index 9bfcbcc89b..74b3f7fe78 100644
--- a/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/JdbcResultSet.java
+++ b/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/JdbcResultSet.java
@@ -7,4 +7,6 @@ public interface JdbcResultSet
     List<String> getColumnNames();
 
     List<Object> next();
+
+    boolean skip();
 }
diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/StoreLastResultsOption.java b/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/StoreLastResultsOption.java
new file mode 100644
index 0000000000..221c2109fe
--- /dev/null
+++ b/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/StoreLastResultsOption.java
@@ -0,0 +1,47 @@
+package io.digdag.standards.operator.jdbc;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+import io.digdag.client.config.ConfigException;
+
+import static java.util.Locale.ENGLISH;
+
+public enum StoreLastResultsOption
+{
+    FALSE(false),
+    ALL(true),
+    FIRST(true);
+
+    private final boolean enabled;
+
+    private StoreLastResultsOption(boolean enabled)
+    {
+        this.enabled = enabled;
+    }
+
+    public boolean isEnabled()
+    {
+        return enabled;
+    }
+
+    @JsonCreator
+    public static StoreLastResultsOption parse(String text)
+    {
+        switch (text) {
+        case "false":
+            return FALSE;
+        case "true":
+        case "first":
+            return FIRST;
+        case "all":
+            return ALL;
+        default:
+            throw new ConfigException("store_last_results must be \"first\", \"all\", or a boolean: " + text);
+        }
+    }
+
+    @JsonValue
+    public String toString()
+    {
+        return name().toLowerCase(ENGLISH);
+    }
+}
diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdOperatorFactory.java
index f1c16245af..addb1b2c41 100644
--- a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdOperatorFactory.java
+++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdOperatorFactory.java
@@ -138,6 +138,8 @@ private TdOperator(OperatorContext context)
             throw new ConfigException("result_settings is valid only if result_connection is set");
         }
 
+        // TODO store_last_results should be io.digdag.standards.operator.jdbc.StoreLastResultsOption
+        // instead of boolean to be consistent with pg> and redshift> operators, but that is not implemented yet.
         this.storeLastResults = params.get("store_last_results", boolean.class, false);
 
         this.preview = params.get("preview", boolean.class, false);
diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdRunOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdRunOperatorFactory.java
index 02639d5801..37f09d5e97 100644
--- a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdRunOperatorFactory.java
+++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdRunOperatorFactory.java
@@ -67,6 +67,8 @@ private TdRunOperator(OperatorContext context)
         this.command = params.get("_command", JsonNode.class);
         this.sessionTime = params.get("session_time", Instant.class);
         this.downloadFile = params.getOptional("download_file", String.class);
+        // TODO store_last_results should be io.digdag.standards.operator.jdbc.StoreLastResultsOption
+        // instead of boolean to be consistent with pg> and redshift> operators, but that is not implemented yet.
         this.storeLastResults = params.get("store_last_results", boolean.class, false);
         this.preview = params.get("preview", boolean.class, false);
     }
diff --git a/digdag-standards/src/test/java/io/digdag/standards/operator/jdbc/AbstractJdbcJobOperatorTest.java b/digdag-standards/src/test/java/io/digdag/standards/operator/jdbc/AbstractJdbcJobOperatorTest.java
index e2a8724b63..0d1b5e2226 100644
--- a/digdag-standards/src/test/java/io/digdag/standards/operator/jdbc/AbstractJdbcJobOperatorTest.java
+++ b/digdag-standards/src/test/java/io/digdag/standards/operator/jdbc/AbstractJdbcJobOperatorTest.java
@@ -219,7 +219,7 @@ public void selectAndDownload()
     }
 
     @Test
-    public void selectAndStoreResult()
+    public void selectAndStoreAllResults()
             throws IOException, NotReadOnlyException
     {
         String sql = "SELECT * FROM users";
@@ -227,7 +227,7 @@ public void selectAndStoreResult()
                 "host", "foobar.com",
                 "user", "testuser",
                 "database", "testdb",
-                "store_last_results", true,
+                "store_last_results", "all",
                 "query", sql
         );
@@ -247,8 +247,52 @@ public void selectAndStoreResult()
         assertThat(second.get("float").floatValue(), is(0.12f));
     }
 
+    @Test
+    public void selectAndStoreFirstResults()
+            throws IOException, NotReadOnlyException
+    {
+        String sql = "SELECT * FROM users";
+        Map<String, Object> configInput = ImmutableMap.of(
+                "host", "foobar.com",
+                "user", "testuser",
+                "database", "testdb",
+                "store_last_results", "first",
+                "query", sql
+        );
+
+        TaskResult taskResult = runTaskReadOnly(configInput, sql);
+        JsonNode first = taskResult.getStoreParams().getNestedOrGetEmpty("testop").get("last_results", JsonNode.class);
+
+        assertThat(first.size(), is(3));
+        assertThat(first.get("int").asInt(), is(42));
+        assertThat(first.get("str").asText(), is("foo"));
+        assertThat(first.get("float").floatValue(), is(3.14f));
+    }
+
+    @Test
+    public void selectAndStoreFirstResultsConfiguredByBoolean()
+            throws IOException, NotReadOnlyException
+    {
+        String sql = "SELECT * FROM users";
+        Map<String, Object> configInput = ImmutableMap.of(
+                "host", "foobar.com",
+                "user", "testuser",
+                "database", "testdb",
+                "store_last_results", true, // true == "first"
+                "query", sql
+        );
+
+        TaskResult taskResult = runTaskReadOnly(configInput, sql);
+        JsonNode first = taskResult.getStoreParams().getNestedOrGetEmpty("testop").get("last_results", JsonNode.class);
+
+        assertThat(first.size(), is(3));
+        assertThat(first.get("int").asInt(), is(42));
+        assertThat(first.get("str").asText(), is("foo"));
+        assertThat(first.get("float").floatValue(), is(3.14f));
+    }
+
     @Test(expected = TaskExecutionException.class)
-    public void selectAndStoreResultWithExceedingMaxRows()
+    public void selectAndStoreAllResultsWithExceedingMaxRows()
             throws IOException, NotReadOnlyException
     {
         String sql = "SELECT * FROM users";
@@ -256,7 +300,7 @@ public void selectAndStoreResultWithExceedingMaxRows()
                 "host", "foobar.com",
                 "user", "testuser",
                 "database", "testdb",
-                "store_last_results", true,
+                "store_last_results", "all",
                 "query", sql
         );
         Config systemConfig = new ConfigFactory(DigdagClient.objectMapper()).create();
@@ -265,7 +309,7 @@ public void selectAndStoreResultWithExceedingMaxRows()
     }
 
     @Test(expected = TaskExecutionException.class)
-    public void selectAndStoreResultWithExceedingMaxColumns()
+    public void selectAndStoreAllResultsWithExceedingMaxColumns()
             throws IOException, NotReadOnlyException
     {
         String sql = "SELECT * FROM users";
@@ -273,7 +317,7 @@ public void selectAndStoreResultWithExceedingMaxColumns()
                 "host", "foobar.com",
                 "user", "testuser",
                 "database", "testdb",
-                "store_last_results", true,
+                "store_last_results", "all",
                 "query", sql
         );
         Config systemConfig = new ConfigFactory(DigdagClient.objectMapper()).create();
         systemConfig.set("config.testop.max_store_last_results_columns", 2);
         runTaskReadOnly(Optional.of(systemConfig), configInput, sql);
     }
@@ -282,7 +326,7 @@ public void selectAndStoreResultWithExceedingMaxColumns()
     }
 
     @Test(expected = TaskExecutionException.class)
-    public void selectAndStoreResultWithExceedingMaxValueSize()
+    public void selectAndStoreFirstResultsWithExceedingMaxColumns()
             throws IOException, NotReadOnlyException
     {
         String sql = "SELECT * FROM users";
@@ -290,7 +334,24 @@ public void selectAndStoreResultWithExceedingMaxValueSize()
                 "host", "foobar.com",
                 "user", "testuser",
                 "database", "testdb",
-                "store_last_results", true,
+                "store_last_results", "first",
+                "query", sql
+        );
+        Config systemConfig = new ConfigFactory(DigdagClient.objectMapper()).create();
+        systemConfig.set("config.testop.max_store_last_results_columns", 2);
+        runTaskReadOnly(Optional.of(systemConfig), configInput, sql);
+    }
+
+    @Test(expected = TaskExecutionException.class)
+    public void selectAndStoreAllResultsWithExceedingMaxValueSize()
+            throws IOException, NotReadOnlyException
+    {
+        String sql = "SELECT * FROM users";
+        Map<String, Object> configInput = ImmutableMap.of(
+                "host", "foobar.com",
+                "user", "testuser",
+                "database", "testdb",
+                "store_last_results", "all",
                 "query", sql
         );
         Config systemConfig = new ConfigFactory(DigdagClient.objectMapper()).create();
@@ -299,7 +360,24 @@ public void selectAndStoreResultWithExceedingMaxValueSize()
     }
 
     @Test(expected = ConfigException.class)
-    public void selectAndStoreResultWithConflictOption()
+    public void selectAndStoreLastResultsWithConflictOption()
+            throws IOException, NotReadOnlyException
+    {
+        String sql = "SELECT * FROM users";
+        Map<String, Object> configInput = new ImmutableMap.Builder<String, Object>()
+                .put("host", "foobar.com")
+                .put("user", "testuser")
+                .put("database", "testdb")
+                .put("store_last_results", "all")
+                .put("download_file", "result.csv")
+                .put("query", sql)
+                .build();
+
+        runTaskReadOnly(configInput, sql);
+    }
+
+    @Test
+    public void selectAndFalseStoreLastResultsWithoutConflicts()
             throws IOException, NotReadOnlyException
     {
         String sql = "SELECT * FROM users";
@@ -307,7 +385,7 @@ public void selectAndStoreResultWithConflictOption()
         Map<String, Object> configInput = new ImmutableMap.Builder<String, Object>()
                 .put("host", "foobar.com")
                 .put("user", "testuser")
                 .put("database", "testdb")
-                .put("store_last_results", true)
+                .put("store_last_results", false)
                 .put("download_file", "result.csv")
                 .put("query", sql)
                 .build();
diff --git a/digdag-tests/src/test/java/acceptance/PgIT.java b/digdag-tests/src/test/java/acceptance/PgIT.java
index 49779b022f..d096d243cf 100644
--- a/digdag-tests/src/test/java/acceptance/PgIT.java
+++ b/digdag-tests/src/test/java/acceptance/PgIT.java
@@ -229,6 +229,33 @@ public void selectAndStoreLastResults()
         }
     }
 
+    @Test
+    public void selectAndStoreLastResultsWithFirst()
+            throws Exception
+    {
+        copyResource("acceptance/pg/select_store_last_results_first.dig", root().resolve("pg.dig"));
+        copyResource("acceptance/pg/select_table.sql", root().resolve("select_table.sql"));
+
+        setupSourceTable();
+
+        CommandStatus status = TestUtils.main(
+                "run", "-o", root().toString(),
+                "--project", root().toString(),
+                "-p", "pg_database=" + tempDatabase,
+                "-p", "outfile=out",
+                "pg.dig");
+        assertCommandStatus(status);
+
+        List<String> lines = new ArrayList<>();
+        try (BufferedReader reader = new BufferedReader(new FileReader(new File(root().toFile(), "out")))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                lines.add(line.trim());
+            }
+            assertThat(lines, is(Arrays.asList("foo")));
+        }
+    }
+
     @Test
     public void selectAndStoreLastResultsWithExceedingMaxRows()
             throws Exception
diff --git a/digdag-tests/src/test/java/acceptance/td/RedshiftIT.java b/digdag-tests/src/test/java/acceptance/td/RedshiftIT.java
index 9f4d1fbba0..645f05dc2f 100644
--- a/digdag-tests/src/test/java/acceptance/td/RedshiftIT.java
+++ b/digdag-tests/src/test/java/acceptance/td/RedshiftIT.java
@@ -237,6 +237,34 @@ public void selectAndStoreResult()
         }
     }
 
+    @Test
+    public void selectAndStoreResultsWithFirst()
+            throws Exception
+    {
+        copyResource("acceptance/redshift/select_store_last_results_first.dig", projectDir.resolve("redshift.dig"));
+        copyResource("acceptance/redshift/select_table.sql", projectDir.resolve("select_table.sql"));
+
+        setupSourceTable();
+
+        CommandStatus status = TestUtils.main("run", "-o", projectDir.toString(), "--project", projectDir.toString(),
+                "-p", "redshift_database=" + database,
+                "-p", "redshift_host=" + redshiftHost,
+                "-p", "redshift_user=" + redshiftUser,
+                "-p", "outfile=out",
+                "-c", configFile.toString(),
+                "redshift.dig");
+        assertCommandStatus(status);
+
+        List<String> lines = new ArrayList<>();
+        try (BufferedReader reader = new BufferedReader(new FileReader(new File(projectDir.toFile(), "out")))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                lines.add(line.trim());
+            }
+            assertThat(lines, is(Arrays.asList("foo")));
+        }
+    }
+
     @Test
     public void createTable()
             throws Exception
diff --git a/digdag-tests/src/test/resources/acceptance/pg/select_store_last_results.dig b/digdag-tests/src/test/resources/acceptance/pg/select_store_last_results.dig
index 9fce580bb4..01b99310e7 100644
--- a/digdag-tests/src/test/resources/acceptance/pg/select_store_last_results.dig
+++ b/digdag-tests/src/test/resources/acceptance/pg/select_store_last_results.dig
@@ -5,7 +5,7 @@ timezone: UTC
   host: localhost
   database: ${pg_database}
   user: digdag_test
-  store_last_results: true
+  store_last_results: all
 
 +process:
   for_each>:
diff --git a/digdag-tests/src/test/resources/acceptance/pg/select_store_last_results_first.dig b/digdag-tests/src/test/resources/acceptance/pg/select_store_last_results_first.dig
new file mode 100644
index 0000000000..241f01cdf7
--- /dev/null
+++ b/digdag-tests/src/test/resources/acceptance/pg/select_store_last_results_first.dig
@@ -0,0 +1,11 @@
+timezone: UTC
+
++run:
+  pg>: select_table.sql
+  host: localhost
+  database: ${pg_database}
+  user: digdag_test
+  store_last_results: first
+
++process:
+  sh>: echo ${pg.last_results.name} >> ${outfile}
diff --git a/digdag-tests/src/test/resources/acceptance/redshift/select_store_last_results.dig b/digdag-tests/src/test/resources/acceptance/redshift/select_store_last_results.dig
index f43696ea9f..9e57c769e5 100644
--- a/digdag-tests/src/test/resources/acceptance/redshift/select_store_last_results.dig
+++ b/digdag-tests/src/test/resources/acceptance/redshift/select_store_last_results.dig
@@ -5,7 +5,7 @@ timezone: UTC
   host: ${redshift_host}
   database: ${redshift_database}
   user: ${redshift_user}
-  store_last_results: true
+  store_last_results: all
 
 +process:
   for_each>:
diff --git a/digdag-tests/src/test/resources/acceptance/redshift/select_store_last_results_first.dig b/digdag-tests/src/test/resources/acceptance/redshift/select_store_last_results_first.dig
new file mode 100644
index 0000000000..fe27862c54
--- /dev/null
+++ b/digdag-tests/src/test/resources/acceptance/redshift/select_store_last_results_first.dig
@@ -0,0 +1,11 @@
+timezone: UTC
+
++run:
+  redshift>: select_table.sql
+  host: ${redshift_host}
+  database: ${redshift_database}
+  user: ${redshift_user}
+  store_last_results: first
+
++process:
+  sh>: echo ${redshift.last_results.name} >> ${outfile}
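--

For reference, a minimal workflow sketch of the two modes this patch adds, in the style of the acceptance tests above. The SQL files (`count_users.sql`, `list_users.sql`) and the `cnt`/`name` columns are hypothetical; the parameter paths follow the pg.md docs:

```
+count:
  pg>: count_users.sql          # e.g. SELECT count(*) AS cnt FROM users
  store_last_results: first     # object: read as ${pg.last_results.cnt}

+list:
  pg>: list_users.sql           # e.g. SELECT name FROM users
  store_last_results: all       # array: read as ${pg.last_results[0].name}

+each:
  for_each>:
    row: ${pg.last_results}     # iterate over every stored row
  _do:
    sh>: echo ${row.name}
```

Since each `pg>` task overwrites `${pg.last_results}`, `+each` iterates over the rows stored by `+list`.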