From 8ad8d36f60295b5ddfe6bf33af2b16dfd1b6443d Mon Sep 17 00:00:00 2001 From: karel rehor Date: Thu, 20 Jun 2024 17:22:14 +0200 Subject: [PATCH 01/16] feat: updates service accept header to application/json --- .../influxdb/query/InfluxQLQueryResult.java | 37 ++ .../query/InfluxQLQueryResultTest.java | 45 ++ .../client/service/InfluxQLQueryService.java | 2 +- .../client/internal/InfluxQLQueryApiImpl.java | 130 ++++- .../influxdb/client/ITInfluxQLQueryApi.java | 37 +- .../internal/InfluxQLQueryApiImplTest.java | 472 +++++++++++++++++- 6 files changed, 713 insertions(+), 10 deletions(-) create mode 100644 client-core/src/test/java/com/influxdb/query/InfluxQLQueryResultTest.java diff --git a/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java b/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java index 88c9012e205..4ad9608752f 100644 --- a/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java +++ b/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java @@ -21,6 +21,8 @@ */ package com.influxdb.query; +import java.time.Instant; +import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -211,6 +213,41 @@ public Object[] getValues() { } } + public static Object defaultExtractValue(@Nonnull final String columnName, + @Nonnull final String rawValue, + final int resultIndex, + @Nonnull final String seriesName) { + try { + return Long.parseLong(rawValue); + } catch (NumberFormatException le) { + try { + return Double.parseDouble(rawValue); + } catch (NumberFormatException de) { + try { + return Instant.parse(rawValue); + } catch (DateTimeParseException dte) { + return rawValue; + } + } + } + } + + @SuppressWarnings("MagicNumber") + public static Object legacyExtractValue(@Nonnull final String columnName, + @Nonnull final String rawValue, + final int resultIndex, + @Nonnull final String seriesName) { + + if (columnName.toLowerCase().equals("time")) { + try { + Instant instant = Instant.parse(rawValue); + return String.valueOf(instant.getEpochSecond() * 1_000_000_000L + instant.getNano()); + } catch (DateTimeParseException dtpe) { + return rawValue; + } + } + return rawValue; + } } } diff --git a/client-core/src/test/java/com/influxdb/query/InfluxQLQueryResultTest.java b/client-core/src/test/java/com/influxdb/query/InfluxQLQueryResultTest.java new file mode 100644 index 00000000000..77905b81286 --- /dev/null +++ b/client-core/src/test/java/com/influxdb/query/InfluxQLQueryResultTest.java @@ -0,0 +1,45 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.query; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +public class InfluxQLQueryResultTest { + + @Test + public void readLegacyTimestamps(){ + Map times = Map.of( + "2024-06-20T11:41:17.690437345Z", "1718883677690437345", // nano + "2024-06-20T11:41:17.690437Z", "1718883677690437000", // micro + "2024-06-20T11:41:17.690Z", "1718883677690000000", // milli + "2024-06-20T11:41:17Z", "1718883677000000000" // second + ); + for(String stamp : times.keySet()){ + Object result = InfluxQLQueryResult.Series.legacyExtractValue("time", stamp, 0, "test"); + Assertions.assertThat(times.get(stamp)).isEqualTo(result); + } + } + +} diff --git a/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java b/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java index 73328f01af2..76d0cda6c5f 100644 --- a/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java +++ b/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java @@ -15,7 +15,7 @@ public interface InfluxQLQueryService { * @param zapTraceSpan OpenTracing span context (optional) * @return response in csv format */ - @Headers({"Accept:application/csv", "Content-Type:application/x-www-form-urlencoded"}) + @Headers({"Accept:application/json", "Content-Type:application/x-www-form-urlencoded"}) @FormUrlEncoded @POST("query") Call query( diff --git a/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java b/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java index 255ae17a1e9..3902ee07622 100644 --- a/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java +++ b/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; +import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -31,6 +32,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import javax.annotation.Nonnull; @@ -44,6 +46,16 @@ import com.influxdb.query.InfluxQLQueryResult; import com.influxdb.utils.Arguments; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonIOException; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSyntaxException; import okhttp3.ResponseBody; import okio.BufferedSource; import org.apache.commons.csv.CSVFormat; @@ -102,11 +114,12 @@ private InfluxQLQueryResult parseResponse( Arguments.checkNotNull(bufferedSource, "bufferedSource"); try (Reader reader = new InputStreamReader(bufferedSource.inputStream(), StandardCharsets.UTF_8)) { - return readInfluxQLResult(reader, cancellable, valueExtractor); +// return readInfluxQLCSVResult(reader, cancellable, valueExtractor); + return readInfluxQLJsonResult(reader, cancellable, valueExtractor); } } - static InfluxQLQueryResult readInfluxQLResult( + static InfluxQLQueryResult readInfluxQLCSVResult( @Nonnull final Reader reader, @Nonnull final Cancellable cancellable, @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor @@ -189,4 +202,117 @@ private static Map parseTags(@Nonnull final String value) { return tags; } + + static InfluxQLQueryResult readInfluxQLJsonResult( + @Nonnull final Reader reader, + @Nonnull final Cancellable cancellable, + @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor + ) { + + Gson gson = new GsonBuilder() + .registerTypeAdapter(InfluxQLQueryResult.class, new ResultsDeserializer(cancellable)) + .registerTypeAdapter(InfluxQLQueryResult.Result.class, new ResultDeserializer(valueExtractor)) + .create(); + + try { + return gson.fromJson(reader, InfluxQLQueryResult.class); + } catch (JsonSyntaxException | JsonIOException jse) { + ERROR_CONSUMER.accept(jse); + return null; + } + } + + public static class ResultsDeserializer implements JsonDeserializer { + + Cancellable cancellable; + + public ResultsDeserializer(final Cancellable cancellable) { + this.cancellable = cancellable; + } + + @Override + public InfluxQLQueryResult deserialize( + final JsonElement elem, + final Type type, + final JsonDeserializationContext ctx) throws JsonParseException { + List results = new ArrayList<>(); + JsonArray jsonArray = elem.getAsJsonObject().get("results").getAsJsonArray(); + for (JsonElement jsonElement : jsonArray) { + if (cancellable.isCancelled()) { + break; + } + results.add(ctx.deserialize(jsonElement, InfluxQLQueryResult.Result.class)); + } + return new InfluxQLQueryResult(results); + } + } + + public static class ResultDeserializer implements JsonDeserializer { + + InfluxQLQueryResult.Series.ValueExtractor extractor; + + public ResultDeserializer(final InfluxQLQueryResult.Series.ValueExtractor extractor) { + this.extractor = extractor; + } + + @Override + public InfluxQLQueryResult.Result deserialize( + final JsonElement elem, + final Type type, + final JsonDeserializationContext ctx) throws JsonParseException { + JsonObject eobj = elem.getAsJsonObject(); + int id = eobj.get("statement_id").getAsInt(); + List series = new ArrayList<>(); + JsonArray seriesArray = eobj.getAsJsonArray("series"); + for (JsonElement jserie : seriesArray) { + JsonObject sobj = jserie.getAsJsonObject(); + String name = sobj.getAsJsonObject().get("name").getAsString(); + Map columns = new LinkedHashMap<>(); + Map tags = null; + // Handle columns + JsonArray jac = sobj.get("columns").getAsJsonArray(); + final AtomicInteger count = new AtomicInteger(0); + jac.forEach(e -> { + columns.put(e.getAsString(), count.getAndIncrement()); + }); + + InfluxQLQueryResult.Series serie = null; + // Handle tags - if they exist + if (sobj.get("tags") != null) { + JsonObject tagsObj = sobj.get("tags").getAsJsonObject(); + tags = new LinkedHashMap<>(); + for (String key : tagsObj.keySet()) { + tags.put(key, tagsObj.get(key).getAsString()); + } + serie = new InfluxQLQueryResult.Series(name, tags, columns); + } else { + serie = new InfluxQLQueryResult.Series(name, columns); + } + JsonArray jvals = sobj.get("values").getAsJsonArray(); + for (JsonElement jval : jvals) { + List values = new ArrayList<>(); + JsonArray jae = jval.getAsJsonArray(); + int index = 0; + for (JsonElement je : jae) { + List columnKeys = new ArrayList<>(serie.getColumns().keySet()); + if (extractor != null) { + String stringVal = je.getAsString(); + Object ov = extractor.extractValue( + columnKeys.get(index), + stringVal, + id, + serie.getName()); + values.add(ov); + } else { + values.add(je.getAsString()); + } + index++; + } + serie.addRecord(serie.new Record(values.toArray())); + } + series.add(serie); + } + return new InfluxQLQueryResult.Result(id, series); + } + } } diff --git a/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java b/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java index 45dcda1a58f..063a077b1c6 100644 --- a/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java +++ b/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java @@ -89,7 +89,8 @@ void testQueryData() { .hasSize(1) .first() .satisfies(record -> { - Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); +// Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("2022-06-22T12:13:20Z"); Assertions.assertThat(record.getValueByKey("first")).isEqualTo("10"); }); } @@ -126,13 +127,45 @@ void testSelectAll() { .hasSize(1) .first() .satisfies(record -> { - Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); + // Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("2022-06-22T12:13:20Z"); Assertions.assertThat(record.getValueByKey("free")).isEqualTo("10"); Assertions.assertThat(record.getValueByKey("host")).isEqualTo("A"); Assertions.assertThat(record.getValueByKey("region")).isEqualTo("west"); }); } + @Test + public void testSelectGroupBy(){ + InfluxQLQueryResult result = influxQLQueryApi.query( + new InfluxQLQuery("SELECT * FROM \"influxql\" GROUP By \"region\",\"host\"", DATABASE_NAME) + ); + + assertSingleSeriesRecords(result) + .hasSize(1) + .first() + .satisfies(record -> { + Assertions.assertThat(record.getValueByKey("region")).isNull(); + Assertions.assertThat(record.getValueByKey("host")).isNull(); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("2022-06-22T12:13:20Z"); + Assertions.assertThat(record.getValueByKey("free")).isEqualTo("10"); + }); + + Assertions.assertThat(result) + .extracting(InfluxQLQueryResult::getResults, list(InfluxQLQueryResult.Result.class)) + .hasSize(1) + .first() + .extracting(InfluxQLQueryResult.Result::getSeries, list(InfluxQLQueryResult.Series.class)) + .hasSize(1) + .first() + .extracting(InfluxQLQueryResult.Series::getTags) + .satisfies(tagz -> { + Assertions.assertThat(tagz).isNotNull(); + Assertions.assertThat(tagz.get("host")).isEqualTo("A"); + Assertions.assertThat(tagz.get("region")).isEqualTo("west"); + }); + } + @Test void testInfluxDB18() { // create database diff --git a/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java b/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java index fca1109ad86..464dd6bb0bf 100644 --- a/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java +++ b/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java @@ -29,10 +29,9 @@ import com.influxdb.Cancellable; import com.influxdb.query.InfluxQLQueryResult; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVParser; -import org.apache.commons.csv.CSVRecord; import org.assertj.core.api.Assertions; +import org.junit.Ignore; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; class InfluxQLQueryApiImplTest { @@ -77,7 +76,7 @@ void readInfluxQLResult() throws IOException { "cpu,\"region=us-east-1,host=server2\",1483225200,67.91,1.3\n" ); - InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLResult(reader, NO_CANCELLING, extractValues); + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLCSVResult(reader, NO_CANCELLING, extractValues); List results = result.getResults(); Assertions.assertThat(results).hasSize(4); @@ -187,7 +186,7 @@ public void readInfluxQLShowSeriesRequest() throws IOException { ",,\"temperature,locale=nw002,device=rpi5_88e1\"" ); - InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLResult(reader, NO_CANCELLING, + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLCSVResult(reader, NO_CANCELLING, (columnName, rawValue, resultIndex, seriesName) -> { return rawValue;}); Assertions.assertThat(result.getResults().get(0)) @@ -213,6 +212,469 @@ public void readInfluxQLShowSeriesRequest() throws IOException { }); }); }); + } + + StringReader sampleReader = new StringReader("{\n" + + " \"results\":\n" + + "[\n" + + " {\n" + + " \"statement_id\": 0,\n" + + " \"series\": \n" + + " [ \n" + + " {\n" + + " \"name\": \"data1\",\n" + + " \"columns\": [\"time\",\"first\"],\n" + + " \"values\": [\n" + + " [1483225200, 1]\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"name\": \"data2\",\n" + + " \"columns\": [\"time\",\"first\"],\n" + + " \"values\": [\n" + + " [1483225200, 2]\n" + + " ]\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"statement_id\": 1,\n" + + " \"series\":\n" + + " [ \n" + + " {\n" + + " \"name\": \"data\",\n" + + " \"columns\": [\"time\",\"first\",\"text\"],\n" + + " \"values\": [\n" + + " [1500000000, 42, \"foo\"]\n" + + " ]\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"statement_id\": 2,\n" + + " \"series\":\n" + + " [ \n" + + " {\n" + + " \"name\": \"databases\",\n" + + " \"columns\" : [\"name\"],\n" + + " \"values\" : [\n" + + " [\"measurement-1\"],\n" + + " [\"measurement-2\"]\n" + + " ]\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"statement_id\": 3,\n" + + " \"series\": \n" + + " [ \n" + + " {\n" + + " \"name\": \"cpu\",\n" + + " \"tags\": {\"region\": \"us-east-1\", \"host\": \"server1\" },\n" + + " \"columns\": [\"time\", \"usage_user\", \"usage_system\"],\n" + + " \"values\" : [\n" + + " [1483225200,13.57,1.4],\n" + + " [1483225201,14.06,1.7]\n" + + " ] \n" + + " },\n" + + " {\n" + + " \"name\": \"cpu\",\n" + + " \"tags\": {\"region\": \"us-east-1\", \"host\": \"server2\" },\n" + + " \"columns\": [\"time\", \"usage_user\", \"usage_system\"],\n" + + " \"values\" : [\n" + + " [1483225200,67.91,1.3]\n" + + " ] \n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"statement_id\": 4,\n" + + " \"series\":\n" + + " [ \n" + + " {\n" + + " \"name\": \"login\",\n" + + " \"tags\": {\"region\": \"eu-west-3\", \"host\": \"portal-17\"},\n" + + " \"columns\": [\"time\", \"user_id\", \"success\", \"stay\"],\n" + + " \"values\" : [\n" + + " [ \"2024-06-18T11:29:48.454Z\", 958772110, true, 1.27],\n" + + " [ \"2024-06-18T11:29:47.124Z\", 452223904, false, 0.0],\n" + + " [ \"2024-06-18T11:29:45.007Z\", 147178901, true, 15.5],\n" + + " [ \"2024-06-18T11:29:41.881Z\", 71119178, true, 78.4]\n" + + " ]\n" + + " }\n" + + " ] \n" + + " } \n" + + "]\n" + + "}"); + + // All values as Strings - universal default + @Test + public void readInfluxQLJSONResult(){ + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLJsonResult(sampleReader, NO_CANCELLING, null); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(5); + Assertions.assertThat(results.get(0)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(2); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("data1"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time", "first"); + Assertions.assertThat(series1.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series1.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1483225200"); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo("1"); + }); + Assertions.assertThat(series.get(1)) + .satisfies(series2 -> { + Assertions.assertThat(series2.getName()).isEqualTo("data2"); + Assertions.assertThat(series2.getColumns()).containsOnlyKeys("time", "first"); + Assertions.assertThat(series2.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series2.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1483225200"); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo("2"); + }); + }); + Assertions.assertThat(results.get(1)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(1); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("data"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time", "first", "text"); + Assertions.assertThat(series1.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series1.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1500000000"); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo("42"); + Assertions.assertThat(record.getValueByKey("text")).isEqualTo("foo"); + }); + }); + Assertions.assertThat(results.get(2)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(1); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("databases"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("name"); + Assertions.assertThat(series1.getValues()).hasSize(2); + + Assertions.assertThat( series1.getValues().get(0).getValueByKey("name")) + .isEqualTo("measurement-1"); + Assertions.assertThat( series1.getValues().get(1).getValueByKey("name")) + .isEqualTo("measurement-2"); + }); + }); + Assertions.assertThat(results.get(3)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(2); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("cpu"); + Assertions.assertThat(series1.getTags()).containsOnlyKeys("region", "host"); + Assertions.assertThat(series1.getTags().get("region")).isEqualTo("us-east-1"); + Assertions.assertThat(series1.getTags().get("host")).isEqualTo("server1"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time","usage_user","usage_system"); + Assertions.assertThat(series1.getValues()).hasSize(2); + + Assertions.assertThat( series1.getValues().get(0).getValueByKey("usage_user")) + .isEqualTo("13.57"); + Assertions.assertThat( series1.getValues().get(0).getValueByKey("usage_system")) + .isEqualTo("1.4"); + Assertions.assertThat( series1.getValues().get(1).getValueByKey("usage_user")) + .isEqualTo("14.06"); + Assertions.assertThat( series1.getValues().get(1).getValueByKey("usage_system")) + .isEqualTo("1.7"); + }); + Assertions.assertThat(series.get(1)) + .satisfies(series2 -> { + Assertions.assertThat(series2.getName()).isEqualTo("cpu"); + Assertions.assertThat(series2.getTags()).containsOnlyKeys("region", "host"); + Assertions.assertThat(series2.getTags().get("region")).isEqualTo("us-east-1"); + Assertions.assertThat(series2.getTags().get("host")).isEqualTo("server2"); + Assertions.assertThat(series2.getColumns()).containsOnlyKeys("time","usage_user","usage_system"); + Assertions.assertThat(series2.getValues()).hasSize(1); + Assertions.assertThat( series2.getValues().get(0).getValueByKey("usage_user")) + .isEqualTo("67.91"); + Assertions.assertThat( series2.getValues().get(0).getValueByKey("usage_system")) + .isEqualTo("1.3"); + }); + }); + Assertions.assertThat(results.get(4)) + .satisfies(r -> { + Assertions.assertThat(r.getIndex()).isEqualTo(4); + }) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(1); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("login"); + Assertions.assertThat(series1.getTags()).containsOnlyKeys("region","host"); + Assertions.assertThat(series1.getTags().get("region")).isEqualTo("eu-west-3"); + Assertions.assertThat(series1.getTags().get("host")).isEqualTo("portal-17"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time","user_id","success","stay"); + Assertions.assertThat(series1.getValues()).hasSize(4); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("time")).isEqualTo("2024-06-18T11:29:48.454Z"); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("user_id")).isEqualTo("958772110"); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("success")).isEqualTo("true"); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("stay")).isEqualTo("1.27"); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("time")).isEqualTo("2024-06-18T11:29:47.124Z"); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("user_id")).isEqualTo("452223904"); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("success")).isEqualTo("false"); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("stay")).isEqualTo("0.0"); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("time")).isEqualTo("2024-06-18T11:29:41.881Z"); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("user_id")).isEqualTo("71119178"); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("success")).isEqualTo("true"); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("stay")).isEqualTo("78.4"); + }); + }); + } + + // Default + @Test + public void readInfluxQLJSONDefaultExtractValue(){ + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLJsonResult(sampleReader, + NO_CANCELLING, + InfluxQLQueryResult.Series::defaultExtractValue); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(5); + Assertions.assertThat(results.get(0)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(2); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("data1"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time", "first"); + Assertions.assertThat(series1.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series1.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo(1483225200L); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo(1L); + }); + Assertions.assertThat(series.get(1)) + .satisfies(series2 -> { + Assertions.assertThat(series2.getName()).isEqualTo("data2"); + Assertions.assertThat(series2.getColumns()).containsOnlyKeys("time", "first"); + Assertions.assertThat(series2.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series2.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo(1483225200L); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo(2L); + }); + }); + Assertions.assertThat(results.get(1)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(1); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("data"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time", "first", "text"); + Assertions.assertThat(series1.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series1.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo(1500000000L); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo(42L); + Assertions.assertThat(record.getValueByKey("text")).isEqualTo("foo"); + }); + }); + Assertions.assertThat(results.get(2)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(1); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("databases"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("name"); + Assertions.assertThat(series1.getValues()).hasSize(2); + + Assertions.assertThat( series1.getValues().get(0).getValueByKey("name")) + .isEqualTo("measurement-1"); + Assertions.assertThat( series1.getValues().get(1).getValueByKey("name")) + .isEqualTo("measurement-2"); + }); + }); + Assertions.assertThat(results.get(3)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(2); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("cpu"); + Assertions.assertThat(series1.getTags()).containsOnlyKeys("region", "host"); + Assertions.assertThat(series1.getTags().get("region")).isEqualTo("us-east-1"); + Assertions.assertThat(series1.getTags().get("host")).isEqualTo("server1"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time","usage_user","usage_system"); + Assertions.assertThat(series1.getValues()).hasSize(2); + + Assertions.assertThat( series1.getValues().get(0).getValueByKey("usage_user")) + .isEqualTo(13.57); + Assertions.assertThat( series1.getValues().get(0).getValueByKey("usage_system")) + .isEqualTo(1.4); + Assertions.assertThat( series1.getValues().get(1).getValueByKey("usage_user")) + .isEqualTo(14.06); + Assertions.assertThat( series1.getValues().get(1).getValueByKey("usage_system")) + .isEqualTo(1.7); + }); + Assertions.assertThat(series.get(1)) + .satisfies(series2 -> { + Assertions.assertThat(series2.getName()).isEqualTo("cpu"); + Assertions.assertThat(series2.getTags()).containsOnlyKeys("region", "host"); + Assertions.assertThat(series2.getTags().get("region")).isEqualTo("us-east-1"); + Assertions.assertThat(series2.getTags().get("host")).isEqualTo("server2"); + Assertions.assertThat(series2.getColumns()).containsOnlyKeys("time","usage_user","usage_system"); + Assertions.assertThat(series2.getValues()).hasSize(1); + + Assertions.assertThat( series2.getValues().get(0).getValueByKey("usage_user")) + .isEqualTo(67.91); + Assertions.assertThat( series2.getValues().get(0).getValueByKey("usage_system")) + .isEqualTo(1.3); + }); + }); + Assertions.assertThat(results.get(4)) + .satisfies(r -> { + Assertions.assertThat(r.getIndex()).isEqualTo(4); + }) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(1); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("login"); + Assertions.assertThat(series1.getTags()).containsOnlyKeys("region","host"); + Assertions.assertThat(series1.getTags().get("region")).isEqualTo("eu-west-3"); + Assertions.assertThat(series1.getTags().get("host")).isEqualTo("portal-17"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time","user_id","success","stay"); + Assertions.assertThat(series1.getValues()).hasSize(4); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("time")).isEqualTo(Instant.parse("2024-06-18T11:29:48.454Z")); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("user_id")).isEqualTo(958772110L); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("success")).isEqualTo("true"); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("stay")).isEqualTo(1.27); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("time")).isEqualTo(Instant.parse("2024-06-18T11:29:47.124Z")); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("user_id")).isEqualTo(452223904L); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("success")).isEqualTo("false"); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("stay")).isEqualTo(0.0); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("time")).isEqualTo(Instant.parse("2024-06-18T11:29:41.881Z")); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("user_id")).isEqualTo(71119178L); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("success")).isEqualTo("true"); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("stay")).isEqualTo(78.4); + }); + }); + } + + // Legacy timestamps + @Test + public void readInfluxQLJSONResultWithLegacyExtractValue(){ + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLJsonResult(sampleReader, + NO_CANCELLING, + InfluxQLQueryResult.Series::legacyExtractValue); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(5); + Assertions.assertThat(results.get(0)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(2); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("data1"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time", "first"); + Assertions.assertThat(series1.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series1.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1483225200"); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo("1"); + }); + Assertions.assertThat(series.get(1)) + .satisfies(series2 -> { + Assertions.assertThat(series2.getName()).isEqualTo("data2"); + Assertions.assertThat(series2.getColumns()).containsOnlyKeys("time", "first"); + Assertions.assertThat(series2.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series2.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1483225200"); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo("2"); + }); + }); + Assertions.assertThat(results.get(4)) + .satisfies(r -> { + Assertions.assertThat(r.getIndex()).isEqualTo(4); + }) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(1); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("login"); + Assertions.assertThat(series1.getTags()).containsOnlyKeys("region","host"); + Assertions.assertThat(series1.getTags().get("region")).isEqualTo("eu-west-3"); + Assertions.assertThat(series1.getTags().get("host")).isEqualTo("portal-17"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time","user_id","success","stay"); + Assertions.assertThat(series1.getValues()).hasSize(4); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("time")).isEqualTo("1718710188454000000"); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("user_id")).isEqualTo("958772110"); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("success")).isEqualTo("true"); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("stay")).isEqualTo("1.27"); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("time")).isEqualTo("1718710187124000000"); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("user_id")).isEqualTo("452223904"); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("success")).isEqualTo("false"); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("stay")).isEqualTo("0.0"); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("time")).isEqualTo("1718710181881000000"); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("user_id")).isEqualTo("71119178"); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("success")).isEqualTo("true"); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("stay")).isEqualTo("78.4"); + }); + }); + } + + // Custom + @Test + public void readInfluxQLJSONResultCustomExtractValue(){ + InfluxQLQueryResult.Series.ValueExtractor extractValues = (columnName, rawValue, resultIndex, seriesName) -> { + if (resultIndex == 0 && seriesName.equals("data2")){ + switch (columnName){ + case "time": + return Instant.ofEpochSecond(Long.parseLong(rawValue)); + case "first": + return Double.valueOf(rawValue); + } + } + if(seriesName.equals("login")){ + if (columnName.equals("success")) { + return Boolean.parseBoolean(rawValue); + } + } + return rawValue; + }; + + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLJsonResult(sampleReader, + NO_CANCELLING, + extractValues + ); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(5); + Assertions.assertThat(results.get(0)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(2); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("data1"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time", "first"); + Assertions.assertThat(series1.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series1.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1483225200"); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo("1"); + }); + Assertions.assertThat(series.get(1)) + .satisfies(series2 -> { + Assertions.assertThat(series2.getName()).isEqualTo("data2"); + Assertions.assertThat(series2.getColumns()).containsOnlyKeys("time", "first"); + Assertions.assertThat(series2.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series2.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo(Instant.ofEpochSecond(1483225200L)); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo(2.0); + }); + }); } } From f84fb5cdc39a8c6f77e37ff99bfcdb1e9996fbb2 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 28 Jun 2024 16:36:28 +0200 Subject: [PATCH 02/16] chore: remove built-in value extractor methods --- .../influxdb/query/InfluxQLQueryResult.java | 38 ---------------- .../query/InfluxQLQueryResultTest.java | 45 ------------------- 2 files changed, 83 deletions(-) delete mode 100644 client-core/src/test/java/com/influxdb/query/InfluxQLQueryResultTest.java diff --git a/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java b/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java index 4ad9608752f..4f2fe8f7a1b 100644 --- a/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java +++ b/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java @@ -21,8 +21,6 @@ */ package com.influxdb.query; -import java.time.Instant; -import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -212,42 +210,6 @@ public Object[] getValues() { return values; } } - - public static Object defaultExtractValue(@Nonnull final String columnName, - @Nonnull final String rawValue, - final int resultIndex, - @Nonnull final String seriesName) { - try { - return Long.parseLong(rawValue); - } catch (NumberFormatException le) { - try { - return Double.parseDouble(rawValue); - } catch (NumberFormatException de) { - try { - return Instant.parse(rawValue); - } catch (DateTimeParseException dte) { - return rawValue; - } - } - } - } - - @SuppressWarnings("MagicNumber") - public static Object legacyExtractValue(@Nonnull final String columnName, - @Nonnull final String rawValue, - final int resultIndex, - @Nonnull final String seriesName) { - - if (columnName.toLowerCase().equals("time")) { - try { - Instant instant = Instant.parse(rawValue); - return String.valueOf(instant.getEpochSecond() * 1_000_000_000L + instant.getNano()); - } catch (DateTimeParseException dtpe) { - return rawValue; - } - } - return rawValue; - } } } diff --git a/client-core/src/test/java/com/influxdb/query/InfluxQLQueryResultTest.java b/client-core/src/test/java/com/influxdb/query/InfluxQLQueryResultTest.java deleted file mode 100644 index 77905b81286..00000000000 --- a/client-core/src/test/java/com/influxdb/query/InfluxQLQueryResultTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * The MIT License - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ -package com.influxdb.query; - -import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.Map; - -public class InfluxQLQueryResultTest { - - @Test - public void readLegacyTimestamps(){ - Map times = Map.of( - "2024-06-20T11:41:17.690437345Z", "1718883677690437345", // nano - "2024-06-20T11:41:17.690437Z", "1718883677690437000", // micro - "2024-06-20T11:41:17.690Z", "1718883677690000000", // milli - "2024-06-20T11:41:17Z", "1718883677000000000" // second - ); - for(String stamp : times.keySet()){ - Object result = InfluxQLQueryResult.Series.legacyExtractValue("time", stamp, 0, "test"); - Assertions.assertThat(times.get(stamp)).isEqualTo(result); - } - } - -} From 872212dfecbb5b5e8300e85d177618ae5bde1929 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 28 Jun 2024 17:04:31 +0200 Subject: [PATCH 03/16] feat: makes Accept header in InfluxQLQueryService dynamic. --- .../client/service/InfluxQLQueryService.java | 5 +- .../com/influxdb/client/InfluxQLQueryApi.java | 13 ++ .../influxdb/client/domain/InfluxQLQuery.java | 40 ++++ .../client/internal/InfluxQLQueryApiImpl.java | 39 +++- .../influxdb/client/ITInfluxQLQueryApi.java | 178 ++++++++++++++++ .../influxdb/client/InfluxDBClientTest.java | 26 +++ .../client/domain/InfluxQLQueryTest.java | 21 ++ .../internal/InfluxQLQueryApiImplTest.java | 192 ------------------ 8 files changed, 315 insertions(+), 199 deletions(-) create mode 100644 client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java diff --git a/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java b/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java index 76d0cda6c5f..6563d022830 100644 --- a/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java +++ b/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java @@ -15,7 +15,7 @@ public interface InfluxQLQueryService { * @param zapTraceSpan OpenTracing span context (optional) * @return response in csv format */ - @Headers({"Accept:application/json", "Content-Type:application/x-www-form-urlencoded"}) + @Headers({"Content-Type:application/x-www-form-urlencoded"}) @FormUrlEncoded @POST("query") Call query( @@ -23,6 +23,7 @@ Call query( @Nonnull @Query("db") String db, @Query("rp") String retentionPolicy, @Query("epoch") String epoch, - @Header("Zap-Trace-Span") String zapTraceSpan + @Header("Zap-Trace-Span") String zapTraceSpan, + @Header("Accept") String accept ); } diff --git a/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java b/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java index c3624065d56..fc4b349b133 100644 --- a/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java +++ b/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java @@ -92,4 +92,17 @@ InfluxQLQueryResult query( @Nonnull InfluxQLQuery influxQlQuery, @Nullable InfluxQLQueryResult.Series.ValueExtractor valueExtractor ); + + @Nonnull + InfluxQLQueryResult query( + @Nonnull InfluxQLQuery influxQlQuery, + @Nullable InfluxQLQuery.AcceptHeader header, + @Nullable InfluxQLQueryResult.Series.ValueExtractor valueExtractor + ); + + @Nonnull + InfluxQLQueryResult query( + @Nonnull InfluxQLQuery influxQLQuery, + @Nullable InfluxQLQuery.AcceptHeader header + ); } diff --git a/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java b/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java index 80d8673606c..d9cf5a3e77a 100644 --- a/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java +++ b/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java @@ -26,14 +26,18 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import com.influxdb.client.JSON; + /** * A InfluxQL query. */ public class InfluxQLQuery { + private final String command; private final String database; private String retentionPolicy; private InfluxQLPrecision precision; + private AcceptHeader acceptHeader; /** * @param command the InfluxQL command to execute @@ -42,6 +46,15 @@ public class InfluxQLQuery { public InfluxQLQuery(@Nonnull final String command, @Nonnull final String database) { this.command = command; this.database = database; + this.acceptHeader = AcceptHeader.JSON; + } + + public InfluxQLQuery(@Nonnull final String command, + @Nonnull final String database, + @Nonnull final AcceptHeader acceptHeader) { + this.command = command; + this.database = database; + this.acceptHeader = acceptHeader; } /** @@ -97,6 +110,18 @@ public InfluxQLQuery setPrecision(@Nullable final InfluxQLPrecision precision) { return this; } + public AcceptHeader getAcceptHeader() { + return acceptHeader; + } + + public void setAcceptHeader(final AcceptHeader acceptHeader) { + this.acceptHeader = acceptHeader; + } + + public String getAcceptHeaderVal() { + return acceptHeader != null ? acceptHeader.getVal() : AcceptHeader.JSON.getVal(); + } + /** * The precision used for the timestamps returned by InfluxQL queries. */ @@ -143,4 +168,19 @@ public static InfluxQLPrecision toTimePrecision(final TimeUnit t) { } } } + + public enum AcceptHeader { + JSON("application/json"), + CSV("application/csv"); + + private final String val; + + AcceptHeader(final String val) { + this.val = val; + } + + public String getVal() { + return val; + } + } } diff --git a/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java b/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java index 3902ee07622..f10864b5623 100644 --- a/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java +++ b/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java @@ -76,14 +76,32 @@ public InfluxQLQueryApiImpl(@Nonnull final InfluxQLQueryService service) { @Nonnull @Override - public InfluxQLQueryResult query(@Nonnull final InfluxQLQuery influxQlQuery) { - return query(influxQlQuery, null); + public InfluxQLQueryResult query(@Nonnull final InfluxQLQuery influxQLQuery) { + return query(influxQLQuery, influxQLQuery.getAcceptHeader(), null); + } + + @Nonnull + @Override + public InfluxQLQueryResult query(@Nonnull final InfluxQLQuery influxQLQuery, + @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor) { + return query(influxQLQuery, influxQLQuery.getAcceptHeader(), valueExtractor); } + @Nonnull + @Override + public InfluxQLQueryResult query( + @Nonnull final InfluxQLQuery influxQLQuery, + @Nonnull final InfluxQLQuery.AcceptHeader accept) { + return query(influxQLQuery, accept, null); + } + + + @Nonnull @Override public InfluxQLQueryResult query( @Nonnull final InfluxQLQuery influxQlQuery, + @Nullable final InfluxQLQuery.AcceptHeader accept, @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor ) { Call call = service.query( @@ -91,12 +109,18 @@ public InfluxQLQueryResult query( influxQlQuery.getDatabase(), influxQlQuery.getRetentionPolicy(), influxQlQuery.getPrecision() != null ? influxQlQuery.getPrecision().getSymbol() : null, - null); + null, + accept != null ? accept.getVal() : InfluxQLQuery.AcceptHeader.JSON.getVal()); + + System.out.println("DEBUG call header: " + call.request().header("Accept")); AtomicReference atomicReference = new AtomicReference<>(); BiConsumer consumer = (cancellable, bufferedSource) -> { try { - InfluxQLQueryResult result = parseResponse(bufferedSource, cancellable, valueExtractor); + InfluxQLQueryResult result = parseResponse(bufferedSource, + cancellable, + accept, + valueExtractor); atomicReference.set(result); } catch (IOException e) { ERROR_CONSUMER.accept(e); @@ -109,12 +133,17 @@ public InfluxQLQueryResult query( private InfluxQLQueryResult parseResponse( @Nonnull final BufferedSource bufferedSource, @Nonnull final Cancellable cancellable, + @Nonnull final InfluxQLQuery.AcceptHeader accept, @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor) throws IOException { Arguments.checkNotNull(bufferedSource, "bufferedSource"); try (Reader reader = new InputStreamReader(bufferedSource.inputStream(), StandardCharsets.UTF_8)) { -// return readInfluxQLCSVResult(reader, cancellable, valueExtractor); + if (accept == InfluxQLQuery.AcceptHeader.CSV) { + System.out.println("DEBUG have CSV"); + return readInfluxQLCSVResult(reader, cancellable, valueExtractor); + } + System.out.println("DEBUG have JSON"); return readInfluxQLJsonResult(reader, cancellable, valueExtractor); } } diff --git a/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java b/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java index 063a077b1c6..1d592e8842c 100644 --- a/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java +++ b/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java @@ -24,6 +24,10 @@ import java.io.IOException; import java.math.BigDecimal; import java.time.Instant; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; import com.influxdb.client.domain.Bucket; import com.influxdb.client.domain.DBRPCreate; @@ -33,9 +37,14 @@ import com.influxdb.client.write.Point; import com.influxdb.query.InfluxQLQueryResult; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; import org.assertj.core.api.Assertions; import org.assertj.core.api.ListAssert; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import static org.assertj.core.api.InstanceOfAssertFactories.BIG_DECIMAL; @@ -81,6 +90,15 @@ void testShowDatabases() { .contains(DATABASE_NAME); } + @Test + void testShowDatabasesCSV() { + InfluxQLQueryResult result = influxQLQueryApi.query( + new InfluxQLQuery("SHOW DATABASES", DATABASE_NAME, InfluxQLQuery.AcceptHeader.CSV)); + assertSingleSeriesRecords(result) + .map(record -> record.getValueByKey("name")) + // internal buckets are also available by DBRP mapping + .contains(DATABASE_NAME); + } @Test void testQueryData() { @@ -199,4 +217,164 @@ private ListAssert assertSingleSeriesRecords( .first() .extracting(InfluxQLQueryResult.Series::getValues, list(InfluxQLQueryResult.Series.Record.class)); } + + @Nested + class ServiceHeaderTest { + + protected MockWebServer mockServer = new MockWebServer(); + + @BeforeEach + void setUp() throws IOException { + mockServer.enqueue(new MockResponse().setBody("")); + mockServer.start(); + } + + @AfterEach + void tearDown() throws IOException { + mockServer.shutdown(); + } + + @Test + public void serviceHeaderCSV() throws InterruptedException { + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.query(new InfluxQLQuery("SELECT * FROM cpu", "test_db", InfluxQLQuery.AcceptHeader.CSV)); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/csv"); + } + + + @Test + public void serviceHeaderJSON() throws InterruptedException { + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.query(new InfluxQLQuery("SELECT * FROM cpu", "test_db", + InfluxQLQuery.AcceptHeader.JSON)); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/json"); + } + + @Test + public void serviceHeaderDefault() throws InterruptedException { + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.query(new InfluxQLQuery("SELECT * FROM cpu", "test_db")); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/json"); + } + + @Test + public void serviceHeaderInCallCSV() throws InterruptedException { + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.query( + new InfluxQLQuery("SELECT * FROM cpu", "test_db"), + InfluxQLQuery.AcceptHeader.CSV); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/csv"); + } + + @Test + public void serviceHeaderInCallJSON() throws InterruptedException { + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.query( + new InfluxQLQuery("SELECT * FROM cpu", "test_db"), + InfluxQLQuery.AcceptHeader.JSON); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/json"); + } + } + + @Test + public void testQueryJsonPrecision(){ + Bucket bucket = influxDBClient.getBucketsApi().findBucketByName("my-bucket"); + int idx = 0; + Map precisionValues = new HashMap<>(); + for(WritePrecision precision : WritePrecision.values()){ + Instant time = Instant.now().minusSeconds(10 * (1 + idx++)); + long nanoTimestamp = (time.getEpochSecond() * 1_000_000_000L) + time.getNano(); + + long timestamp = 0; + switch(precision){ + case S: + timestamp = nanoTimestamp/1_000_000_000L; + precisionValues.put(precision.getValue(), Instant.ofEpochSecond(timestamp)); + break; + case MS: + timestamp = nanoTimestamp/1_000_000L; + precisionValues.put(precision.getValue(), Instant.ofEpochMilli(timestamp)); + break; + case US: + timestamp = nanoTimestamp/1_000L; + precisionValues.put(precision.getValue(), + Instant.ofEpochSecond(timestamp/1_000_000L, (timestamp%1_000_000L) * 1000)); + break; + case NS: + timestamp = nanoTimestamp; + precisionValues.put(precision.getValue(), + Instant.ofEpochSecond(timestamp/1_000_000_000L, timestamp%1_000_000_000L)); + break; + } + influxDBClient.getWriteApiBlocking() + .writePoint(bucket.getId(), bucket.getOrgID(), new Point("precise") + .time(timestamp, precision) + .addField("cpu_usage", 10.42) + .addTag("domain", precision.toString())); + } + assert bucket != null; + InfluxQLQueryResult result = influxDBClient.getInfluxQLQueryApi() + .query(new InfluxQLQuery( + "SELECT * FROM precise WHERE time > now() - 1m", + bucket.getName()), InfluxQLQuery.AcceptHeader.JSON); + + for(InfluxQLQueryResult.Result r: result.getResults()){ + InfluxQLQueryResult.Series s = r.getSeries().get(0); + for(InfluxQLQueryResult.Series.Record record: s.getValues()){ + System.out.println("DEBUG record: " + Arrays.toString(record.getValues())); + String domain = Objects.requireNonNull(record.getValueByKey("domain")).toString(); + System.out.println("DEBUG time " + domain + " " + record.getValueByKey("time")); + System.out.println("DEBUG precision value " + precisionValues.get(domain)); + Assertions.assertThat(precisionValues.get(domain)) + .isEqualTo(Instant.parse( + Objects.requireNonNull(record.getValueByKey("time") + ).toString())); + } + } + } } diff --git a/client/src/test/java/com/influxdb/client/InfluxDBClientTest.java b/client/src/test/java/com/influxdb/client/InfluxDBClientTest.java index af876ccf752..c3a53bb238c 100644 --- a/client/src/test/java/com/influxdb/client/InfluxDBClientTest.java +++ b/client/src/test/java/com/influxdb/client/InfluxDBClientTest.java @@ -33,6 +33,8 @@ import java.util.logging.Logger; import javax.annotation.Nonnull; +import com.influxdb.client.domain.InfluxQLQuery; +import com.influxdb.client.service.InfluxQLQueryService; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpServer; import com.sun.net.httpserver.HttpHandler; @@ -48,12 +50,14 @@ import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; +import okhttp3.ResponseBody; import okhttp3.mockwebserver.Dispatcher; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import retrofit2.Call; /** * @author Jakub Bednar (bednar@github) (05/09/2018 14:00) @@ -122,6 +126,28 @@ public void createNotificationRulesApi() { Assertions.assertThat(influxDBClient.getNotificationRulesApi()).isNotNull(); } + @Test + public void serviceHeaderDefault() { + InfluxQLQueryService service = influxDBClient.getService(InfluxQLQueryService.class); + Call call = service.query("SELECT * FROM cpu", "test_db", + null, + null, + null, + InfluxQLQuery.AcceptHeader.JSON.getVal()); + Assertions.assertThat(call.request().header("Accept")).isEqualTo("application/json"); + } + + @Test + public void serviceHeaderChange() { + InfluxQLQueryService service = influxDBClient.getService(InfluxQLQueryService.class); + Call call = service.query("SELECT * FROM cpu", "test_db", + null, + null, + null, + InfluxQLQuery.AcceptHeader.CSV.getVal()); + Assertions.assertThat(call.request().header("accept")).isEqualTo("application/csv"); + } + @Test void logLevel() { diff --git a/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java b/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java new file mode 100644 index 00000000000..df65dd713bb --- /dev/null +++ b/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java @@ -0,0 +1,21 @@ +package com.influxdb.client.domain; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +public class InfluxQLQueryTest { + + @Test + public void headerSelectDefault(){ + InfluxQLQuery query = new InfluxQLQuery("SELECT * FROM cpu", "test_db"); + Assertions.assertThat(query.getAcceptHeaderVal()).isEqualTo("application/json"); + } + + @Test + public void headerSelect(){ + InfluxQLQuery query = new InfluxQLQuery("SELECT * FROM cpu", + "test_db", + InfluxQLQuery.AcceptHeader.CSV); + Assertions.assertThat(query.getAcceptHeaderVal()).isEqualTo("application/csv"); + } +} diff --git a/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java b/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java index 464dd6bb0bf..4ef94077833 100644 --- a/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java +++ b/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java @@ -435,198 +435,6 @@ public void readInfluxQLJSONResult(){ }); } - // Default - @Test - public void readInfluxQLJSONDefaultExtractValue(){ - InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLJsonResult(sampleReader, - NO_CANCELLING, - InfluxQLQueryResult.Series::defaultExtractValue); - List results = result.getResults(); - Assertions.assertThat(results).hasSize(5); - Assertions.assertThat(results.get(0)) - .extracting(InfluxQLQueryResult.Result::getSeries) - .satisfies(series -> { - Assertions.assertThat(series).hasSize(2); - Assertions.assertThat(series.get(0)) - .satisfies(series1 -> { - Assertions.assertThat(series1.getName()).isEqualTo("data1"); - Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time", "first"); - Assertions.assertThat(series1.getValues()).hasSize(1); - InfluxQLQueryResult.Series.Record record = series1.getValues().get(0); - Assertions.assertThat(record.getValueByKey("time")).isEqualTo(1483225200L); - Assertions.assertThat(record.getValueByKey("first")).isEqualTo(1L); - }); - Assertions.assertThat(series.get(1)) - .satisfies(series2 -> { - Assertions.assertThat(series2.getName()).isEqualTo("data2"); - Assertions.assertThat(series2.getColumns()).containsOnlyKeys("time", "first"); - Assertions.assertThat(series2.getValues()).hasSize(1); - InfluxQLQueryResult.Series.Record record = series2.getValues().get(0); - Assertions.assertThat(record.getValueByKey("time")).isEqualTo(1483225200L); - Assertions.assertThat(record.getValueByKey("first")).isEqualTo(2L); - }); - }); - Assertions.assertThat(results.get(1)) - .extracting(InfluxQLQueryResult.Result::getSeries) - .satisfies(series -> { - Assertions.assertThat(series).hasSize(1); - Assertions.assertThat(series.get(0)) - .satisfies(series1 -> { - Assertions.assertThat(series1.getName()).isEqualTo("data"); - Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time", "first", "text"); - Assertions.assertThat(series1.getValues()).hasSize(1); - InfluxQLQueryResult.Series.Record record = series1.getValues().get(0); - Assertions.assertThat(record.getValueByKey("time")).isEqualTo(1500000000L); - Assertions.assertThat(record.getValueByKey("first")).isEqualTo(42L); - Assertions.assertThat(record.getValueByKey("text")).isEqualTo("foo"); - }); - }); - Assertions.assertThat(results.get(2)) - .extracting(InfluxQLQueryResult.Result::getSeries) - .satisfies(series -> { - Assertions.assertThat(series).hasSize(1); - Assertions.assertThat(series.get(0)) - .satisfies(series1 -> { - Assertions.assertThat(series1.getName()).isEqualTo("databases"); - Assertions.assertThat(series1.getColumns()).containsOnlyKeys("name"); - Assertions.assertThat(series1.getValues()).hasSize(2); - - Assertions.assertThat( series1.getValues().get(0).getValueByKey("name")) - .isEqualTo("measurement-1"); - Assertions.assertThat( series1.getValues().get(1).getValueByKey("name")) - .isEqualTo("measurement-2"); - }); - }); - Assertions.assertThat(results.get(3)) - .extracting(InfluxQLQueryResult.Result::getSeries) - .satisfies(series -> { - Assertions.assertThat(series).hasSize(2); - Assertions.assertThat(series.get(0)) - .satisfies(series1 -> { - Assertions.assertThat(series1.getName()).isEqualTo("cpu"); - Assertions.assertThat(series1.getTags()).containsOnlyKeys("region", "host"); - Assertions.assertThat(series1.getTags().get("region")).isEqualTo("us-east-1"); - Assertions.assertThat(series1.getTags().get("host")).isEqualTo("server1"); - Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time","usage_user","usage_system"); - Assertions.assertThat(series1.getValues()).hasSize(2); - - Assertions.assertThat( series1.getValues().get(0).getValueByKey("usage_user")) - .isEqualTo(13.57); - Assertions.assertThat( series1.getValues().get(0).getValueByKey("usage_system")) - .isEqualTo(1.4); - Assertions.assertThat( series1.getValues().get(1).getValueByKey("usage_user")) - .isEqualTo(14.06); - Assertions.assertThat( series1.getValues().get(1).getValueByKey("usage_system")) - .isEqualTo(1.7); - }); - Assertions.assertThat(series.get(1)) - .satisfies(series2 -> { - Assertions.assertThat(series2.getName()).isEqualTo("cpu"); - Assertions.assertThat(series2.getTags()).containsOnlyKeys("region", "host"); - Assertions.assertThat(series2.getTags().get("region")).isEqualTo("us-east-1"); - Assertions.assertThat(series2.getTags().get("host")).isEqualTo("server2"); - Assertions.assertThat(series2.getColumns()).containsOnlyKeys("time","usage_user","usage_system"); - Assertions.assertThat(series2.getValues()).hasSize(1); - - Assertions.assertThat( series2.getValues().get(0).getValueByKey("usage_user")) - .isEqualTo(67.91); - Assertions.assertThat( series2.getValues().get(0).getValueByKey("usage_system")) - .isEqualTo(1.3); - }); - }); - Assertions.assertThat(results.get(4)) - .satisfies(r -> { - Assertions.assertThat(r.getIndex()).isEqualTo(4); - }) - .extracting(InfluxQLQueryResult.Result::getSeries) - .satisfies(series -> { - Assertions.assertThat(series).hasSize(1); - Assertions.assertThat(series.get(0)) - .satisfies(series1 -> { - Assertions.assertThat(series1.getName()).isEqualTo("login"); - Assertions.assertThat(series1.getTags()).containsOnlyKeys("region","host"); - Assertions.assertThat(series1.getTags().get("region")).isEqualTo("eu-west-3"); - Assertions.assertThat(series1.getTags().get("host")).isEqualTo("portal-17"); - Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time","user_id","success","stay"); - Assertions.assertThat(series1.getValues()).hasSize(4); - Assertions.assertThat(series1.getValues().get(0).getValueByKey("time")).isEqualTo(Instant.parse("2024-06-18T11:29:48.454Z")); - Assertions.assertThat(series1.getValues().get(0).getValueByKey("user_id")).isEqualTo(958772110L); - Assertions.assertThat(series1.getValues().get(0).getValueByKey("success")).isEqualTo("true"); - Assertions.assertThat(series1.getValues().get(0).getValueByKey("stay")).isEqualTo(1.27); - Assertions.assertThat(series1.getValues().get(1).getValueByKey("time")).isEqualTo(Instant.parse("2024-06-18T11:29:47.124Z")); - Assertions.assertThat(series1.getValues().get(1).getValueByKey("user_id")).isEqualTo(452223904L); - Assertions.assertThat(series1.getValues().get(1).getValueByKey("success")).isEqualTo("false"); - Assertions.assertThat(series1.getValues().get(1).getValueByKey("stay")).isEqualTo(0.0); - Assertions.assertThat(series1.getValues().get(3).getValueByKey("time")).isEqualTo(Instant.parse("2024-06-18T11:29:41.881Z")); - Assertions.assertThat(series1.getValues().get(3).getValueByKey("user_id")).isEqualTo(71119178L); - Assertions.assertThat(series1.getValues().get(3).getValueByKey("success")).isEqualTo("true"); - Assertions.assertThat(series1.getValues().get(3).getValueByKey("stay")).isEqualTo(78.4); - }); - }); - } - - // Legacy timestamps - @Test - public void readInfluxQLJSONResultWithLegacyExtractValue(){ - InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLJsonResult(sampleReader, - NO_CANCELLING, - InfluxQLQueryResult.Series::legacyExtractValue); - List results = result.getResults(); - Assertions.assertThat(results).hasSize(5); - Assertions.assertThat(results.get(0)) - .extracting(InfluxQLQueryResult.Result::getSeries) - .satisfies(series -> { - Assertions.assertThat(series).hasSize(2); - Assertions.assertThat(series.get(0)) - .satisfies(series1 -> { - Assertions.assertThat(series1.getName()).isEqualTo("data1"); - Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time", "first"); - Assertions.assertThat(series1.getValues()).hasSize(1); - InfluxQLQueryResult.Series.Record record = series1.getValues().get(0); - Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1483225200"); - Assertions.assertThat(record.getValueByKey("first")).isEqualTo("1"); - }); - Assertions.assertThat(series.get(1)) - .satisfies(series2 -> { - Assertions.assertThat(series2.getName()).isEqualTo("data2"); - Assertions.assertThat(series2.getColumns()).containsOnlyKeys("time", "first"); - Assertions.assertThat(series2.getValues()).hasSize(1); - InfluxQLQueryResult.Series.Record record = series2.getValues().get(0); - Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1483225200"); - Assertions.assertThat(record.getValueByKey("first")).isEqualTo("2"); - }); - }); - Assertions.assertThat(results.get(4)) - .satisfies(r -> { - Assertions.assertThat(r.getIndex()).isEqualTo(4); - }) - .extracting(InfluxQLQueryResult.Result::getSeries) - .satisfies(series -> { - Assertions.assertThat(series).hasSize(1); - Assertions.assertThat(series.get(0)) - .satisfies(series1 -> { - Assertions.assertThat(series1.getName()).isEqualTo("login"); - Assertions.assertThat(series1.getTags()).containsOnlyKeys("region","host"); - Assertions.assertThat(series1.getTags().get("region")).isEqualTo("eu-west-3"); - Assertions.assertThat(series1.getTags().get("host")).isEqualTo("portal-17"); - Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time","user_id","success","stay"); - Assertions.assertThat(series1.getValues()).hasSize(4); - Assertions.assertThat(series1.getValues().get(0).getValueByKey("time")).isEqualTo("1718710188454000000"); - Assertions.assertThat(series1.getValues().get(0).getValueByKey("user_id")).isEqualTo("958772110"); - Assertions.assertThat(series1.getValues().get(0).getValueByKey("success")).isEqualTo("true"); - Assertions.assertThat(series1.getValues().get(0).getValueByKey("stay")).isEqualTo("1.27"); - Assertions.assertThat(series1.getValues().get(1).getValueByKey("time")).isEqualTo("1718710187124000000"); - Assertions.assertThat(series1.getValues().get(1).getValueByKey("user_id")).isEqualTo("452223904"); - Assertions.assertThat(series1.getValues().get(1).getValueByKey("success")).isEqualTo("false"); - Assertions.assertThat(series1.getValues().get(1).getValueByKey("stay")).isEqualTo("0.0"); - Assertions.assertThat(series1.getValues().get(3).getValueByKey("time")).isEqualTo("1718710181881000000"); - Assertions.assertThat(series1.getValues().get(3).getValueByKey("user_id")).isEqualTo("71119178"); - Assertions.assertThat(series1.getValues().get(3).getValueByKey("success")).isEqualTo("true"); - Assertions.assertThat(series1.getValues().get(3).getValueByKey("stay")).isEqualTo("78.4"); - }); - }); - } - // Custom @Test public void readInfluxQLJSONResultCustomExtractValue(){ From 6997cb7149df62acf19773aaf4045f3f7a69e83a Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 28 Jun 2024 17:11:39 +0200 Subject: [PATCH 04/16] chore: add license to new test --- .../client/domain/InfluxQLQueryTest.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java b/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java index df65dd713bb..580ae689c71 100644 --- a/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java +++ b/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java @@ -1,3 +1,24 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ package com.influxdb.client.domain; import org.assertj.core.api.Assertions; From d9e11d19cc2aab16287521c9b18fa26cea367b8b Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 2 Jul 2024 14:08:16 +0200 Subject: [PATCH 05/16] feat: adds methods queryCSV and queryJSON to InfluxQLQueryAPI and rollsback query method to previous signature. --- .../com/influxdb/client/InfluxQLQueryApi.java | 72 ++++++++-- .../influxdb/client/domain/InfluxQLQuery.java | 19 ++- .../client/internal/InfluxQLQueryApiImpl.java | 31 ++-- .../influxdb/client/ITInfluxQLQueryApi.java | 134 ++++++++++++++++-- 4 files changed, 221 insertions(+), 35 deletions(-) diff --git a/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java b/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java index fc4b349b133..c718b7f3fa8 100644 --- a/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java +++ b/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java @@ -29,10 +29,25 @@ import com.influxdb.query.InfluxQLQueryResult; /** - * The InfluxQL can be used with /query compatibility endpoint which uses the + * The InfluxQL API can be used with the /query compatibility endpoint which uses the * {@link InfluxQLQuery#getDatabase() database} and * {@link InfluxQLQuery#getRetentionPolicy() retention policy} specified in the query request to * map the request to an InfluxDB bucket. + * + *

Note that as of release 7.2 queries using the legacy InfluxQL compatible endpoint, will use + * the default Accept header mime type of application/json instead of the previous + * mime type of application/csv. This means timestamps will be returned in the RFC3339 format, + * e.g. "2024-06-18T11:29:48.454Z" instead of in the Epoch format, e.g. 1655900000000000000. + *

+ * + *

To continue to use the application/csv mime type and to receive Epoch timestamps, use a + * new convenience method queryCSV. To explicitly indicate use of the application/json + * mime type additional convenience methods queryJSON are also now available. These are synonymous + * with the original query methods.

+ * + *

Note that the Accept header mime type can now also be specified when instantiating the + *{@link com.influxdb.client.domain.InfluxQLQuery} class.

+ * *
* For more information, see: * **/ @ThreadSafe @@ -93,16 +113,48 @@ InfluxQLQueryResult query( @Nullable InfluxQLQueryResult.Series.ValueExtractor valueExtractor ); + /** + * Convenience method to specify use of the mime type application/csv + * in the Accept header. Result timestamps will be in the Epoch format. + * + * @param influxQLQuery the query + * @return the result + */ @Nonnull - InfluxQLQueryResult query( - @Nonnull InfluxQLQuery influxQlQuery, - @Nullable InfluxQLQuery.AcceptHeader header, - @Nullable InfluxQLQueryResult.Series.ValueExtractor valueExtractor - ); + InfluxQLQueryResult queryCSV(@Nonnull final InfluxQLQuery influxQLQuery); + /** + * Convenience method to specify use of the mime type application/csv + * in the Accept header. Result timestamps will be in the Epoch format. + * + * @param influxQLQuery the query + * @param valueExtractor a callback, to convert column values + * @return the result + */ + InfluxQLQueryResult queryCSV(@Nonnull final InfluxQLQuery influxQLQuery, + @Nullable InfluxQLQueryResult.Series.ValueExtractor valueExtractor); + + /** + * Convenience method to specify use of the mime type application/json + * in the Accept header. Result timestamps will be in the RFC3339 format. + * + * @param influxQLQuery the query + * @return the result + */ @Nonnull - InfluxQLQueryResult query( - @Nonnull InfluxQLQuery influxQLQuery, - @Nullable InfluxQLQuery.AcceptHeader header - ); + InfluxQLQueryResult queryJSON(@Nonnull final InfluxQLQuery influxQLQuery); + + /** + * Convenience method to specify use of the mime type application/json + * in the Accept header. Result timestamps will be in the RFC3339 format. + * + * @param influxQLQuery the query + * @param valueExtractor a callback, to convert column values + * @return the result + */ + @Nonnull + InfluxQLQueryResult queryJSON(@Nonnull final InfluxQLQuery influxQLQuery, + @Nullable InfluxQLQueryResult.Series.ValueExtractor valueExtractor); + + } diff --git a/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java b/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java index d9cf5a3e77a..1ac8a59c7ef 100644 --- a/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java +++ b/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java @@ -26,8 +26,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import com.influxdb.client.JSON; - /** * A InfluxQL query. */ @@ -49,6 +47,11 @@ public InfluxQLQuery(@Nonnull final String command, @Nonnull final String databa this.acceptHeader = AcceptHeader.JSON; } + /** + * @param command the InfluxQL command to execute + * @param database the database to run this query against + * @param acceptHeader the Accept header to use in the request + */ public InfluxQLQuery(@Nonnull final String command, @Nonnull final String database, @Nonnull final AcceptHeader acceptHeader) { @@ -110,14 +113,23 @@ public InfluxQLQuery setPrecision(@Nullable final InfluxQLPrecision precision) { return this; } + /** + * @return the current AcceptHeader used when making queries. + */ public AcceptHeader getAcceptHeader() { return acceptHeader; } + /*** + * @param acceptHeader the AcceptHeader to be used when making queries. + */ public void setAcceptHeader(final AcceptHeader acceptHeader) { this.acceptHeader = acceptHeader; } + /** + * @return the string value of the AcceptHeader used when making queries. + */ public String getAcceptHeaderVal() { return acceptHeader != null ? acceptHeader.getVal() : AcceptHeader.JSON.getVal(); } @@ -169,6 +181,9 @@ public static InfluxQLPrecision toTimePrecision(final TimeUnit t) { } } + /** + * The possible values to be used in the header Accept, when making queries. + */ public enum AcceptHeader { JSON("application/json"), CSV("application/csv"); diff --git a/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java b/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java index f10864b5623..7ce7aeeb515 100644 --- a/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java +++ b/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java @@ -89,17 +89,34 @@ public InfluxQLQueryResult query(@Nonnull final InfluxQLQuery influxQLQuery, @Nonnull @Override - public InfluxQLQueryResult query( - @Nonnull final InfluxQLQuery influxQLQuery, - @Nonnull final InfluxQLQuery.AcceptHeader accept) { - return query(influxQLQuery, accept, null); + public InfluxQLQueryResult queryCSV(@Nonnull final InfluxQLQuery influxQLQuery) { + return query(influxQLQuery, InfluxQLQuery.AcceptHeader.CSV, null); } + @Override + public InfluxQLQueryResult queryCSV(@Nonnull final InfluxQLQuery influxQLQuery, + @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor) { + return query(influxQLQuery, InfluxQLQuery.AcceptHeader.CSV, valueExtractor); + + } + @Nonnull + @Override + public InfluxQLQueryResult queryJSON(@Nonnull final InfluxQLQuery influxQLQuery) { + return query(influxQLQuery, InfluxQLQuery.AcceptHeader.JSON, null); + } @Nonnull @Override - public InfluxQLQueryResult query( + public InfluxQLQueryResult queryJSON(@Nonnull final InfluxQLQuery influxQLQuery, + @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor) { + return query(influxQLQuery, InfluxQLQuery.AcceptHeader.JSON, valueExtractor); + + } + + + @Nonnull + private InfluxQLQueryResult query( @Nonnull final InfluxQLQuery influxQlQuery, @Nullable final InfluxQLQuery.AcceptHeader accept, @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor @@ -112,8 +129,6 @@ public InfluxQLQueryResult query( null, accept != null ? accept.getVal() : InfluxQLQuery.AcceptHeader.JSON.getVal()); - System.out.println("DEBUG call header: " + call.request().header("Accept")); - AtomicReference atomicReference = new AtomicReference<>(); BiConsumer consumer = (cancellable, bufferedSource) -> { try { @@ -140,10 +155,8 @@ private InfluxQLQueryResult parseResponse( try (Reader reader = new InputStreamReader(bufferedSource.inputStream(), StandardCharsets.UTF_8)) { if (accept == InfluxQLQuery.AcceptHeader.CSV) { - System.out.println("DEBUG have CSV"); return readInfluxQLCSVResult(reader, cancellable, valueExtractor); } - System.out.println("DEBUG have JSON"); return readInfluxQLJsonResult(reader, cancellable, valueExtractor); } } diff --git a/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java b/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java index 1d592e8842c..9917cbe5909 100644 --- a/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java +++ b/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java @@ -225,7 +225,6 @@ class ServiceHeaderTest { @BeforeEach void setUp() throws IOException { - mockServer.enqueue(new MockResponse().setBody("")); mockServer.start(); } @@ -236,6 +235,7 @@ void tearDown() throws IOException { @Test public void serviceHeaderCSV() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("a,b,c,d,e,f")); InfluxDBClient client = InfluxDBClientFactory.create( mockServer.url("/").toString(), "my_token".toCharArray(), @@ -245,6 +245,8 @@ public void serviceHeaderCSV() throws InterruptedException { InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); InfluxQLQueryResult result = influxQuery.query(new InfluxQLQuery("SELECT * FROM cpu", "test_db", InfluxQLQuery.AcceptHeader.CSV)); + Assertions.assertThat(result.getResults()).hasSize(1); + RecordedRequest request = mockServer.takeRequest(); Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/csv"); @@ -253,6 +255,7 @@ public void serviceHeaderCSV() throws InterruptedException { @Test public void serviceHeaderJSON() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("{results:[]}")); InfluxDBClient client = InfluxDBClientFactory.create( mockServer.url("/").toString(), "my_token".toCharArray(), @@ -263,6 +266,8 @@ public void serviceHeaderJSON() throws InterruptedException { InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); InfluxQLQueryResult result = influxQuery.query(new InfluxQLQuery("SELECT * FROM cpu", "test_db", InfluxQLQuery.AcceptHeader.JSON)); + Assertions.assertThat(result.getResults()).hasSize(0); + RecordedRequest request = mockServer.takeRequest(); Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/json"); @@ -270,6 +275,7 @@ public void serviceHeaderJSON() throws InterruptedException { @Test public void serviceHeaderDefault() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("{results:[]}")); InfluxDBClient client = InfluxDBClientFactory.create( mockServer.url("/").toString(), "my_token".toCharArray(), @@ -285,7 +291,8 @@ public void serviceHeaderDefault() throws InterruptedException { } @Test - public void serviceHeaderInCallCSV() throws InterruptedException { + public void serviceHeaderMethodQueryCSV() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("a,b,c,d,e,f")); InfluxDBClient client = InfluxDBClientFactory.create( mockServer.url("/").toString(), "my_token".toCharArray(), @@ -294,27 +301,129 @@ public void serviceHeaderInCallCSV() throws InterruptedException { ); InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); - InfluxQLQueryResult result = influxQuery.query( - new InfluxQLQuery("SELECT * FROM cpu", "test_db"), - InfluxQLQuery.AcceptHeader.CSV); + InfluxQLQueryResult result = influxQuery.queryCSV( + new InfluxQLQuery("SELECT * FROM cpu", "test_db")); + Assertions.assertThat(result.getResults()).hasSize(1); RecordedRequest request = mockServer.takeRequest(); Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/csv"); } @Test - public void serviceHeaderInCallJSON() throws InterruptedException { + public void serverHeaderMethodQueryCSVExtractor(){ + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("a,tags,c,d,e\n\"mem\",\"foo=bar\",2,3,4")); InfluxDBClient client = InfluxDBClientFactory.create( mockServer.url("/").toString(), "my_token".toCharArray(), "my_org", "my_bucket" ); - InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); - InfluxQLQueryResult result = influxQuery.query( + InfluxQLQueryResult result = influxQuery.queryCSV( new InfluxQLQuery("SELECT * FROM cpu", "test_db"), - InfluxQLQuery.AcceptHeader.JSON); + (columnName, rawValue, resultIndex, seriesName) -> { + switch(columnName) { + case "c": + return Long.valueOf(rawValue); + case "d": + return Double.valueOf(rawValue); + } + return rawValue; + }); + InfluxQLQueryResult.Series series = result.getResults().get(0).getSeries().get(0); + Assertions.assertThat(series.getName()).isEqualTo("mem"); + Assertions.assertThat(series.getTags().get("foo")).isEqualTo("bar"); + Assertions.assertThat(series.getColumns().get("c")).isEqualTo(0); + Assertions.assertThat(series.getColumns().get("d")).isEqualTo(1); + Assertions.assertThat(series.getColumns().get("e")).isEqualTo(2); + Assertions.assertThat(series.getValues().get(0).getValueByKey("c")).isEqualTo(2L); + Assertions.assertThat(series.getValues().get(0).getValueByKey("d")).isEqualTo(3.0); + Assertions.assertThat(series.getValues().get(0).getValueByKey("e")).isEqualTo("4"); + } + + @Test + public void serviceHeaderMethodQueryJSON() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("{results:[]}")); + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.queryJSON(new InfluxQLQuery("SELECT * FROM cpu", "test_db")); + Assertions.assertThat(result.getResults()).hasSize(0); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/json"); + } + + @Test + public void serviceHeaderMethodQueryJSONExtractor() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"results\":[{\"statement_id\":0," + + "\"series\":[{\"name\":\"mem\",\"tags\": { \"foo\":\"bar\"},\"columns\": [\"c\",\"d\",\"e\"]," + + "\"values\":[[2,3,4]]}]}]}")); + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.queryJSON + (new InfluxQLQuery("SELECT * FROM cpu", "test_db"), + (columnName, rawValue, resultIndex, seriesName) -> { + switch(columnName) { + case "c": + return Long.valueOf(rawValue); + case "d": + return Double.valueOf(rawValue); + } + return rawValue; + }); + InfluxQLQueryResult.Series series = result.getResults().get(0).getSeries().get(0); + Assertions.assertThat(series.getName()).isEqualTo("mem"); + Assertions.assertThat(series.getTags().get("foo")).isEqualTo("bar"); + Assertions.assertThat(series.getColumns().get("c")).isEqualTo(0); + Assertions.assertThat(series.getColumns().get("d")).isEqualTo(1); + Assertions.assertThat(series.getColumns().get("e")).isEqualTo(2); + Assertions.assertThat(series.getValues().get(0).getValueByKey("c")).isEqualTo(2L); + Assertions.assertThat(series.getValues().get(0).getValueByKey("d")).isEqualTo(3.0); + Assertions.assertThat(series.getValues().get(0).getValueByKey("e")).isEqualTo("4"); + } + + @Test + public void serviceHeaderMethodQueryCSVPrecedent() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("a,b,c,d,e,f")); + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.queryCSV( + new InfluxQLQuery("SELECT * FROM cpu", "test_db", InfluxQLQuery.AcceptHeader.JSON)); + Assertions.assertThat(result.getResults()).hasSize(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/csv"); + } + + @Test + public void serviceHeaderMethodQueryJSONPrecedent() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("{results:[]}")); + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.queryJSON( + new InfluxQLQuery("SELECT * FROM cpu", "test_db", InfluxQLQuery.AcceptHeader.CSV)); + Assertions.assertThat(result.getResults()).hasSize(0); RecordedRequest request = mockServer.takeRequest(); Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/json"); @@ -359,17 +468,14 @@ public void testQueryJsonPrecision(){ } assert bucket != null; InfluxQLQueryResult result = influxDBClient.getInfluxQLQueryApi() - .query(new InfluxQLQuery( + .queryJSON(new InfluxQLQuery( "SELECT * FROM precise WHERE time > now() - 1m", - bucket.getName()), InfluxQLQuery.AcceptHeader.JSON); + bucket.getName())); for(InfluxQLQueryResult.Result r: result.getResults()){ InfluxQLQueryResult.Series s = r.getSeries().get(0); for(InfluxQLQueryResult.Series.Record record: s.getValues()){ - System.out.println("DEBUG record: " + Arrays.toString(record.getValues())); String domain = Objects.requireNonNull(record.getValueByKey("domain")).toString(); - System.out.println("DEBUG time " + domain + " " + record.getValueByKey("time")); - System.out.println("DEBUG precision value " + precisionValues.get(domain)); Assertions.assertThat(precisionValues.get(domain)) .isEqualTo(Instant.parse( Objects.requireNonNull(record.getValueByKey("time") From b7bd44c80d742d8ec3a9fc48955214a1de814736 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 2 Jul 2024 15:10:07 +0200 Subject: [PATCH 06/16] test: adds unit tests for InfluxQLQuery --- .../client/domain/InfluxQLQueryTest.java | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java b/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java index 580ae689c71..037bf2672a5 100644 --- a/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java +++ b/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java @@ -24,8 +24,18 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.Map; +import java.util.concurrent.TimeUnit; + public class InfluxQLQueryTest { + @Test + public void setRetentionPolicy(){ + String rp = "oneOffRP"; + InfluxQLQuery query = new InfluxQLQuery("SELECT * FROM cpu", "test_db"); + Assertions.assertThat(query.setRetentionPolicy(rp).getRetentionPolicy()).isEqualTo(rp); + } + @Test public void headerSelectDefault(){ InfluxQLQuery query = new InfluxQLQuery("SELECT * FROM cpu", "test_db"); @@ -39,4 +49,32 @@ public void headerSelect(){ InfluxQLQuery.AcceptHeader.CSV); Assertions.assertThat(query.getAcceptHeaderVal()).isEqualTo("application/csv"); } + + @Test + public void headerSet(){ + InfluxQLQuery query = new InfluxQLQuery("SELECT * FROM cpu", "test_db"); + Assertions.assertThat(query.getAcceptHeaderVal()).isEqualTo("application/json"); + query.setAcceptHeader(InfluxQLQuery.AcceptHeader.CSV); + Assertions.assertThat(query.getAcceptHeaderVal()).isEqualTo("application/csv"); + } + + @Test + public void timeUnitPrecisionConversion(){ + Map expected = Map.of( + TimeUnit.NANOSECONDS, "n", + TimeUnit.MICROSECONDS, "u", + TimeUnit.MILLISECONDS, "ms", + TimeUnit.SECONDS, "s", + TimeUnit.MINUTES, "m", + TimeUnit.HOURS, "h"); + for(TimeUnit tu: TimeUnit.values()){ + if(!tu.equals(TimeUnit.DAYS)){ + Assertions.assertThat(expected.get(tu)).isEqualTo(InfluxQLQuery.InfluxQLPrecision.toTimePrecision(tu).getSymbol()); + } else { + Assertions.assertThatThrownBy(() -> InfluxQLQuery.InfluxQLPrecision.toTimePrecision(tu)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("time precision must be one of:[HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, NANOSECONDS]"); + } + } + } } From 3fe22f167a24f24c0508ab57f0bac293839f93ad Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 2 Jul 2024 16:12:28 +0200 Subject: [PATCH 07/16] chore: improves InfluxQLQuery.setAcceptHeader --- .../main/java/com/influxdb/client/domain/InfluxQLQuery.java | 6 +++++- .../java/com/influxdb/client/domain/InfluxQLQueryTest.java | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java b/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java index 1ac8a59c7ef..9e770d4be62 100644 --- a/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java +++ b/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java @@ -21,6 +21,8 @@ */ package com.influxdb.client.domain; +import com.influxdb.query.InfluxQLQueryResult; + import java.util.Arrays; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; @@ -122,9 +124,11 @@ public AcceptHeader getAcceptHeader() { /*** * @param acceptHeader the AcceptHeader to be used when making queries. + * @return this */ - public void setAcceptHeader(final AcceptHeader acceptHeader) { + public InfluxQLQuery setAcceptHeader(final AcceptHeader acceptHeader) { this.acceptHeader = acceptHeader; + return this; } /** diff --git a/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java b/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java index 037bf2672a5..acc88b804a6 100644 --- a/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java +++ b/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java @@ -54,8 +54,8 @@ public void headerSelect(){ public void headerSet(){ InfluxQLQuery query = new InfluxQLQuery("SELECT * FROM cpu", "test_db"); Assertions.assertThat(query.getAcceptHeaderVal()).isEqualTo("application/json"); - query.setAcceptHeader(InfluxQLQuery.AcceptHeader.CSV); - Assertions.assertThat(query.getAcceptHeaderVal()).isEqualTo("application/csv"); + Assertions.assertThat(query.setAcceptHeader(InfluxQLQuery.AcceptHeader.CSV).getAcceptHeaderVal()) + .isEqualTo("application/csv"); } @Test From 7dbc006fdfb1dc0c1137c052bb1f698b1e42a045 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 2 Jul 2024 16:25:39 +0200 Subject: [PATCH 08/16] chore: cleanup imports --- .../src/main/java/com/influxdb/client/domain/InfluxQLQuery.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java b/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java index 9e770d4be62..b118972ac63 100644 --- a/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java +++ b/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java @@ -21,8 +21,6 @@ */ package com.influxdb.client.domain; -import com.influxdb.query.InfluxQLQueryResult; - import java.util.Arrays; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; From edf6b145c2d1b313db4ea37e7b3313e2dba00ed7 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 3 Jul 2024 14:40:24 +0200 Subject: [PATCH 09/16] docs: updates CHANGELOG.md and InfluxQLExample.java --- CHANGELOG.md | 24 ++++ .../main/java/example/InfluxQLExample.java | 113 +++++++++++++++--- 2 files changed, 119 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d60cca4fd3..1026eaab866 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,29 @@ ## 7.2.0 [unreleased] +### Breaking Changes + +#### InfluxQLQuery default timestamp + +The default timestamp returned by `InfluxQLQueryAPI.query()` is no longer in the POSIX epoch format. It is now in the RFC3339 format. The Epoch format is still supported. It is sufficient to add the `epoch` query parameter to a query request via `InfluxQLQuery.setPrecision()` or to use a new dedicated CSV query method, `InfluxQLQueryAPI.queryCSV()`. + +See header changes in features below. + +### Features + +- [#719](https://github.com/influxdata/influxdb-client-java/issues/719): `InfluxQLQueryService` header changes. + - Now uses `Accept` header with the value `application/json` by default. + - The `Accept` header for InfluxQLQuery calls can now be set dynamically as either `application/json` or `application/csv`. + - :warning: Side effects of these changes: + - When using `application/json` timestamp fields are returned in the [RFC3339](https://www.rfc-editor.org/rfc/rfc3339) format. + - When using `application/csv` timestamp fields are returned in the POSIX epoch format. + - Convenience methods have been added to `InfluxQLQueryAPI` to simplify using CSV if desired. + - Epoch timestamps can also be ensured by calling `InfluxQLQuery.setPrecision()` before executing a query call. + - An `AcceptHeader` field has also been added to the `InfluxQLQuery` class and can be set with `InfluxQLQuery.setAcceptHeader()`. + - More information from the server side: + - [Generated REST API Documentation](https://docs.influxdata.com/influxdb/v2/api/v1-compatibility/#operation/PostQueryV1) + - [Influx 1.1 query compatibility](https://docs.influxdata.com/influxdb/latest/reference/api/influxdb-1x/query/) + - See the updated InfluxQLExample + ### Dependencies Update dependencies: diff --git a/examples/src/main/java/example/InfluxQLExample.java b/examples/src/main/java/example/InfluxQLExample.java index 327c8143ed9..7ed40cf0cfc 100644 --- a/examples/src/main/java/example/InfluxQLExample.java +++ b/examples/src/main/java/example/InfluxQLExample.java @@ -24,10 +24,15 @@ import java.math.BigDecimal; import java.time.Instant; +import com.influxdb.LogLevel; +import com.influxdb.annotations.Column; +import com.influxdb.annotations.Measurement; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.InfluxQLQueryApi; +import com.influxdb.client.WriteApiBlocking; import com.influxdb.client.domain.InfluxQLQuery; +import com.influxdb.client.domain.WritePrecision; import com.influxdb.query.InfluxQLQueryResult; public class InfluxQLExample { @@ -35,11 +40,14 @@ public class InfluxQLExample { private static char[] token = "my-token".toCharArray(); private static String org = "my-org"; - private static String database = "my-org"; + private static String database = "my-bucket"; public static void main(final String[] args) { - try (InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org)) { + try (InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org, database)) { + //influxDBClient.setLogLevel(LogLevel.BODY); // uncomment to inspect communication messages + + write(influxDBClient); // // Query data @@ -48,28 +56,97 @@ public static void main(final String[] args) { InfluxQLQueryApi queryApi = influxDBClient.getInfluxQLQueryApi(); - // send request - InfluxQLQueryResult result = queryApi.query(new InfluxQLQuery(influxQL, database).setPrecision(InfluxQLQuery.InfluxQLPrecision.SECONDS), - (columnName, rawValue, resultIndex, seriesName) -> { + // send request - uses default Accept: application/json and returns RFC3339 timestamp + InfluxQLQueryResult result = queryApi.query( + new InfluxQLQuery(influxQL, database), + (columnName, rawValue, resultIndex, seriesName) -> { // custom valueExtractor // convert columns - switch (columnName) { - case "time": - return Instant.ofEpochSecond(Long.parseLong(rawValue)); - case "first": - return new BigDecimal(rawValue); - default: - throw new IllegalArgumentException("unexpected column " + columnName); - } + return switch (columnName) { + case "time" -> Instant.parse(rawValue); + case "first" -> Long.parseLong(rawValue); + default -> throw new IllegalArgumentException("unexpected column " + columnName); + }; }); - for (InfluxQLQueryResult.Result resultResult : result.getResults()) { - for (InfluxQLQueryResult.Series series : resultResult.getSeries()) { - for (InfluxQLQueryResult.Series.Record record : series.getValues()) { - System.out.println(record.getValueByKey("time") + ": " + record.getValueByKey("first")); - } + System.out.println("Default query with valueExtractor"); + dumpResult(result); + + // send request - use Accept: application/csv returns epoch timestamp + result = queryApi.queryCSV( + new InfluxQLQuery(influxQL,database), + (columnName, rawValue, resultIndex, seriesName) -> { // custom valueExtractor + // convert columns + return switch (columnName) { + case "time" -> { + long l = Long.parseLong(rawValue); + yield Instant.ofEpochSecond(l / 1_000_000_000L, + l % 1_000_000_000L); + } + case "first" -> Long.parseLong(rawValue); + default -> throw new IllegalArgumentException("unexpected column " + columnName); + }; + }); + + System.out.println("QueryCSV with valueExtractor."); + dumpResult(result); + + // send request - set `Accept` header in InfluxQLQuery object, use raw results. + // N.B. timestamp returned is Epoch nanos in String format. + result = queryApi.query( + new InfluxQLQuery(influxQL,database) + .setAcceptHeader(InfluxQLQuery.AcceptHeader.CSV) + ); + + System.out.println("Default query method with AcceptHeader.CSV in InfluxQLQuery object. Raw results"); + dumpResult(result); + + // send request - use default `Accept` header (application/json), + // but specify epoch precision, use raw results + result = queryApi.query( + new InfluxQLQuery(influxQL, database) + .setPrecision(InfluxQLQuery.InfluxQLPrecision.MILLISECONDS) + ); + + System.out.println("Default query method with Epoch precision in InfluxQLQuery object. Raw results."); + dumpResult(result); + + } + } + + public static void write(InfluxDBClient influxDBClient){ + WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking(); + + InfluxQLTestData testData = new InfluxQLTestData(Instant.now().minusSeconds(1)); + + writeApi.writeMeasurement(WritePrecision.NS, testData); + + } + + public static void dumpResult(InfluxQLQueryResult result){ + for (InfluxQLQueryResult.Result resultResult : result.getResults()) { + for (InfluxQLQueryResult.Series series : resultResult.getSeries()) { + for (InfluxQLQueryResult.Series.Record record : series.getValues()) { + System.out.println(record.getValueByKey("time") + ": " + record.getValueByKey("first")); } } + } + } + + @Measurement(name = "influxql") + public static class InfluxQLTestData{ + @Column(timestamp = true) + Instant time; + + @Column + Long free; + + @Column(tag = true) + String machine; + public InfluxQLTestData(Instant instant) { + free = (long) (Math.random() * 100); + machine = "test"; + time = instant; } } } From 7a568e3f141a79734a3232283794e43678a46e0e Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 12 Jul 2024 10:37:36 +0200 Subject: [PATCH 10/16] fix: fixes #744 and possible NPEs --- .../influxdb/internal/AbstractQueryApi.java | 20 ++-- .../client/internal/InfluxQLQueryApiImpl.java | 101 +++++++++--------- .../internal/InfluxQLQueryApiImplTest.java | 44 ++++++++ 3 files changed, 111 insertions(+), 54 deletions(-) diff --git a/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java b/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java index dde96fc10be..9cd8edad872 100644 --- a/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java +++ b/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java @@ -174,13 +174,21 @@ protected void query(@Nonnull final Call query, Consumer bodyConsumer = body -> { try { BufferedSource source = body.source(); - - // - // Source has data => parse - // - while (source.isOpen() && !source.exhausted() && !cancellable.wasCancelled) { - + LOG.log(Level.WARNING, String.format("Query %s already exhausted.", + query.request().tag(retrofit2.Invocation.class) + .toString().split(" \\[")[1] + .replace("]", ""))); + // already exhausted - empty or very short response + if (source.exhausted()) { consumer.accept(cancellable, source); + } else { + + // + // Source has data => parse + // + while (source.isOpen() && !source.exhausted() && !cancellable.wasCancelled) { + consumer.accept(cancellable, source); + } } if (!cancellable.wasCancelled) { diff --git a/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java b/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java index 7ce7aeeb515..d84ac224b7f 100644 --- a/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java +++ b/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java @@ -278,12 +278,15 @@ public InfluxQLQueryResult deserialize( final Type type, final JsonDeserializationContext ctx) throws JsonParseException { List results = new ArrayList<>(); - JsonArray jsonArray = elem.getAsJsonObject().get("results").getAsJsonArray(); - for (JsonElement jsonElement : jsonArray) { - if (cancellable.isCancelled()) { - break; + JsonObject result = elem.getAsJsonObject(); + if (result.has("results")) { + JsonArray jsonArray = result.get("results").getAsJsonArray(); + for (JsonElement jsonElement : jsonArray) { + if (cancellable.isCancelled()) { + break; + } + results.add(ctx.deserialize(jsonElement, InfluxQLQueryResult.Result.class)); } - results.add(ctx.deserialize(jsonElement, InfluxQLQueryResult.Result.class)); } return new InfluxQLQueryResult(results); } @@ -306,53 +309,55 @@ public InfluxQLQueryResult.Result deserialize( int id = eobj.get("statement_id").getAsInt(); List series = new ArrayList<>(); JsonArray seriesArray = eobj.getAsJsonArray("series"); - for (JsonElement jserie : seriesArray) { - JsonObject sobj = jserie.getAsJsonObject(); - String name = sobj.getAsJsonObject().get("name").getAsString(); - Map columns = new LinkedHashMap<>(); - Map tags = null; - // Handle columns - JsonArray jac = sobj.get("columns").getAsJsonArray(); - final AtomicInteger count = new AtomicInteger(0); - jac.forEach(e -> { - columns.put(e.getAsString(), count.getAndIncrement()); - }); - - InfluxQLQueryResult.Series serie = null; - // Handle tags - if they exist - if (sobj.get("tags") != null) { - JsonObject tagsObj = sobj.get("tags").getAsJsonObject(); - tags = new LinkedHashMap<>(); - for (String key : tagsObj.keySet()) { - tags.put(key, tagsObj.get(key).getAsString()); + if (seriesArray != null) { + for (JsonElement jserie : seriesArray) { + JsonObject sobj = jserie.getAsJsonObject(); + String name = sobj.getAsJsonObject().get("name").getAsString(); + Map columns = new LinkedHashMap<>(); + Map tags = null; + // Handle columns + JsonArray jac = sobj.get("columns").getAsJsonArray(); + final AtomicInteger count = new AtomicInteger(0); + jac.forEach(e -> { + columns.put(e.getAsString(), count.getAndIncrement()); + }); + + InfluxQLQueryResult.Series serie = null; + // Handle tags - if they exist + if (sobj.get("tags") != null) { + JsonObject tagsObj = sobj.get("tags").getAsJsonObject(); + tags = new LinkedHashMap<>(); + for (String key : tagsObj.keySet()) { + tags.put(key, tagsObj.get(key).getAsString()); + } + serie = new InfluxQLQueryResult.Series(name, tags, columns); + } else { + serie = new InfluxQLQueryResult.Series(name, columns); } - serie = new InfluxQLQueryResult.Series(name, tags, columns); - } else { - serie = new InfluxQLQueryResult.Series(name, columns); - } - JsonArray jvals = sobj.get("values").getAsJsonArray(); - for (JsonElement jval : jvals) { - List values = new ArrayList<>(); - JsonArray jae = jval.getAsJsonArray(); - int index = 0; - for (JsonElement je : jae) { - List columnKeys = new ArrayList<>(serie.getColumns().keySet()); - if (extractor != null) { - String stringVal = je.getAsString(); - Object ov = extractor.extractValue( - columnKeys.get(index), - stringVal, - id, - serie.getName()); - values.add(ov); - } else { - values.add(je.getAsString()); + JsonArray jvals = sobj.get("values").getAsJsonArray(); + for (JsonElement jval : jvals) { + List values = new ArrayList<>(); + JsonArray jae = jval.getAsJsonArray(); + int index = 0; + for (JsonElement je : jae) { + List columnKeys = new ArrayList<>(serie.getColumns().keySet()); + if (extractor != null) { + String stringVal = je.getAsString(); + Object ov = extractor.extractValue( + columnKeys.get(index), + stringVal, + id, + serie.getName()); + values.add(ov); + } else { + values.add(je.getAsString()); + } + index++; } - index++; + serie.addRecord(serie.new Record(values.toArray())); } - serie.addRecord(serie.new Record(values.toArray())); + series.add(serie); } - series.add(serie); } return new InfluxQLQueryResult.Result(id, series); } diff --git a/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java b/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java index 4ef94077833..15295b731c1 100644 --- a/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java +++ b/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java @@ -485,4 +485,48 @@ public void readInfluxQLJSONResultCustomExtractValue(){ }); }); } + + @Test + public void deserializeNullSeriesJSON(){ + String nullSeriesResponse = "{\"results\":[{\"statement_id\":0}]}"; + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLJsonResult(new StringReader(nullSeriesResponse), NO_CANCELLING, null); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(1); + Assertions.assertThat(results.get(0).getIndex()).isEqualTo(0); + Assertions.assertThat(results.get(0).getSeries()).hasSize(0); + } + + @Test + public void deserializeNullSeriesCSV() throws IOException { + String nullSeriesResponse = "name,tags,time,val1,val2"; + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLCSVResult(new StringReader(nullSeriesResponse), NO_CANCELLING, null); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(1); + Assertions.assertThat(results.get(0).getIndex()).isEqualTo(0); + Assertions.assertThat(results.get(0).getSeries()).hasSize(0); + } + + @Test + public void deserializeZeroResultJSON() throws IOException { + String zeroResultResponse = "{\"results\":[]}"; + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLJsonResult(new StringReader(zeroResultResponse), NO_CANCELLING, null); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(0); + } + + @Test + public void deserializeZeroResultsCSV() throws IOException { + String nullResponse = ""; + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLCSVResult(new StringReader(nullResponse), NO_CANCELLING, null); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(0); + } + + @Test + public void deserializeEmptyResultJSON(){ + String emptyResultResponse = "{}"; + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLJsonResult(new StringReader(emptyResultResponse), NO_CANCELLING, null); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(0); + } } From 49450ecfe048bd3de9676ef58d91bb8a889c83e2 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 12 Jul 2024 11:20:25 +0200 Subject: [PATCH 11/16] test: adds test of empty resultsfrom server --- .../com/influxdb/client/ITInfluxQLQueryApi.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java b/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java index 9917cbe5909..6c2f686cc9e 100644 --- a/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java +++ b/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java @@ -30,6 +30,7 @@ import java.util.Objects; import com.influxdb.client.domain.Bucket; +import com.influxdb.client.domain.DBRP; import com.influxdb.client.domain.DBRPCreate; import com.influxdb.client.domain.InfluxQLQuery; import com.influxdb.client.domain.WritePrecision; @@ -46,6 +47,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import retrofit2.Response; import static org.assertj.core.api.InstanceOfAssertFactories.BIG_DECIMAL; import static org.assertj.core.api.InstanceOfAssertFactories.INSTANT; @@ -483,4 +485,15 @@ public void testQueryJsonPrecision(){ } } } + + @Test + public void testEmptyResultsResponse() { + + try(InfluxDBClient localClient = InfluxDBClientFactory.create(influxDB_URL, "my-token".toCharArray())) { + InfluxQLQueryResult result = localClient.getInfluxQLQueryApi().query( + new InfluxQLQuery("SHOW FIELD KEYS", "inexistant", InfluxQLQuery.AcceptHeader.CSV)); + + Assertions.assertThat(result.getResults()).hasSize(0); + } + } } From f3582c6a746196ecb48fdba76ee0bbc7bc14fda5 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 12 Jul 2024 14:13:57 +0200 Subject: [PATCH 12/16] chore: adds check for null values array in JSON desrializer --- .../client/internal/InfluxQLQueryApiImpl.java | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java b/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java index d84ac224b7f..8d5c7b37ec0 100644 --- a/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java +++ b/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java @@ -335,26 +335,28 @@ public InfluxQLQueryResult.Result deserialize( serie = new InfluxQLQueryResult.Series(name, columns); } JsonArray jvals = sobj.get("values").getAsJsonArray(); - for (JsonElement jval : jvals) { - List values = new ArrayList<>(); - JsonArray jae = jval.getAsJsonArray(); - int index = 0; - for (JsonElement je : jae) { - List columnKeys = new ArrayList<>(serie.getColumns().keySet()); - if (extractor != null) { - String stringVal = je.getAsString(); - Object ov = extractor.extractValue( - columnKeys.get(index), - stringVal, - id, - serie.getName()); - values.add(ov); - } else { - values.add(je.getAsString()); + if (jvals != null) { + for (JsonElement jval : jvals) { + List values = new ArrayList<>(); + JsonArray jae = jval.getAsJsonArray(); + int index = 0; + for (JsonElement je : jae) { + List columnKeys = new ArrayList<>(serie.getColumns().keySet()); + if (extractor != null) { + String stringVal = je.getAsString(); + Object ov = extractor.extractValue( + columnKeys.get(index), + stringVal, + id, + serie.getName()); + values.add(ov); + } else { + values.add(je.getAsString()); + } + index++; } - index++; + serie.addRecord(serie.new Record(values.toArray())); } - serie.addRecord(serie.new Record(values.toArray())); } series.add(serie); } From 54a1e55d8d8477bf5d30af0ac1352d4ac7c10306 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 12 Jul 2024 15:46:27 +0200 Subject: [PATCH 13/16] chore: move log warning to correct branch --- .../main/java/com/influxdb/internal/AbstractQueryApi.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java b/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java index 9cd8edad872..39ceb0ad940 100644 --- a/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java +++ b/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java @@ -174,12 +174,12 @@ protected void query(@Nonnull final Call query, Consumer bodyConsumer = body -> { try { BufferedSource source = body.source(); - LOG.log(Level.WARNING, String.format("Query %s already exhausted.", - query.request().tag(retrofit2.Invocation.class) - .toString().split(" \\[")[1] - .replace("]", ""))); // already exhausted - empty or very short response if (source.exhausted()) { + LOG.log(Level.WARNING, String.format("Query %s already exhausted.", + query.request().tag(retrofit2.Invocation.class) + .toString().split(" \\[")[1] + .replace("]", ""))); consumer.accept(cancellable, source); } else { From 878cd1e3e387c833dce83b304ec28217b4933fa4 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 15 Jul 2024 13:49:59 +0200 Subject: [PATCH 14/16] chore: revert default InfluxQL serializer to CSV to keep backward compatibility. --- .../influxdb/client/domain/InfluxQLQuery.java | 4 +-- .../influxdb/client/ITInfluxQLQueryApi.java | 30 +++++++++++++++---- .../client/domain/InfluxQLQueryTest.java | 8 ++--- 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java b/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java index b118972ac63..39f17e15ea5 100644 --- a/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java +++ b/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java @@ -44,7 +44,7 @@ public class InfluxQLQuery { public InfluxQLQuery(@Nonnull final String command, @Nonnull final String database) { this.command = command; this.database = database; - this.acceptHeader = AcceptHeader.JSON; + this.acceptHeader = AcceptHeader.CSV; } /** @@ -133,7 +133,7 @@ public InfluxQLQuery setAcceptHeader(final AcceptHeader acceptHeader) { * @return the string value of the AcceptHeader used when making queries. */ public String getAcceptHeaderVal() { - return acceptHeader != null ? acceptHeader.getVal() : AcceptHeader.JSON.getVal(); + return acceptHeader != null ? acceptHeader.getVal() : AcceptHeader.CSV.getVal(); } /** diff --git a/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java b/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java index 6c2f686cc9e..9501e92e030 100644 --- a/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java +++ b/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java @@ -109,8 +109,8 @@ void testQueryData() { .hasSize(1) .first() .satisfies(record -> { -// Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); - Assertions.assertThat(record.getValueByKey("time")).isEqualTo("2022-06-22T12:13:20Z"); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); +// Assertions.assertThat(record.getValueByKey("time")).isEqualTo("2022-06-22T12:13:20Z"); Assertions.assertThat(record.getValueByKey("first")).isEqualTo("10"); }); } @@ -147,14 +147,31 @@ void testSelectAll() { .hasSize(1) .first() .satisfies(record -> { - // Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); - Assertions.assertThat(record.getValueByKey("time")).isEqualTo("2022-06-22T12:13:20Z"); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); + // Assertions.assertThat(record.getValueByKey("time")).isEqualTo("2022-06-22T12:13:20Z"); Assertions.assertThat(record.getValueByKey("free")).isEqualTo("10"); Assertions.assertThat(record.getValueByKey("host")).isEqualTo("A"); Assertions.assertThat(record.getValueByKey("region")).isEqualTo("west"); }); } + @Test + void testSelectAllJSON() { + InfluxQLQueryResult result = influxQLQueryApi.query( + new InfluxQLQuery("SELECT * FROM \"influxql\"", DATABASE_NAME, InfluxQLQuery.AcceptHeader.JSON) + ); + assertSingleSeriesRecords(result) + .hasSize(1) + .first() + .satisfies(record -> { + //Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("2022-06-22T12:13:20Z"); + Assertions.assertThat(record.getValueByKey("free")).isEqualTo("10"); + Assertions.assertThat(record.getValueByKey("host")).isEqualTo("A"); + Assertions.assertThat(record.getValueByKey("region")).isEqualTo("west"); + }); + } + @Test public void testSelectGroupBy(){ InfluxQLQueryResult result = influxQLQueryApi.query( @@ -166,8 +183,9 @@ public void testSelectGroupBy(){ .first() .satisfies(record -> { Assertions.assertThat(record.getValueByKey("region")).isNull(); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); Assertions.assertThat(record.getValueByKey("host")).isNull(); - Assertions.assertThat(record.getValueByKey("time")).isEqualTo("2022-06-22T12:13:20Z"); + // Assertions.assertThat(record.getValueByKey("time")).isEqualTo("2022-06-22T12:13:20Z"); Assertions.assertThat(record.getValueByKey("free")).isEqualTo("10"); }); @@ -289,7 +307,7 @@ public void serviceHeaderDefault() throws InterruptedException { InfluxQLQueryResult result = influxQuery.query(new InfluxQLQuery("SELECT * FROM cpu", "test_db")); RecordedRequest request = mockServer.takeRequest(); Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); - Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/json"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/csv"); } @Test diff --git a/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java b/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java index acc88b804a6..01e6a166cd7 100644 --- a/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java +++ b/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java @@ -39,7 +39,7 @@ public void setRetentionPolicy(){ @Test public void headerSelectDefault(){ InfluxQLQuery query = new InfluxQLQuery("SELECT * FROM cpu", "test_db"); - Assertions.assertThat(query.getAcceptHeaderVal()).isEqualTo("application/json"); + Assertions.assertThat(query.getAcceptHeaderVal()).isEqualTo("application/csv"); } @Test @@ -53,9 +53,9 @@ public void headerSelect(){ @Test public void headerSet(){ InfluxQLQuery query = new InfluxQLQuery("SELECT * FROM cpu", "test_db"); - Assertions.assertThat(query.getAcceptHeaderVal()).isEqualTo("application/json"); - Assertions.assertThat(query.setAcceptHeader(InfluxQLQuery.AcceptHeader.CSV).getAcceptHeaderVal()) - .isEqualTo("application/csv"); + Assertions.assertThat(query.getAcceptHeaderVal()).isEqualTo("application/csv"); + Assertions.assertThat(query.setAcceptHeader(InfluxQLQuery.AcceptHeader.JSON).getAcceptHeaderVal()) + .isEqualTo("application/json"); } @Test From a8094e546427577cddf8ed93f098cba3fb5e73ac Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 15 Jul 2024 14:48:33 +0200 Subject: [PATCH 15/16] chore: update InfluxQL example --- .../main/java/example/InfluxQLExample.java | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/examples/src/main/java/example/InfluxQLExample.java b/examples/src/main/java/example/InfluxQLExample.java index 7ed40cf0cfc..a8bad9b87de 100644 --- a/examples/src/main/java/example/InfluxQLExample.java +++ b/examples/src/main/java/example/InfluxQLExample.java @@ -62,7 +62,10 @@ public static void main(final String[] args) { (columnName, rawValue, resultIndex, seriesName) -> { // custom valueExtractor // convert columns return switch (columnName) { - case "time" -> Instant.parse(rawValue); + case "time" -> { + long l = Long.parseLong(rawValue); + yield Instant.ofEpochMilli(l / 1_000_000L); + } case "first" -> Long.parseLong(rawValue); default -> throw new IllegalArgumentException("unexpected column " + columnName); }; @@ -90,6 +93,22 @@ public static void main(final String[] args) { System.out.println("QueryCSV with valueExtractor."); dumpResult(result); + result = queryApi.query( + new InfluxQLQuery( + influxQL, + database, + InfluxQLQuery.AcceptHeader.JSON), + (columnName, rawValue, resultIndex, seriesName) -> { + return switch(columnName) { + case "time" -> Instant.parse(rawValue); + case "first" -> Long.parseLong(rawValue); + default -> throw new IllegalArgumentException("Unexpected column " + columnName); + }; + }); + + System.out.println("Query with JSON accept header and valueExtractor"); + dumpResult(result); + // send request - set `Accept` header in InfluxQLQuery object, use raw results. // N.B. timestamp returned is Epoch nanos in String format. result = queryApi.query( From 92295f97333ae18ac6322294270a3ee89192565a Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 15 Jul 2024 16:25:38 +0200 Subject: [PATCH 16/16] docs: updates CHANGELOG.md a recent javadoc. --- CHANGELOG.md | 26 +++++++--------- .../com/influxdb/client/InfluxQLQueryApi.java | 31 ++++++++++++------- 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1026eaab866..091118f1a4b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,28 +1,26 @@ ## 7.2.0 [unreleased] -### Breaking Changes - -#### InfluxQLQuery default timestamp - -The default timestamp returned by `InfluxQLQueryAPI.query()` is no longer in the POSIX epoch format. It is now in the RFC3339 format. The Epoch format is still supported. It is sufficient to add the `epoch` query parameter to a query request via `InfluxQLQuery.setPrecision()` or to use a new dedicated CSV query method, `InfluxQLQueryAPI.queryCSV()`. - -See header changes in features below. - ### Features - [#719](https://github.com/influxdata/influxdb-client-java/issues/719): `InfluxQLQueryService` header changes. - - Now uses `Accept` header with the value `application/json` by default. - - The `Accept` header for InfluxQLQuery calls can now be set dynamically as either `application/json` or `application/csv`. + - `Accept` header can now be defined when making `InfluxQLQuery` calls. Supoorted MIME types: + - `application/csv` + - `application/json` + - The value `application/csv` remains the default. - :warning: Side effects of these changes: - - When using `application/json` timestamp fields are returned in the [RFC3339](https://www.rfc-editor.org/rfc/rfc3339) format. - - When using `application/csv` timestamp fields are returned in the POSIX epoch format. - - Convenience methods have been added to `InfluxQLQueryAPI` to simplify using CSV if desired. + - When using `application/json`, timestamp fields are returned in the [RFC3339](https://www.rfc-editor.org/rfc/rfc3339) format unless `InfluxQLQuery.setPrecision()` has been previously called, in which case they are returned in the POSIX epoch format. + - When using `application/csv`, timestamp fields are returned in the POSIX epoch format. + - Convenience methods have been added to `InfluxQLQueryAPI` to simplify expressly specifying JSON or CSV calls. - Epoch timestamps can also be ensured by calling `InfluxQLQuery.setPrecision()` before executing a query call. - An `AcceptHeader` field has also been added to the `InfluxQLQuery` class and can be set with `InfluxQLQuery.setAcceptHeader()`. - More information from the server side: - [Generated REST API Documentation](https://docs.influxdata.com/influxdb/v2/api/v1-compatibility/#operation/PostQueryV1) - [Influx 1.1 query compatibility](https://docs.influxdata.com/influxdb/latest/reference/api/influxdb-1x/query/) - - See the updated InfluxQLExample + - See the updated InfluxQLExample + +### Bug Fixes + +1. [#744](https://github.com/influxdata/influxdb-client-java/issues/744) following an `InfluxQLQueryAPI.query()` call, empty results from the server no longer result in a `null` result value. ### Dependencies diff --git a/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java b/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java index c718b7f3fa8..9669a72e81e 100644 --- a/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java +++ b/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java @@ -34,19 +34,28 @@ * {@link InfluxQLQuery#getRetentionPolicy() retention policy} specified in the query request to * map the request to an InfluxDB bucket. * - *

Note that as of release 7.2 queries using the legacy InfluxQL compatible endpoint, will use - * the default Accept header mime type of application/json instead of the previous - * mime type of application/csv. This means timestamps will be returned in the RFC3339 format, - * e.g. "2024-06-18T11:29:48.454Z" instead of in the Epoch format, e.g. 1655900000000000000. - *

+ *

Note that as of release 7.2 queries using the legacy InfluxQL compatible endpoint can specify + * the Accept header MIME type. Two MIME types are supported.

+ *
    + *
  • application/csv - client default and legacy value.
  • + *
  • application/json
  • + *
* - *

To continue to use the application/csv mime type and to receive Epoch timestamps, use a - * new convenience method queryCSV. To explicitly indicate use of the application/json - * mime type additional convenience methods queryJSON are also now available. These are synonymous - * with the original query methods.

+ *

The selected Accept header mime type impacts the timestamp format returned from the server.

+ *
    + *
  • application/csv returns timestamps in the POSIX epoch format.
  • + *
  • application/json returns timestamps as RFC3339 strings. + *
      + *
    • Caveat. If InfluxQLQuery.setPrecision() is called before the query is sent, then + * the timestamp will be returned as a POSIX epoch reflecting the desired precision, even when using the + * application/json MIME type.
    • + *
    + *
  • + *
* - *

Note that the Accept header mime type can now also be specified when instantiating the - *{@link com.influxdb.client.domain.InfluxQLQuery} class.

+ *

To explicitly choose one or the other MIME type new convenience methods are povided: queryCSV + * and queryJSON. Note that the Accept header MIME type can now also be specified + * when instantiating the {@link com.influxdb.client.domain.InfluxQLQuery} class.

* *
* For more information, see: