diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d60cca4fd3..091118f1a4b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,27 @@ ## 7.2.0 [unreleased] +### Features + +- [#719](https://github.com/influxdata/influxdb-client-java/issues/719): `InfluxQLQueryService` header changes. + - `Accept` header can now be defined when making `InfluxQLQuery` calls. Supported MIME types: + - `application/csv` + - `application/json` + - The value `application/csv` remains the default. + - :warning: Side effects of these changes: + - When using `application/json`, timestamp fields are returned in the [RFC3339](https://www.rfc-editor.org/rfc/rfc3339) format unless `InfluxQLQuery.setPrecision()` has been previously called, in which case they are returned in the POSIX epoch format. + - When using `application/csv`, timestamp fields are returned in the POSIX epoch format. + - Convenience methods have been added to `InfluxQLQueryAPI` to simplify expressly specifying JSON or CSV calls. + - Epoch timestamps can also be ensured by calling `InfluxQLQuery.setPrecision()` before executing a query call. + - An `AcceptHeader` field has also been added to the `InfluxQLQuery` class and can be set with `InfluxQLQuery.setAcceptHeader()`. + - More information from the server side: + - [Generated REST API Documentation](https://docs.influxdata.com/influxdb/v2/api/v1-compatibility/#operation/PostQueryV1) + - [Influx 1.1 query compatibility](https://docs.influxdata.com/influxdb/latest/reference/api/influxdb-1x/query/) + - See the updated InfluxQLExample + +### Bug Fixes + +1. [#744](https://github.com/influxdata/influxdb-client-java/issues/744) following an `InfluxQLQueryAPI.query()` call, empty results from the server no longer result in a `null` result value. 
+ ### Dependencies Update dependencies: diff --git a/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java b/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java index dde96fc10be..39ceb0ad940 100644 --- a/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java +++ b/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java @@ -174,13 +174,21 @@ protected void query(@Nonnull final Call query, Consumer bodyConsumer = body -> { try { BufferedSource source = body.source(); - - // - // Source has data => parse - // - while (source.isOpen() && !source.exhausted() && !cancellable.wasCancelled) { - + // already exhausted - empty or very short response + if (source.exhausted()) { + LOG.log(Level.WARNING, String.format("Query %s already exhausted.", + query.request().tag(retrofit2.Invocation.class) + .toString().split(" \\[")[1] + .replace("]", ""))); consumer.accept(cancellable, source); + } else { + + // + // Source has data => parse + // + while (source.isOpen() && !source.exhausted() && !cancellable.wasCancelled) { + consumer.accept(cancellable, source); + } } if (!cancellable.wasCancelled) { diff --git a/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java b/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java index 88c9012e205..4f2fe8f7a1b 100644 --- a/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java +++ b/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java @@ -210,7 +210,6 @@ public Object[] getValues() { return values; } } - } } diff --git a/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java b/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java index 73328f01af2..6563d022830 100644 --- a/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java +++ b/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java @@ -15,7 +15,7 @@ public 
interface InfluxQLQueryService { * @param zapTraceSpan OpenTracing span context (optional) * @return response in csv format */ - @Headers({"Accept:application/csv", "Content-Type:application/x-www-form-urlencoded"}) + @Headers({"Content-Type:application/x-www-form-urlencoded"}) @FormUrlEncoded @POST("query") Call query( @@ -23,6 +23,7 @@ Call query( @Nonnull @Query("db") String db, @Query("rp") String retentionPolicy, @Query("epoch") String epoch, - @Header("Zap-Trace-Span") String zapTraceSpan + @Header("Zap-Trace-Span") String zapTraceSpan, + @Header("Accept") String accept ); } diff --git a/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java b/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java index c3624065d56..9669a72e81e 100644 --- a/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java +++ b/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java @@ -29,10 +29,34 @@ import com.influxdb.query.InfluxQLQueryResult; /** - * The InfluxQL can be used with /query compatibility endpoint which uses the + * The InfluxQL API can be used with the /query compatibility endpoint which uses the * {@link InfluxQLQuery#getDatabase() database} and * {@link InfluxQLQuery#getRetentionPolicy() retention policy} specified in the query request to * map the request to an InfluxDB bucket. + * + *

Note that as of release 7.2 queries using the legacy InfluxQL compatible endpoint can specify + * the Accept header MIME type. Two MIME types are supported.

+ *
    + *
  • application/csv - client default and legacy value.
  • + *
  • application/json
  • + *
+ * + *

The selected Accept header mime type impacts the timestamp format returned from the server.

+ *
    + *
  • application/csv returns timestamps in the POSIX epoch format.
  • + *
  • application/json returns timestamps as RFC3339 strings. + *
      + *
    • Caveat. If InfluxQLQuery.setPrecision() is called before the query is sent, then + * the timestamp will be returned as a POSIX epoch reflecting the desired precision, even when using the + * application/json MIME type.
    • + *
    + *
  • + *
+ * + *

To explicitly choose one or the other MIME type, new convenience methods are provided: queryCSV + * and queryJSON. Note that the Accept header MIME type can now also be specified + * when instantiating the {@link com.influxdb.client.domain.InfluxQLQuery} class.

+ * *
* For more information, see: * **/ @ThreadSafe @@ -92,4 +121,49 @@ InfluxQLQueryResult query( @Nonnull InfluxQLQuery influxQlQuery, @Nullable InfluxQLQueryResult.Series.ValueExtractor valueExtractor ); + + /** + * Convenience method to specify use of the mime type application/csv + * in the Accept header. Result timestamps will be in the Epoch format. + * + * @param influxQLQuery the query + * @return the result + */ + @Nonnull + InfluxQLQueryResult queryCSV(@Nonnull final InfluxQLQuery influxQLQuery); + + /** + * Convenience method to specify use of the mime type application/csv + * in the Accept header. Result timestamps will be in the Epoch format. + * + * @param influxQLQuery the query + * @param valueExtractor a callback, to convert column values + * @return the result + */ + InfluxQLQueryResult queryCSV(@Nonnull final InfluxQLQuery influxQLQuery, + @Nullable InfluxQLQueryResult.Series.ValueExtractor valueExtractor); + + /** + * Convenience method to specify use of the mime type application/json + * in the Accept header. Result timestamps will be in the RFC3339 format. + * + * @param influxQLQuery the query + * @return the result + */ + @Nonnull + InfluxQLQueryResult queryJSON(@Nonnull final InfluxQLQuery influxQLQuery); + + /** + * Convenience method to specify use of the mime type application/json + * in the Accept header. Result timestamps will be in the RFC3339 format. 
+ * + * @param influxQLQuery the query + * @param valueExtractor a callback, to convert column values + * @return the result + */ + @Nonnull + InfluxQLQueryResult queryJSON(@Nonnull final InfluxQLQuery influxQLQuery, + @Nullable InfluxQLQueryResult.Series.ValueExtractor valueExtractor); + + } diff --git a/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java b/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java index 80d8673606c..39f17e15ea5 100644 --- a/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java +++ b/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java @@ -30,10 +30,12 @@ * A InfluxQL query. */ public class InfluxQLQuery { + private final String command; private final String database; private String retentionPolicy; private InfluxQLPrecision precision; + private AcceptHeader acceptHeader; /** * @param command the InfluxQL command to execute @@ -42,6 +44,20 @@ public class InfluxQLQuery { public InfluxQLQuery(@Nonnull final String command, @Nonnull final String database) { this.command = command; this.database = database; + this.acceptHeader = AcceptHeader.CSV; + } + + /** + * @param command the InfluxQL command to execute + * @param database the database to run this query against + * @param acceptHeader the Accept header to use in the request + */ + public InfluxQLQuery(@Nonnull final String command, + @Nonnull final String database, + @Nonnull final AcceptHeader acceptHeader) { + this.command = command; + this.database = database; + this.acceptHeader = acceptHeader; } /** @@ -97,6 +113,29 @@ public InfluxQLQuery setPrecision(@Nullable final InfluxQLPrecision precision) { return this; } + /** + * @return the current AcceptHeader used when making queries. + */ + public AcceptHeader getAcceptHeader() { + return acceptHeader; + } + + /*** + * @param acceptHeader the AcceptHeader to be used when making queries. 
+ * @return this + */ + public InfluxQLQuery setAcceptHeader(final AcceptHeader acceptHeader) { + this.acceptHeader = acceptHeader; + return this; + } + + /** + * @return the string value of the AcceptHeader used when making queries. + */ + public String getAcceptHeaderVal() { + return acceptHeader != null ? acceptHeader.getVal() : AcceptHeader.CSV.getVal(); + } + /** * The precision used for the timestamps returned by InfluxQL queries. */ @@ -143,4 +182,22 @@ public static InfluxQLPrecision toTimePrecision(final TimeUnit t) { } } } + + /** + * The possible values to be used in the header Accept, when making queries. + */ + public enum AcceptHeader { + JSON("application/json"), + CSV("application/csv"); + + private final String val; + + AcceptHeader(final String val) { + this.val = val; + } + + public String getVal() { + return val; + } + } } diff --git a/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java b/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java index 255ae17a1e9..8d5c7b37ec0 100644 --- a/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java +++ b/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; +import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -31,6 +32,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import javax.annotation.Nonnull; @@ -44,6 +46,16 @@ import com.influxdb.query.InfluxQLQueryResult; import com.influxdb.utils.Arguments; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonDeserializationContext; +import 
com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonIOException; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSyntaxException; import okhttp3.ResponseBody; import okio.BufferedSource; import org.apache.commons.csv.CSVFormat; @@ -64,14 +76,49 @@ public InfluxQLQueryApiImpl(@Nonnull final InfluxQLQueryService service) { @Nonnull @Override - public InfluxQLQueryResult query(@Nonnull final InfluxQLQuery influxQlQuery) { - return query(influxQlQuery, null); + public InfluxQLQueryResult query(@Nonnull final InfluxQLQuery influxQLQuery) { + return query(influxQLQuery, influxQLQuery.getAcceptHeader(), null); } @Nonnull @Override - public InfluxQLQueryResult query( + public InfluxQLQueryResult query(@Nonnull final InfluxQLQuery influxQLQuery, + @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor) { + return query(influxQLQuery, influxQLQuery.getAcceptHeader(), valueExtractor); + } + + @Nonnull + @Override + public InfluxQLQueryResult queryCSV(@Nonnull final InfluxQLQuery influxQLQuery) { + return query(influxQLQuery, InfluxQLQuery.AcceptHeader.CSV, null); + } + + @Override + public InfluxQLQueryResult queryCSV(@Nonnull final InfluxQLQuery influxQLQuery, + @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor) { + return query(influxQLQuery, InfluxQLQuery.AcceptHeader.CSV, valueExtractor); + + } + + @Nonnull + @Override + public InfluxQLQueryResult queryJSON(@Nonnull final InfluxQLQuery influxQLQuery) { + return query(influxQLQuery, InfluxQLQuery.AcceptHeader.JSON, null); + } + + @Nonnull + @Override + public InfluxQLQueryResult queryJSON(@Nonnull final InfluxQLQuery influxQLQuery, + @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor) { + return query(influxQLQuery, InfluxQLQuery.AcceptHeader.JSON, valueExtractor); + + } + + + @Nonnull + private InfluxQLQueryResult query( @Nonnull final InfluxQLQuery 
influxQlQuery, + @Nullable final InfluxQLQuery.AcceptHeader accept, @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor ) { Call call = service.query( @@ -79,12 +126,16 @@ public InfluxQLQueryResult query( influxQlQuery.getDatabase(), influxQlQuery.getRetentionPolicy(), influxQlQuery.getPrecision() != null ? influxQlQuery.getPrecision().getSymbol() : null, - null); + null, + accept != null ? accept.getVal() : InfluxQLQuery.AcceptHeader.JSON.getVal()); AtomicReference atomicReference = new AtomicReference<>(); BiConsumer consumer = (cancellable, bufferedSource) -> { try { - InfluxQLQueryResult result = parseResponse(bufferedSource, cancellable, valueExtractor); + InfluxQLQueryResult result = parseResponse(bufferedSource, + cancellable, + accept, + valueExtractor); atomicReference.set(result); } catch (IOException e) { ERROR_CONSUMER.accept(e); @@ -97,16 +148,20 @@ public InfluxQLQueryResult query( private InfluxQLQueryResult parseResponse( @Nonnull final BufferedSource bufferedSource, @Nonnull final Cancellable cancellable, + @Nonnull final InfluxQLQuery.AcceptHeader accept, @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor) throws IOException { Arguments.checkNotNull(bufferedSource, "bufferedSource"); try (Reader reader = new InputStreamReader(bufferedSource.inputStream(), StandardCharsets.UTF_8)) { - return readInfluxQLResult(reader, cancellable, valueExtractor); + if (accept == InfluxQLQuery.AcceptHeader.CSV) { + return readInfluxQLCSVResult(reader, cancellable, valueExtractor); + } + return readInfluxQLJsonResult(reader, cancellable, valueExtractor); } } - static InfluxQLQueryResult readInfluxQLResult( + static InfluxQLQueryResult readInfluxQLCSVResult( @Nonnull final Reader reader, @Nonnull final Cancellable cancellable, @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor @@ -189,4 +244,124 @@ private static Map parseTags(@Nonnull final String value) { return tags; } + + static InfluxQLQueryResult 
readInfluxQLJsonResult( + @Nonnull final Reader reader, + @Nonnull final Cancellable cancellable, + @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor + ) { + + Gson gson = new GsonBuilder() + .registerTypeAdapter(InfluxQLQueryResult.class, new ResultsDeserializer(cancellable)) + .registerTypeAdapter(InfluxQLQueryResult.Result.class, new ResultDeserializer(valueExtractor)) + .create(); + + try { + return gson.fromJson(reader, InfluxQLQueryResult.class); + } catch (JsonSyntaxException | JsonIOException jse) { + ERROR_CONSUMER.accept(jse); + return null; + } + } + + public static class ResultsDeserializer implements JsonDeserializer { + + Cancellable cancellable; + + public ResultsDeserializer(final Cancellable cancellable) { + this.cancellable = cancellable; + } + + @Override + public InfluxQLQueryResult deserialize( + final JsonElement elem, + final Type type, + final JsonDeserializationContext ctx) throws JsonParseException { + List results = new ArrayList<>(); + JsonObject result = elem.getAsJsonObject(); + if (result.has("results")) { + JsonArray jsonArray = result.get("results").getAsJsonArray(); + for (JsonElement jsonElement : jsonArray) { + if (cancellable.isCancelled()) { + break; + } + results.add(ctx.deserialize(jsonElement, InfluxQLQueryResult.Result.class)); + } + } + return new InfluxQLQueryResult(results); + } + } + + public static class ResultDeserializer implements JsonDeserializer { + + InfluxQLQueryResult.Series.ValueExtractor extractor; + + public ResultDeserializer(final InfluxQLQueryResult.Series.ValueExtractor extractor) { + this.extractor = extractor; + } + + @Override + public InfluxQLQueryResult.Result deserialize( + final JsonElement elem, + final Type type, + final JsonDeserializationContext ctx) throws JsonParseException { + JsonObject eobj = elem.getAsJsonObject(); + int id = eobj.get("statement_id").getAsInt(); + List series = new ArrayList<>(); + JsonArray seriesArray = eobj.getAsJsonArray("series"); + if 
(seriesArray != null) { + for (JsonElement jserie : seriesArray) { + JsonObject sobj = jserie.getAsJsonObject(); + String name = sobj.getAsJsonObject().get("name").getAsString(); + Map columns = new LinkedHashMap<>(); + Map tags = null; + // Handle columns + JsonArray jac = sobj.get("columns").getAsJsonArray(); + final AtomicInteger count = new AtomicInteger(0); + jac.forEach(e -> { + columns.put(e.getAsString(), count.getAndIncrement()); + }); + + InfluxQLQueryResult.Series serie = null; + // Handle tags - if they exist + if (sobj.get("tags") != null) { + JsonObject tagsObj = sobj.get("tags").getAsJsonObject(); + tags = new LinkedHashMap<>(); + for (String key : tagsObj.keySet()) { + tags.put(key, tagsObj.get(key).getAsString()); + } + serie = new InfluxQLQueryResult.Series(name, tags, columns); + } else { + serie = new InfluxQLQueryResult.Series(name, columns); + } + JsonArray jvals = sobj.get("values").getAsJsonArray(); + if (jvals != null) { + for (JsonElement jval : jvals) { + List values = new ArrayList<>(); + JsonArray jae = jval.getAsJsonArray(); + int index = 0; + for (JsonElement je : jae) { + List columnKeys = new ArrayList<>(serie.getColumns().keySet()); + if (extractor != null) { + String stringVal = je.getAsString(); + Object ov = extractor.extractValue( + columnKeys.get(index), + stringVal, + id, + serie.getName()); + values.add(ov); + } else { + values.add(je.getAsString()); + } + index++; + } + serie.addRecord(serie.new Record(values.toArray())); + } + } + series.add(serie); + } + } + return new InfluxQLQueryResult.Result(id, series); + } + } } diff --git a/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java b/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java index 45dcda1a58f..9501e92e030 100644 --- a/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java +++ b/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java @@ -24,8 +24,13 @@ import java.io.IOException; import java.math.BigDecimal; import 
java.time.Instant; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; import com.influxdb.client.domain.Bucket; +import com.influxdb.client.domain.DBRP; import com.influxdb.client.domain.DBRPCreate; import com.influxdb.client.domain.InfluxQLQuery; import com.influxdb.client.domain.WritePrecision; @@ -33,10 +38,16 @@ import com.influxdb.client.write.Point; import com.influxdb.query.InfluxQLQueryResult; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; import org.assertj.core.api.Assertions; import org.assertj.core.api.ListAssert; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import retrofit2.Response; import static org.assertj.core.api.InstanceOfAssertFactories.BIG_DECIMAL; import static org.assertj.core.api.InstanceOfAssertFactories.INSTANT; @@ -81,6 +92,15 @@ void testShowDatabases() { .contains(DATABASE_NAME); } + @Test + void testShowDatabasesCSV() { + InfluxQLQueryResult result = influxQLQueryApi.query( + new InfluxQLQuery("SHOW DATABASES", DATABASE_NAME, InfluxQLQuery.AcceptHeader.CSV)); + assertSingleSeriesRecords(result) + .map(record -> record.getValueByKey("name")) + // internal buckets are also available by DBRP mapping + .contains(DATABASE_NAME); + } @Test void testQueryData() { @@ -90,6 +110,7 @@ void testQueryData() { .first() .satisfies(record -> { Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); +// Assertions.assertThat(record.getValueByKey("time")).isEqualTo("2022-06-22T12:13:20Z"); Assertions.assertThat(record.getValueByKey("first")).isEqualTo("10"); }); } @@ -127,12 +148,62 @@ void testSelectAll() { .first() .satisfies(record -> { Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); + // 
Assertions.assertThat(record.getValueByKey("time")).isEqualTo("2022-06-22T12:13:20Z"); Assertions.assertThat(record.getValueByKey("free")).isEqualTo("10"); Assertions.assertThat(record.getValueByKey("host")).isEqualTo("A"); Assertions.assertThat(record.getValueByKey("region")).isEqualTo("west"); }); } + @Test + void testSelectAllJSON() { + InfluxQLQueryResult result = influxQLQueryApi.query( + new InfluxQLQuery("SELECT * FROM \"influxql\"", DATABASE_NAME, InfluxQLQuery.AcceptHeader.JSON) + ); + assertSingleSeriesRecords(result) + .hasSize(1) + .first() + .satisfies(record -> { + //Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("2022-06-22T12:13:20Z"); + Assertions.assertThat(record.getValueByKey("free")).isEqualTo("10"); + Assertions.assertThat(record.getValueByKey("host")).isEqualTo("A"); + Assertions.assertThat(record.getValueByKey("region")).isEqualTo("west"); + }); + } + + @Test + public void testSelectGroupBy(){ + InfluxQLQueryResult result = influxQLQueryApi.query( + new InfluxQLQuery("SELECT * FROM \"influxql\" GROUP By \"region\",\"host\"", DATABASE_NAME) + ); + + assertSingleSeriesRecords(result) + .hasSize(1) + .first() + .satisfies(record -> { + Assertions.assertThat(record.getValueByKey("region")).isNull(); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); + Assertions.assertThat(record.getValueByKey("host")).isNull(); + // Assertions.assertThat(record.getValueByKey("time")).isEqualTo("2022-06-22T12:13:20Z"); + Assertions.assertThat(record.getValueByKey("free")).isEqualTo("10"); + }); + + Assertions.assertThat(result) + .extracting(InfluxQLQueryResult::getResults, list(InfluxQLQueryResult.Result.class)) + .hasSize(1) + .first() + .extracting(InfluxQLQueryResult.Result::getSeries, list(InfluxQLQueryResult.Series.class)) + .hasSize(1) + .first() + .extracting(InfluxQLQueryResult.Series::getTags) + .satisfies(tagz -> 
{ + Assertions.assertThat(tagz).isNotNull(); + Assertions.assertThat(tagz.get("host")).isEqualTo("A"); + Assertions.assertThat(tagz.get("region")).isEqualTo("west"); + }); + } + @Test void testInfluxDB18() { // create database @@ -166,4 +237,281 @@ private ListAssert assertSingleSeriesRecords( .first() .extracting(InfluxQLQueryResult.Series::getValues, list(InfluxQLQueryResult.Series.Record.class)); } + + @Nested + class ServiceHeaderTest { + + protected MockWebServer mockServer = new MockWebServer(); + + @BeforeEach + void setUp() throws IOException { + mockServer.start(); + } + + @AfterEach + void tearDown() throws IOException { + mockServer.shutdown(); + } + + @Test + public void serviceHeaderCSV() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("a,b,c,d,e,f")); + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.query(new InfluxQLQuery("SELECT * FROM cpu", "test_db", InfluxQLQuery.AcceptHeader.CSV)); + Assertions.assertThat(result.getResults()).hasSize(1); + + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/csv"); + } + + + @Test + public void serviceHeaderJSON() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("{results:[]}")); + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.query(new InfluxQLQuery("SELECT * FROM cpu", "test_db", + InfluxQLQuery.AcceptHeader.JSON)); + 
Assertions.assertThat(result.getResults()).hasSize(0); + + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/json"); + } + + @Test + public void serviceHeaderDefault() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("{results:[]}")); + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.query(new InfluxQLQuery("SELECT * FROM cpu", "test_db")); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/csv"); + } + + @Test + public void serviceHeaderMethodQueryCSV() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("a,b,c,d,e,f")); + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.queryCSV( + new InfluxQLQuery("SELECT * FROM cpu", "test_db")); + Assertions.assertThat(result.getResults()).hasSize(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/csv"); + } + + @Test + public void serverHeaderMethodQueryCSVExtractor(){ + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("a,tags,c,d,e\n\"mem\",\"foo=bar\",2,3,4")); + InfluxDBClient client = 
InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.queryCSV( + new InfluxQLQuery("SELECT * FROM cpu", "test_db"), + (columnName, rawValue, resultIndex, seriesName) -> { + switch(columnName) { + case "c": + return Long.valueOf(rawValue); + case "d": + return Double.valueOf(rawValue); + } + return rawValue; + }); + InfluxQLQueryResult.Series series = result.getResults().get(0).getSeries().get(0); + Assertions.assertThat(series.getName()).isEqualTo("mem"); + Assertions.assertThat(series.getTags().get("foo")).isEqualTo("bar"); + Assertions.assertThat(series.getColumns().get("c")).isEqualTo(0); + Assertions.assertThat(series.getColumns().get("d")).isEqualTo(1); + Assertions.assertThat(series.getColumns().get("e")).isEqualTo(2); + Assertions.assertThat(series.getValues().get(0).getValueByKey("c")).isEqualTo(2L); + Assertions.assertThat(series.getValues().get(0).getValueByKey("d")).isEqualTo(3.0); + Assertions.assertThat(series.getValues().get(0).getValueByKey("e")).isEqualTo("4"); + } + + @Test + public void serviceHeaderMethodQueryJSON() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("{results:[]}")); + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.queryJSON(new InfluxQLQuery("SELECT * FROM cpu", "test_db")); + Assertions.assertThat(result.getResults()).hasSize(0); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/json"); + } + + @Test + public void 
serviceHeaderMethodQueryJSONExtractor() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"results\":[{\"statement_id\":0," + + "\"series\":[{\"name\":\"mem\",\"tags\": { \"foo\":\"bar\"},\"columns\": [\"c\",\"d\",\"e\"]," + + "\"values\":[[2,3,4]]}]}]}")); + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.queryJSON + (new InfluxQLQuery("SELECT * FROM cpu", "test_db"), + (columnName, rawValue, resultIndex, seriesName) -> { + switch(columnName) { + case "c": + return Long.valueOf(rawValue); + case "d": + return Double.valueOf(rawValue); + } + return rawValue; + }); + InfluxQLQueryResult.Series series = result.getResults().get(0).getSeries().get(0); + Assertions.assertThat(series.getName()).isEqualTo("mem"); + Assertions.assertThat(series.getTags().get("foo")).isEqualTo("bar"); + Assertions.assertThat(series.getColumns().get("c")).isEqualTo(0); + Assertions.assertThat(series.getColumns().get("d")).isEqualTo(1); + Assertions.assertThat(series.getColumns().get("e")).isEqualTo(2); + Assertions.assertThat(series.getValues().get(0).getValueByKey("c")).isEqualTo(2L); + Assertions.assertThat(series.getValues().get(0).getValueByKey("d")).isEqualTo(3.0); + Assertions.assertThat(series.getValues().get(0).getValueByKey("e")).isEqualTo("4"); + } + + @Test + public void serviceHeaderMethodQueryCSVPrecedent() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("a,b,c,d,e,f")); + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.queryCSV( + new InfluxQLQuery("SELECT * FROM cpu", 
"test_db", InfluxQLQuery.AcceptHeader.JSON)); + Assertions.assertThat(result.getResults()).hasSize(1); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/csv"); + } + + @Test + public void serviceHeaderMethodQueryJSONPrecedent() throws InterruptedException { + mockServer.enqueue(new MockResponse().setResponseCode(200).setBody("{results:[]}")); + InfluxDBClient client = InfluxDBClientFactory.create( + mockServer.url("/").toString(), + "my_token".toCharArray(), + "my_org", + "my_bucket" + ); + InfluxQLQueryApi influxQuery = client.getInfluxQLQueryApi(); + InfluxQLQueryResult result = influxQuery.queryJSON( + new InfluxQLQuery("SELECT * FROM cpu", "test_db", InfluxQLQuery.AcceptHeader.CSV)); + Assertions.assertThat(result.getResults()).hasSize(0); + RecordedRequest request = mockServer.takeRequest(); + Assertions.assertThat(request.getHeader("Authorization")).isEqualTo("Token my_token"); + Assertions.assertThat(request.getHeader("Accept")).isEqualTo("application/json"); + } + } + + @Test + public void testQueryJsonPrecision(){ + Bucket bucket = influxDBClient.getBucketsApi().findBucketByName("my-bucket"); + int idx = 0; + Map precisionValues = new HashMap<>(); + for(WritePrecision precision : WritePrecision.values()){ + Instant time = Instant.now().minusSeconds(10 * (1 + idx++)); + long nanoTimestamp = (time.getEpochSecond() * 1_000_000_000L) + time.getNano(); + + long timestamp = 0; + switch(precision){ + case S: + timestamp = nanoTimestamp/1_000_000_000L; + precisionValues.put(precision.getValue(), Instant.ofEpochSecond(timestamp)); + break; + case MS: + timestamp = nanoTimestamp/1_000_000L; + precisionValues.put(precision.getValue(), Instant.ofEpochMilli(timestamp)); + break; + case US: + timestamp = nanoTimestamp/1_000L; + precisionValues.put(precision.getValue(), + 
Instant.ofEpochSecond(timestamp/1_000_000L, (timestamp%1_000_000L) * 1000)); + break; + case NS: + timestamp = nanoTimestamp; + precisionValues.put(precision.getValue(), + Instant.ofEpochSecond(timestamp/1_000_000_000L, timestamp%1_000_000_000L)); + break; + } + influxDBClient.getWriteApiBlocking() + .writePoint(bucket.getId(), bucket.getOrgID(), new Point("precise") + .time(timestamp, precision) + .addField("cpu_usage", 10.42) + .addTag("domain", precision.toString())); + } + assert bucket != null; + InfluxQLQueryResult result = influxDBClient.getInfluxQLQueryApi() + .queryJSON(new InfluxQLQuery( + "SELECT * FROM precise WHERE time > now() - 1m", + bucket.getName())); + + for(InfluxQLQueryResult.Result r: result.getResults()){ + InfluxQLQueryResult.Series s = r.getSeries().get(0); + for(InfluxQLQueryResult.Series.Record record: s.getValues()){ + String domain = Objects.requireNonNull(record.getValueByKey("domain")).toString(); + Assertions.assertThat(precisionValues.get(domain)) + .isEqualTo(Instant.parse( + Objects.requireNonNull(record.getValueByKey("time") + ).toString())); + } + } + } + + @Test + public void testEmptyResultsResponse() { + + try(InfluxDBClient localClient = InfluxDBClientFactory.create(influxDB_URL, "my-token".toCharArray())) { + InfluxQLQueryResult result = localClient.getInfluxQLQueryApi().query( + new InfluxQLQuery("SHOW FIELD KEYS", "inexistant", InfluxQLQuery.AcceptHeader.CSV)); + + Assertions.assertThat(result.getResults()).hasSize(0); + } + } } diff --git a/client/src/test/java/com/influxdb/client/InfluxDBClientTest.java b/client/src/test/java/com/influxdb/client/InfluxDBClientTest.java index af876ccf752..c3a53bb238c 100644 --- a/client/src/test/java/com/influxdb/client/InfluxDBClientTest.java +++ b/client/src/test/java/com/influxdb/client/InfluxDBClientTest.java @@ -33,6 +33,8 @@ import java.util.logging.Logger; import javax.annotation.Nonnull; +import com.influxdb.client.domain.InfluxQLQuery; +import 
com.influxdb.client.service.InfluxQLQueryService; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpServer; import com.sun.net.httpserver.HttpHandler; @@ -48,12 +50,14 @@ import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; +import okhttp3.ResponseBody; import okhttp3.mockwebserver.Dispatcher; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import retrofit2.Call; /** * @author Jakub Bednar (bednar@github) (05/09/2018 14:00) @@ -122,6 +126,28 @@ public void createNotificationRulesApi() { Assertions.assertThat(influxDBClient.getNotificationRulesApi()).isNotNull(); } + @Test + public void serviceHeaderDefault() { + InfluxQLQueryService service = influxDBClient.getService(InfluxQLQueryService.class); + Call call = service.query("SELECT * FROM cpu", "test_db", + null, + null, + null, + InfluxQLQuery.AcceptHeader.JSON.getVal()); + Assertions.assertThat(call.request().header("Accept")).isEqualTo("application/json"); + } + + @Test + public void serviceHeaderChange() { + InfluxQLQueryService service = influxDBClient.getService(InfluxQLQueryService.class); + Call call = service.query("SELECT * FROM cpu", "test_db", + null, + null, + null, + InfluxQLQuery.AcceptHeader.CSV.getVal()); + Assertions.assertThat(call.request().header("accept")).isEqualTo("application/csv"); + } + @Test void logLevel() { diff --git a/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java b/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java new file mode 100644 index 00000000000..01e6a166cd7 --- /dev/null +++ b/client/src/test/java/com/influxdb/client/domain/InfluxQLQueryTest.java @@ -0,0 +1,80 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation 
files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client.domain; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class InfluxQLQueryTest { + + @Test + public void setRetentionPolicy(){ + String rp = "oneOffRP"; + InfluxQLQuery query = new InfluxQLQuery("SELECT * FROM cpu", "test_db"); + Assertions.assertThat(query.setRetentionPolicy(rp).getRetentionPolicy()).isEqualTo(rp); + } + + @Test + public void headerSelectDefault(){ + InfluxQLQuery query = new InfluxQLQuery("SELECT * FROM cpu", "test_db"); + Assertions.assertThat(query.getAcceptHeaderVal()).isEqualTo("application/csv"); + } + + @Test + public void headerSelect(){ + InfluxQLQuery query = new InfluxQLQuery("SELECT * FROM cpu", + "test_db", + InfluxQLQuery.AcceptHeader.CSV); + Assertions.assertThat(query.getAcceptHeaderVal()).isEqualTo("application/csv"); + } + + @Test + public void headerSet(){ + InfluxQLQuery query = new InfluxQLQuery("SELECT * FROM cpu", "test_db"); + 
Assertions.assertThat(query.getAcceptHeaderVal()).isEqualTo("application/csv"); + Assertions.assertThat(query.setAcceptHeader(InfluxQLQuery.AcceptHeader.JSON).getAcceptHeaderVal()) + .isEqualTo("application/json"); + } + + @Test + public void timeUnitPrecisionConversion(){ + Map expected = Map.of( + TimeUnit.NANOSECONDS, "n", + TimeUnit.MICROSECONDS, "u", + TimeUnit.MILLISECONDS, "ms", + TimeUnit.SECONDS, "s", + TimeUnit.MINUTES, "m", + TimeUnit.HOURS, "h"); + for(TimeUnit tu: TimeUnit.values()){ + if(!tu.equals(TimeUnit.DAYS)){ + Assertions.assertThat(expected.get(tu)).isEqualTo(InfluxQLQuery.InfluxQLPrecision.toTimePrecision(tu).getSymbol()); + } else { + Assertions.assertThatThrownBy(() -> InfluxQLQuery.InfluxQLPrecision.toTimePrecision(tu)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("time precision must be one of:[HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, NANOSECONDS]"); + } + } + } +} diff --git a/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java b/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java index fca1109ad86..15295b731c1 100644 --- a/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java +++ b/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java @@ -29,10 +29,9 @@ import com.influxdb.Cancellable; import com.influxdb.query.InfluxQLQueryResult; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVParser; -import org.apache.commons.csv.CSVRecord; import org.assertj.core.api.Assertions; +import org.junit.Ignore; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; class InfluxQLQueryApiImplTest { @@ -77,7 +76,7 @@ void readInfluxQLResult() throws IOException { "cpu,\"region=us-east-1,host=server2\",1483225200,67.91,1.3\n" ); - InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLResult(reader, NO_CANCELLING, extractValues); + InfluxQLQueryResult result = 
InfluxQLQueryApiImpl.readInfluxQLCSVResult(reader, NO_CANCELLING, extractValues); List results = result.getResults(); Assertions.assertThat(results).hasSize(4); @@ -187,7 +186,7 @@ public void readInfluxQLShowSeriesRequest() throws IOException { ",,\"temperature,locale=nw002,device=rpi5_88e1\"" ); - InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLResult(reader, NO_CANCELLING, + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLCSVResult(reader, NO_CANCELLING, (columnName, rawValue, resultIndex, seriesName) -> { return rawValue;}); Assertions.assertThat(result.getResults().get(0)) @@ -213,6 +212,321 @@ public void readInfluxQLShowSeriesRequest() throws IOException { }); }); }); + } + + StringReader sampleReader = new StringReader("{\n" + + " \"results\":\n" + + "[\n" + + " {\n" + + " \"statement_id\": 0,\n" + + " \"series\": \n" + + " [ \n" + + " {\n" + + " \"name\": \"data1\",\n" + + " \"columns\": [\"time\",\"first\"],\n" + + " \"values\": [\n" + + " [1483225200, 1]\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"name\": \"data2\",\n" + + " \"columns\": [\"time\",\"first\"],\n" + + " \"values\": [\n" + + " [1483225200, 2]\n" + + " ]\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"statement_id\": 1,\n" + + " \"series\":\n" + + " [ \n" + + " {\n" + + " \"name\": \"data\",\n" + + " \"columns\": [\"time\",\"first\",\"text\"],\n" + + " \"values\": [\n" + + " [1500000000, 42, \"foo\"]\n" + + " ]\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"statement_id\": 2,\n" + + " \"series\":\n" + + " [ \n" + + " {\n" + + " \"name\": \"databases\",\n" + + " \"columns\" : [\"name\"],\n" + + " \"values\" : [\n" + + " [\"measurement-1\"],\n" + + " [\"measurement-2\"]\n" + + " ]\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"statement_id\": 3,\n" + + " \"series\": \n" + + " [ \n" + + " {\n" + + " \"name\": \"cpu\",\n" + + " \"tags\": {\"region\": \"us-east-1\", \"host\": \"server1\" },\n" + + " \"columns\": [\"time\", \"usage_user\", 
\"usage_system\"],\n" + + " \"values\" : [\n" + + " [1483225200,13.57,1.4],\n" + + " [1483225201,14.06,1.7]\n" + + " ] \n" + + " },\n" + + " {\n" + + " \"name\": \"cpu\",\n" + + " \"tags\": {\"region\": \"us-east-1\", \"host\": \"server2\" },\n" + + " \"columns\": [\"time\", \"usage_user\", \"usage_system\"],\n" + + " \"values\" : [\n" + + " [1483225200,67.91,1.3]\n" + + " ] \n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"statement_id\": 4,\n" + + " \"series\":\n" + + " [ \n" + + " {\n" + + " \"name\": \"login\",\n" + + " \"tags\": {\"region\": \"eu-west-3\", \"host\": \"portal-17\"},\n" + + " \"columns\": [\"time\", \"user_id\", \"success\", \"stay\"],\n" + + " \"values\" : [\n" + + " [ \"2024-06-18T11:29:48.454Z\", 958772110, true, 1.27],\n" + + " [ \"2024-06-18T11:29:47.124Z\", 452223904, false, 0.0],\n" + + " [ \"2024-06-18T11:29:45.007Z\", 147178901, true, 15.5],\n" + + " [ \"2024-06-18T11:29:41.881Z\", 71119178, true, 78.4]\n" + + " ]\n" + + " }\n" + + " ] \n" + + " } \n" + + "]\n" + + "}"); + + // All values as Strings - universal default + @Test + public void readInfluxQLJSONResult(){ + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLJsonResult(sampleReader, NO_CANCELLING, null); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(5); + Assertions.assertThat(results.get(0)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(2); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("data1"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time", "first"); + Assertions.assertThat(series1.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series1.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1483225200"); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo("1"); + }); + 
Assertions.assertThat(series.get(1)) + .satisfies(series2 -> { + Assertions.assertThat(series2.getName()).isEqualTo("data2"); + Assertions.assertThat(series2.getColumns()).containsOnlyKeys("time", "first"); + Assertions.assertThat(series2.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series2.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1483225200"); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo("2"); + }); + }); + Assertions.assertThat(results.get(1)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(1); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("data"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time", "first", "text"); + Assertions.assertThat(series1.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series1.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1500000000"); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo("42"); + Assertions.assertThat(record.getValueByKey("text")).isEqualTo("foo"); + }); + }); + Assertions.assertThat(results.get(2)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(1); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("databases"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("name"); + Assertions.assertThat(series1.getValues()).hasSize(2); + + Assertions.assertThat( series1.getValues().get(0).getValueByKey("name")) + .isEqualTo("measurement-1"); + Assertions.assertThat( series1.getValues().get(1).getValueByKey("name")) + .isEqualTo("measurement-2"); + }); + }); + Assertions.assertThat(results.get(3)) + .extracting(InfluxQLQueryResult.Result::getSeries) + 
.satisfies(series -> { + Assertions.assertThat(series).hasSize(2); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("cpu"); + Assertions.assertThat(series1.getTags()).containsOnlyKeys("region", "host"); + Assertions.assertThat(series1.getTags().get("region")).isEqualTo("us-east-1"); + Assertions.assertThat(series1.getTags().get("host")).isEqualTo("server1"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time","usage_user","usage_system"); + Assertions.assertThat(series1.getValues()).hasSize(2); + + Assertions.assertThat( series1.getValues().get(0).getValueByKey("usage_user")) + .isEqualTo("13.57"); + Assertions.assertThat( series1.getValues().get(0).getValueByKey("usage_system")) + .isEqualTo("1.4"); + Assertions.assertThat( series1.getValues().get(1).getValueByKey("usage_user")) + .isEqualTo("14.06"); + Assertions.assertThat( series1.getValues().get(1).getValueByKey("usage_system")) + .isEqualTo("1.7"); + }); + Assertions.assertThat(series.get(1)) + .satisfies(series2 -> { + Assertions.assertThat(series2.getName()).isEqualTo("cpu"); + Assertions.assertThat(series2.getTags()).containsOnlyKeys("region", "host"); + Assertions.assertThat(series2.getTags().get("region")).isEqualTo("us-east-1"); + Assertions.assertThat(series2.getTags().get("host")).isEqualTo("server2"); + Assertions.assertThat(series2.getColumns()).containsOnlyKeys("time","usage_user","usage_system"); + Assertions.assertThat(series2.getValues()).hasSize(1); + + Assertions.assertThat( series2.getValues().get(0).getValueByKey("usage_user")) + .isEqualTo("67.91"); + Assertions.assertThat( series2.getValues().get(0).getValueByKey("usage_system")) + .isEqualTo("1.3"); + }); + }); + Assertions.assertThat(results.get(4)) + .satisfies(r -> { + Assertions.assertThat(r.getIndex()).isEqualTo(4); + }) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(1); + 
Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("login"); + Assertions.assertThat(series1.getTags()).containsOnlyKeys("region","host"); + Assertions.assertThat(series1.getTags().get("region")).isEqualTo("eu-west-3"); + Assertions.assertThat(series1.getTags().get("host")).isEqualTo("portal-17"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time","user_id","success","stay"); + Assertions.assertThat(series1.getValues()).hasSize(4); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("time")).isEqualTo("2024-06-18T11:29:48.454Z"); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("user_id")).isEqualTo("958772110"); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("success")).isEqualTo("true"); + Assertions.assertThat(series1.getValues().get(0).getValueByKey("stay")).isEqualTo("1.27"); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("time")).isEqualTo("2024-06-18T11:29:47.124Z"); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("user_id")).isEqualTo("452223904"); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("success")).isEqualTo("false"); + Assertions.assertThat(series1.getValues().get(1).getValueByKey("stay")).isEqualTo("0.0"); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("time")).isEqualTo("2024-06-18T11:29:41.881Z"); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("user_id")).isEqualTo("71119178"); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("success")).isEqualTo("true"); + Assertions.assertThat(series1.getValues().get(3).getValueByKey("stay")).isEqualTo("78.4"); + }); + }); + } + + // Custom + @Test + public void readInfluxQLJSONResultCustomExtractValue(){ + InfluxQLQueryResult.Series.ValueExtractor extractValues = (columnName, rawValue, resultIndex, seriesName) -> { + if (resultIndex == 0 && seriesName.equals("data2")){ + 
switch (columnName){ + case "time": + return Instant.ofEpochSecond(Long.parseLong(rawValue)); + case "first": + return Double.valueOf(rawValue); + } + } + if(seriesName.equals("login")){ + if (columnName.equals("success")) { + return Boolean.parseBoolean(rawValue); + } + } + return rawValue; + }; + + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLJsonResult(sampleReader, + NO_CANCELLING, + extractValues + ); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(5); + Assertions.assertThat(results.get(0)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(2); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("data1"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time", "first"); + Assertions.assertThat(series1.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series1.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1483225200"); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo("1"); + }); + Assertions.assertThat(series.get(1)) + .satisfies(series2 -> { + Assertions.assertThat(series2.getName()).isEqualTo("data2"); + Assertions.assertThat(series2.getColumns()).containsOnlyKeys("time", "first"); + Assertions.assertThat(series2.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series2.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo(Instant.ofEpochSecond(1483225200L)); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo(2.0); + }); + }); + } + + @Test + public void deserializeNullSeriesJSON(){ + String nullSeriesResponse = "{\"results\":[{\"statement_id\":0}]}"; + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLJsonResult(new StringReader(nullSeriesResponse), NO_CANCELLING, null); + List results = result.getResults(); + 
Assertions.assertThat(results).hasSize(1); + Assertions.assertThat(results.get(0).getIndex()).isEqualTo(0); + Assertions.assertThat(results.get(0).getSeries()).hasSize(0); + } + + @Test + public void deserializeNullSeriesCSV() throws IOException { + String nullSeriesResponse = "name,tags,time,val1,val2"; + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLCSVResult(new StringReader(nullSeriesResponse), NO_CANCELLING, null); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(1); + Assertions.assertThat(results.get(0).getIndex()).isEqualTo(0); + Assertions.assertThat(results.get(0).getSeries()).hasSize(0); + } + + @Test + public void deserializeZeroResultJSON() throws IOException { + String zeroResultResponse = "{\"results\":[]}"; + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLJsonResult(new StringReader(zeroResultResponse), NO_CANCELLING, null); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(0); + } + @Test + public void deserializeZeroResultsCSV() throws IOException { + String nullResponse = ""; + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLCSVResult(new StringReader(nullResponse), NO_CANCELLING, null); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(0); + } + + @Test + public void deserializeEmptyResultJSON(){ + String emptyResultResponse = "{}"; + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLJsonResult(new StringReader(emptyResultResponse), NO_CANCELLING, null); + List results = result.getResults(); + Assertions.assertThat(results).hasSize(0); } } diff --git a/examples/src/main/java/example/InfluxQLExample.java b/examples/src/main/java/example/InfluxQLExample.java index 327c8143ed9..a8bad9b87de 100644 --- a/examples/src/main/java/example/InfluxQLExample.java +++ b/examples/src/main/java/example/InfluxQLExample.java @@ -24,10 +24,15 @@ import java.math.BigDecimal; import java.time.Instant; +import 
com.influxdb.LogLevel; +import com.influxdb.annotations.Column; +import com.influxdb.annotations.Measurement; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.InfluxQLQueryApi; +import com.influxdb.client.WriteApiBlocking; import com.influxdb.client.domain.InfluxQLQuery; +import com.influxdb.client.domain.WritePrecision; import com.influxdb.query.InfluxQLQueryResult; public class InfluxQLExample { @@ -35,11 +40,14 @@ public class InfluxQLExample { private static char[] token = "my-token".toCharArray(); private static String org = "my-org"; - private static String database = "my-org"; + private static String database = "my-bucket"; public static void main(final String[] args) { - try (InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org)) { + try (InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org, database)) { + //influxDBClient.setLogLevel(LogLevel.BODY); // uncomment to inspect communication messages + + write(influxDBClient); // // Query data @@ -48,28 +56,116 @@ public static void main(final String[] args) { InfluxQLQueryApi queryApi = influxDBClient.getInfluxQLQueryApi(); - // send request - InfluxQLQueryResult result = queryApi.query(new InfluxQLQuery(influxQL, database).setPrecision(InfluxQLQuery.InfluxQLPrecision.SECONDS), - (columnName, rawValue, resultIndex, seriesName) -> { + // send request - uses default Accept: application/json and returns RFC3339 timestamp + InfluxQLQueryResult result = queryApi.query( + new InfluxQLQuery(influxQL, database), + (columnName, rawValue, resultIndex, seriesName) -> { // custom valueExtractor // convert columns - switch (columnName) { - case "time": - return Instant.ofEpochSecond(Long.parseLong(rawValue)); - case "first": - return new BigDecimal(rawValue); - default: - throw new IllegalArgumentException("unexpected column " + columnName); + return switch 
(columnName) { + case "time" -> { + long l = Long.parseLong(rawValue); + yield Instant.ofEpochMilli(l / 1_000_000L); } + case "first" -> Long.parseLong(rawValue); + default -> throw new IllegalArgumentException("unexpected column " + columnName); + }; }); - for (InfluxQLQueryResult.Result resultResult : result.getResults()) { - for (InfluxQLQueryResult.Series series : resultResult.getSeries()) { - for (InfluxQLQueryResult.Series.Record record : series.getValues()) { - System.out.println(record.getValueByKey("time") + ": " + record.getValueByKey("first")); - } + System.out.println("Default query with valueExtractor"); + dumpResult(result); + + // send request - use Accept: application/csv returns epoch timestamp + result = queryApi.queryCSV( + new InfluxQLQuery(influxQL,database), + (columnName, rawValue, resultIndex, seriesName) -> { // custom valueExtractor + // convert columns + return switch (columnName) { + case "time" -> { + long l = Long.parseLong(rawValue); + yield Instant.ofEpochSecond(l / 1_000_000_000L, + l % 1_000_000_000L); + } + case "first" -> Long.parseLong(rawValue); + default -> throw new IllegalArgumentException("unexpected column " + columnName); + }; + }); + + System.out.println("QueryCSV with valueExtractor."); + dumpResult(result); + + result = queryApi.query( + new InfluxQLQuery( + influxQL, + database, + InfluxQLQuery.AcceptHeader.JSON), + (columnName, rawValue, resultIndex, seriesName) -> { + return switch(columnName) { + case "time" -> Instant.parse(rawValue); + case "first" -> Long.parseLong(rawValue); + default -> throw new IllegalArgumentException("Unexpected column " + columnName); + }; + }); + + System.out.println("Query with JSON accept header and valueExtractor"); + dumpResult(result); + + // send request - set `Accept` header in InfluxQLQuery object, use raw results. + // N.B. timestamp returned is Epoch nanos in String format. 
+ result = queryApi.query( + new InfluxQLQuery(influxQL,database) + .setAcceptHeader(InfluxQLQuery.AcceptHeader.CSV) + ); + + System.out.println("Default query method with AcceptHeader.CSV in InfluxQLQuery object. Raw results"); + dumpResult(result); + + // send request - use default `Accept` header (application/json), + // but specify epoch precision, use raw results + result = queryApi.query( + new InfluxQLQuery(influxQL, database) + .setPrecision(InfluxQLQuery.InfluxQLPrecision.MILLISECONDS) + ); + + System.out.println("Default query method with Epoch precision in InfluxQLQuery object. Raw results."); + dumpResult(result); + + } + } + + public static void write(InfluxDBClient influxDBClient){ + WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking(); + + InfluxQLTestData testData = new InfluxQLTestData(Instant.now().minusSeconds(1)); + + writeApi.writeMeasurement(WritePrecision.NS, testData); + + } + + public static void dumpResult(InfluxQLQueryResult result){ + for (InfluxQLQueryResult.Result resultResult : result.getResults()) { + for (InfluxQLQueryResult.Series series : resultResult.getSeries()) { + for (InfluxQLQueryResult.Series.Record record : series.getValues()) { + System.out.println(record.getValueByKey("time") + ": " + record.getValueByKey("first")); } } + } + } + + @Measurement(name = "influxql") + public static class InfluxQLTestData{ + @Column(timestamp = true) + Instant time; + + @Column + Long free; + + @Column(tag = true) + String machine; + public InfluxQLTestData(Instant instant) { + free = (long) (Math.random() * 100); + machine = "test"; + time = instant; } } }