From fe69e4ce805a331c512a15c69e1e90d3732db53f Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 26 Dec 2023 17:36:37 +0800 Subject: [PATCH] refactor: refactor API --- ...kStart.java => StreamWriteQuickStart.java} | 16 ++++----- ...wsQuickStart.java => WriteQuickStart.java} | 18 +++++----- .../src/main/java/io/greptime/GreptimeDB.java | 14 ++++---- .../src/main/java/io/greptime/PojoMapper.java | 12 +++---- .../src/main/java/io/greptime/Write.java | 16 ++++----- .../main/java/io/greptime/WriteClient.java | 30 ++++++++-------- .../java/io/greptime/limit/WriteLimiter.java | 4 +-- .../main/java/io/greptime/models/Column.java | 2 +- .../src/main/java/io/greptime/models/Err.java | 6 ++-- .../main/java/io/greptime/models/Metric.java | 2 +- .../models/{TableRows.java => Table.java} | 36 +++++++++---------- ...{TableRowsHelper.java => TableHelper.java} | 12 +++---- .../java/io/greptime/models/WriteTable.java | 6 ++-- .../test/java/io/greptime/PojoMapperTest.java | 8 ++--- .../src/test/java/io/greptime/TestUtil.java | 8 ++--- .../java/io/greptime/WriteClientTest.java | 28 +++------------ .../test/java/io/greptime/WriteLimitTest.java | 12 +++---- .../{TableRowsTest.java => TableTest.java} | 22 ++++++------ 18 files changed, 117 insertions(+), 135 deletions(-) rename ingester-example/src/main/java/io/greptime/{StreamWriteTableRowsQuickStart.java => StreamWriteQuickStart.java} (90%) rename ingester-example/src/main/java/io/greptime/{WriteTableRowsQuickStart.java => WriteQuickStart.java} (89%) rename ingester-protocol/src/main/java/io/greptime/models/{TableRows.java => Table.java} (83%) rename ingester-protocol/src/main/java/io/greptime/models/{TableRowsHelper.java => TableHelper.java} (92%) rename ingester-protocol/src/test/java/io/greptime/models/{TableRowsTest.java => TableTest.java} (79%) diff --git a/ingester-example/src/main/java/io/greptime/StreamWriteTableRowsQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java similarity index 90% rename from ingester-example/src/main/java/io/greptime/StreamWriteTableRowsQuickStart.java rename to ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java index 6899bc0..ea76efe 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWriteTableRowsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java @@ -17,7 +17,7 @@ import io.greptime.models.DataType; import io.greptime.models.SemanticType; -import io.greptime.models.TableRows; +import io.greptime.models.Table; import io.greptime.models.TableSchema; import io.greptime.models.WriteOk; import io.greptime.options.GreptimeOptions; @@ -32,9 +32,9 @@ /** * @author jiachun.fjc */ -public class StreamWriteTableRowsQuickStart { +public class StreamWriteQuickStart { - private static final Logger LOG = LoggerFactory.getLogger(StreamWriteTableRowsQuickStart.class); + private static final Logger LOG = LoggerFactory.getLogger(StreamWriteQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { // GreptimeDB has a default database named "public", we can use it as the test database @@ -67,8 +67,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc .addColumn("field2", SemanticType.Field, DataType.Float64) // .build(); - TableRows myMetric3Rows = TableRows.from(myMetric3Schema); - TableRows myMetric4Rows = TableRows.from(myMetric4Schema); + Table myMetric3Rows = Table.from(myMetric3Schema); + Table myMetric4Rows = Table.from(myMetric4Schema); for (int i = 0; i < 10; i++) { String tag1v = "tag_value_1_" + i; @@ -80,7 +80,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc BigDecimal field3 = new BigDecimal(i); int field4 = i + 1; - myMetric3Rows.insert(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); + myMetric3Rows.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); } for (int i = 0; i < 10; i++) { @@ -90,10 +90,10 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc Date field1 = Calendar.getInstance().getTime(); double field2 = i + 0.1; - myMetric4Rows.insert(tag1v, tag2v, ts, field1, field2); + myMetric4Rows.addRow(tag1v, tag2v, ts, field1, field2); } - StreamWriter writer = greptimeDB.streamWriter(); + StreamWriter writer = greptimeDB.streamWriter(); // write data into stream writer.write(myMetric3Rows); diff --git a/ingester-example/src/main/java/io/greptime/WriteTableRowsQuickStart.java b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java similarity index 89% rename from ingester-example/src/main/java/io/greptime/WriteTableRowsQuickStart.java rename to ingester-example/src/main/java/io/greptime/WriteQuickStart.java index 8e49652..7f03ffa 100644 --- a/ingester-example/src/main/java/io/greptime/WriteTableRowsQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java @@ -19,7 +19,7 @@ import io.greptime.models.Err; import io.greptime.models.Result; import io.greptime.models.SemanticType; -import io.greptime.models.TableRows; +import io.greptime.models.Table; import io.greptime.models.TableSchema; import io.greptime.models.WriteOk; import io.greptime.options.GreptimeOptions; @@ -37,9 +37,9 @@ /** * @author jiachun.fjc */ -public class WriteTableRowsQuickStart { +public class WriteQuickStart { - private static final Logger LOG = LoggerFactory.getLogger(WriteTableRowsQuickStart.class); + private static final Logger LOG = LoggerFactory.getLogger(WriteQuickStart.class); public static void main(String[] args) throws ExecutionException, InterruptedException { // GreptimeDB has a default database named "public", we can use it as the test database @@ -72,8 +72,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc .addColumn("field2", SemanticType.Field, DataType.Float64) // .build(); - TableRows myMetric3Rows = TableRows.from(myMetric3Schema); - TableRows myMetric4Rows = TableRows.from(myMetric4Schema); + Table myMetric3Rows = Table.from(myMetric3Schema); + Table myMetric4Rows = Table.from(myMetric4Schema); for (int i = 0; i < 10; i++) { String tag1v = "tag_value_1_" + i; @@ -85,7 +85,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc BigDecimal field3 = new BigDecimal(i); int field4 = i + 1; - myMetric3Rows.insert(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); + myMetric3Rows.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); } for (int i = 0; i < 10; i++) { @@ -95,10 +95,10 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc Date field1 = Calendar.getInstance().getTime(); double field2 = i + 0.1; - myMetric4Rows.insert(tag1v, tag2v, ts, field1, field2); + myMetric4Rows.addRow(tag1v, tag2v, ts, field1, field2); } - Collection rows = Arrays.asList(myMetric3Rows, myMetric4Rows); + Collection rows = Arrays.asList(myMetric3Rows, myMetric4Rows); // For performance reasons, the SDK is designed to be purely asynchronous. // The return value is a future object. If you want to immediately obtain @@ -113,7 +113,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc LOG.error("Failed to write: {}", result.getErr()); } - List delete_pojos = Arrays.asList(myMetric3Rows.subRange(0, 5), myMetric4Rows.subRange(0, 5)); + List
delete_pojos = Arrays.asList(myMetric3Rows.subRange(0, 5), myMetric4Rows.subRange(0, 5)); Result deletes = greptimeDB.write(delete_pojos, WriteOp.Delete).get(); if (deletes.isOk()) { diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index 59712f6..fbfd718 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -26,7 +26,7 @@ import io.greptime.models.Err; import io.greptime.models.Result; import io.greptime.models.WriteOk; -import io.greptime.models.TableRows; +import io.greptime.models.Table; import io.greptime.options.GreptimeOptions; import io.greptime.options.RouterOptions; import io.greptime.options.WriteOptions; @@ -139,20 +139,20 @@ public void ensureInitialized() { @Override public CompletableFuture> writePOJOs(Collection> pojos, WriteOp writeOp, Context ctx) { - List rows = new ArrayList<>(pojos.size()); + List
rows = new ArrayList<>(pojos.size()); for (List pojo : pojos) { - rows.add(this.pojoMapper.toTableRows(pojo)); + rows.add(this.pojoMapper.toTableData(pojo)); } return write(rows, writeOp, ctx); } @Override public StreamWriter, WriteOk> streamWriterPOJOs(int maxPointsPerSecond, Context ctx) { - StreamWriter delegate = streamWriter(maxPointsPerSecond, ctx); + StreamWriter delegate = streamWriter(maxPointsPerSecond, ctx); return new StreamWriter, WriteOk>() { @Override public StreamWriter, WriteOk> write(List val, WriteOp writeOp) { - TableRows rows = pojoMapper.toTableRows(val); + Table rows = pojoMapper.toTableData(val); delegate.write(rows, writeOp); return this; } @@ -165,13 +165,13 @@ public CompletableFuture completed() { } @Override - public CompletableFuture> write(Collection rows, WriteOp writeOp, Context ctx) { + public CompletableFuture> write(Collection
rows, WriteOp writeOp, Context ctx) { ensureInitialized(); return this.writeClient.write(rows, writeOp, attachCtx(ctx)); } @Override - public StreamWriter streamWriter(int maxPointsPerSecond, Context ctx) { + public StreamWriter streamWriter(int maxPointsPerSecond, Context ctx) { return this.writeClient.streamWriter(maxPointsPerSecond, attachCtx(ctx)); } diff --git a/ingester-protocol/src/main/java/io/greptime/PojoMapper.java b/ingester-protocol/src/main/java/io/greptime/PojoMapper.java index 390648c..210f364 100644 --- a/ingester-protocol/src/main/java/io/greptime/PojoMapper.java +++ b/ingester-protocol/src/main/java/io/greptime/PojoMapper.java @@ -21,7 +21,7 @@ import io.greptime.models.DataType; import io.greptime.models.Metric; import io.greptime.models.SemanticType; -import io.greptime.models.TableRows; +import io.greptime.models.Table; import io.greptime.models.TableSchema; import java.lang.reflect.Field; import java.util.HashMap; @@ -31,7 +31,7 @@ import java.util.concurrent.ConcurrentMap; /** - * This utility class converts POJO classes into {@link io.greptime.models.TableRows} objects, + * This utility class converts POJO classes into {@link Table} objects, * inspired by InfluxDB client-java. * * @author jiachun.fjc @@ -45,7 +45,7 @@ public PojoMapper(int maxCachedPOJOs) { this.maxCachedPOJOs = maxCachedPOJOs; } - public TableRows toTableRows(List pojos) { + public Table toTableData(List pojos) { Ensures.ensureNonNull(pojos, "pojos"); Ensures.ensure(!pojos.isEmpty(), "pojos can not be empty"); @@ -72,7 +72,7 @@ public TableRows toTableRows(List pojos) { schemaBuilder.addColumn(name, semanticType, dataType); } - TableRows tableRows = TableRows.from(schemaBuilder.build()); + Table table = Table.from(schemaBuilder.build()); for (M pojo : pojos) { Class type = pojo.getClass(); if (!type.equals(metricType)) { @@ -88,10 +88,10 @@ public TableRows toTableRows(List pojos) { j++; } - tableRows.insert(values); + table.addRow(values); } - return tableRows; + return table; } private String getMetricName(Class metricType) { diff --git a/ingester-protocol/src/main/java/io/greptime/Write.java b/ingester-protocol/src/main/java/io/greptime/Write.java index e9a11a4..3a8b12e 100644 --- a/ingester-protocol/src/main/java/io/greptime/Write.java +++ b/ingester-protocol/src/main/java/io/greptime/Write.java @@ -18,7 +18,7 @@ import io.greptime.models.Err; import io.greptime.models.Result; import io.greptime.models.WriteOk; -import io.greptime.models.TableRows; +import io.greptime.models.Table; import io.greptime.rpc.Context; import java.util.Collection; @@ -34,14 +34,14 @@ public interface Write { /** * @see #write(Collection, WriteOp, Context) */ - default CompletableFuture> write(Collection rows) { + default CompletableFuture> write(Collection
rows) { return write(rows, WriteOp.Insert, Context.newDefault()); } /** * @see #write(Collection, WriteOp, Context) */ - default CompletableFuture> write(Collection rows, WriteOp writeOp) { + default CompletableFuture> write(Collection
rows, WriteOp writeOp) { return write(rows, writeOp, Context.newDefault()); } @@ -53,29 +53,29 @@ default CompletableFuture> write(Collection rows * @param ctx invoke context * @return write result */ - CompletableFuture> write(Collection rows, WriteOp writeOp, Context ctx); + CompletableFuture> write(Collection
rows, WriteOp writeOp, Context ctx); /** * @see #streamWriter(int, Context) */ - default StreamWriter streamWriter() { + default StreamWriter streamWriter() { return streamWriter(-1); } /** * @see #streamWriter(int, Context) */ - default StreamWriter streamWriter(int maxPointsPerSecond) { + default StreamWriter streamWriter(int maxPointsPerSecond) { return streamWriter(maxPointsPerSecond, Context.newDefault()); } /** - * Create a `Stream` to write `TableRows` data. + * Create a `Stream` to write `Table` data. * * @param maxPointsPerSecond The max number of points that can be written per second, * exceeding which may cause blockage. * @param ctx invoke context * @return a stream writer instance */ - StreamWriter streamWriter(int maxPointsPerSecond, Context ctx); + StreamWriter streamWriter(int maxPointsPerSecond, Context ctx); } diff --git a/ingester-protocol/src/main/java/io/greptime/WriteClient.java b/ingester-protocol/src/main/java/io/greptime/WriteClient.java index 253a5f1..3b20cfe 100644 --- a/ingester-protocol/src/main/java/io/greptime/WriteClient.java +++ b/ingester-protocol/src/main/java/io/greptime/WriteClient.java @@ -35,8 +35,8 @@ import io.greptime.models.Err; import io.greptime.models.Result; import io.greptime.models.WriteOk; -import io.greptime.models.TableRows; -import io.greptime.models.TableRowsHelper; +import io.greptime.models.Table; +import io.greptime.models.TableHelper; import io.greptime.models.WriteTable; import io.greptime.options.WriteOptions; import io.greptime.rpc.Context; @@ -79,7 +79,7 @@ public void shutdownGracefully() { } @Override - public CompletableFuture> write(Collection rows, WriteOp writeOp, Context ctx) { + public CompletableFuture> write(Collection
rows, WriteOp writeOp, Context ctx) { Ensures.ensureNonNull(rows, "null `rows`"); Ensures.ensure(!rows.isEmpty(), "empty `rows`"); @@ -108,7 +108,7 @@ public CompletableFuture> write(Collection rows, } @Override - public StreamWriter streamWriter(int maxPointsPerSecond, Context ctx) { + public StreamWriter streamWriter(int maxPointsPerSecond, Context ctx) { int permitsPerSecond = maxPointsPerSecond > 0 ? maxPointsPerSecond : this.opts.getDefaultStreamMaxWritePointsPerSecond(); CompletableFuture respFuture = new CompletableFuture<>(); @@ -118,7 +118,7 @@ public StreamWriter streamWriter(int maxPointsPerSecond, Con .thenApply(reqObserver -> new RateLimitingStreamWriter(reqObserver, permitsPerSecond) { @Override - public StreamWriter write(TableRows rows, WriteOp writeOp) { + public StreamWriter write(Table rows, WriteOp writeOp) { if (respFuture.isCompletedExceptionally()) { respFuture.getNow(null); // throw the exception now } @@ -133,7 +133,7 @@ public CompletableFuture completed() { }).join(); } - private CompletableFuture> write0(Collection rows, WriteOp writeOp, Context ctx, int retries) { + private CompletableFuture> write0(Collection
rows, WriteOp writeOp, Context ctx, int retries) { InnerMetricHelper.writeByRetries(retries).mark(); return this.routerClient.route() @@ -159,10 +159,10 @@ private CompletableFuture> write0(Collection row }, this.asyncPool); } - private CompletableFuture> writeTo(Endpoint endpoint, Collection rows, WriteOp writeOp, Context ctx, int retries) { + private CompletableFuture> writeTo(Endpoint endpoint, Collection
rows, WriteOp writeOp, Context ctx, int retries) { String database = this.opts.getDatabase(); AuthInfo authInfo = this.opts.getAuthInfo(); - Database.GreptimeRequest req = TableRowsHelper.toGreptimeRequest(rows, writeOp, database, authInfo); + Database.GreptimeRequest req = TableHelper.toGreptimeRequest(rows, writeOp, database, authInfo); ctx.with("retries", retries); CompletableFuture future = this.routerClient.invoke(endpoint, req, ctx); @@ -211,11 +211,11 @@ public void onCompleted() { @Override public void onNext(WriteTable writeTable) { - TableRows rows = writeTable.getRows(); + Table rows = writeTable.getRows(); WriteOp writeOp = writeTable.getWriteOp(); String database = WriteClient.this.opts.getDatabase(); AuthInfo authInfo = WriteClient.this.opts.getAuthInfo(); - Database.GreptimeRequest req = TableRowsHelper.toGreptimeRequest(rows, writeOp, database, authInfo); + Database.GreptimeRequest req = TableHelper.toGreptimeRequest(rows, writeOp, database, authInfo); rpcObserver.onNext(req); } @@ -306,12 +306,12 @@ public DefaultWriteLimiter(int maxInFlight, LimitedPolicy policy) { } @Override - public int calculatePermits(Collection in) { - return in.stream().map(TableRows::rowCount).reduce(0, Integer::sum); + public int calculatePermits(Collection
in) { + return in.stream().map(Table::rowCount).reduce(0, Integer::sum); } @Override - public Result rejected(Collection in, RejectedState state) { + public Result rejected(Collection
in, RejectedState state) { String errMsg = String.format("Write limited by client, acquirePermits=%d, maxPermits=%d, availablePermits=%d.", // state.acquirePermits(), // @@ -322,7 +322,7 @@ public Result rejected(Collection in, RejectedState sta } @SuppressWarnings("UnstableApiUsage") - static abstract class RateLimitingStreamWriter implements StreamWriter { + static abstract class RateLimitingStreamWriter implements StreamWriter { private final Observer observer; private final RateLimiter rateLimiter; @@ -337,7 +337,7 @@ static abstract class RateLimitingStreamWriter implements StreamWriter write(TableRows rows, WriteOp writeOp) { + public StreamWriter write(Table rows, WriteOp writeOp) { Ensures.ensureNonNull(rows, "null `rows`"); if (this.rateLimiter != null) { diff --git a/ingester-protocol/src/main/java/io/greptime/limit/WriteLimiter.java b/ingester-protocol/src/main/java/io/greptime/limit/WriteLimiter.java index 0dded72..83b0873 100644 --- a/ingester-protocol/src/main/java/io/greptime/limit/WriteLimiter.java +++ b/ingester-protocol/src/main/java/io/greptime/limit/WriteLimiter.java @@ -18,7 +18,7 @@ import io.greptime.models.Err; import io.greptime.models.Result; import io.greptime.models.WriteOk; -import io.greptime.models.TableRows; +import io.greptime.models.Table; import java.util.Collection; /** @@ -26,7 +26,7 @@ * * @author jiachun.fjc */ -public abstract class WriteLimiter extends AbstractLimiter, Result> { +public abstract class WriteLimiter extends AbstractLimiter, Result> { public WriteLimiter(int maxInFlight, LimitedPolicy policy, String metricPrefix) { super(maxInFlight, policy, metricPrefix); diff --git a/ingester-protocol/src/main/java/io/greptime/models/Column.java b/ingester-protocol/src/main/java/io/greptime/models/Column.java index f9845fe..f711fa9 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Column.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Column.java @@ -21,7 +21,7 @@ import java.lang.annotation.Target; /** - * This annotation is used to mapping POJO class into {@link io.greptime.models.TableRows}. + * This annotation is used to mapping POJO class into {@link Table}. * * @author jiachun.fjc */ diff --git a/ingester-protocol/src/main/java/io/greptime/models/Err.java b/ingester-protocol/src/main/java/io/greptime/models/Err.java index b91e2d6..c4cdbfe 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Err.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Err.java @@ -31,7 +31,7 @@ public class Err { // the server address where the error occurred private Endpoint errTo; // the data of wrote failed, can be used to retry - private Collection rowsFailed; + private Collection
rowsFailed; // the QL failed to query private String failedQl; @@ -59,7 +59,7 @@ public Endpoint getErrTo() { /** * Returns the data of wrote failed, can be used to retry. */ - public Collection getRowsFailed() { + public Collection
getRowsFailed() { return rowsFailed; } @@ -96,7 +96,7 @@ public String toString() { * @param rowsFailed the data of wrote failed, can be used to retry * @return a new {@link Err} for write error */ - public static Err writeErr(int code, Throwable error, Endpoint errTo, Collection rowsFailed) { + public static Err writeErr(int code, Throwable error, Endpoint errTo, Collection
rowsFailed) { Err err = new Err(); err.code = code; err.error = error; diff --git a/ingester-protocol/src/main/java/io/greptime/models/Metric.java b/ingester-protocol/src/main/java/io/greptime/models/Metric.java index 6408d7d..e1e2e68 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Metric.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Metric.java @@ -21,7 +21,7 @@ import java.lang.annotation.Target; /** - * This annotation is used to mapping POJO class into {@link io.greptime.models.TableRows}. + * This annotation is used to mapping POJO class into {@link Table}. * * @author jiachun.fjc */ diff --git a/ingester-protocol/src/main/java/io/greptime/models/TableRows.java b/ingester-protocol/src/main/java/io/greptime/models/Table.java similarity index 83% rename from ingester-protocol/src/main/java/io/greptime/models/TableRows.java rename to ingester-protocol/src/main/java/io/greptime/models/Table.java index c6940ea..8f3681d 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/TableRows.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Table.java @@ -29,7 +29,7 @@ * * @author jiachun.fjc */ -public interface TableRows { +public interface Table { /** * The table name to write. @@ -56,9 +56,9 @@ default int pointCount() { /** * Insets one row. */ - TableRows insert(Object... values); + Table addRow(Object... values); - TableRows subRange(int fromIndex, int toIndex); + Table subRange(int fromIndex, int toIndex); /** * Convert to {@link Database.RowInsertRequest}. @@ -79,7 +79,7 @@ default void checkNumValues(int len) { Ensures.ensure(columnCount == len, "Expected values num: %d, actual: %d", columnCount, len); } - static TableRows from(TableSchema tableSchema) { + static Table from(TableSchema tableSchema) { return new Builder(tableSchema).build(); } @@ -90,7 +90,7 @@ public Builder(TableSchema tableSchema) { this.tableSchema = tableSchema; } - public TableRows build() { + public Table build() { String tableName = this.tableSchema.getTableName(); List columnNames = this.tableSchema.getColumnNames(); List semanticTypes = this.tableSchema.getSemanticTypes(); @@ -112,13 +112,13 @@ public TableRows build() { return buildRow(tableName, columnCount, columnNames, semanticTypes, dataTypes, dataTypeExtensions); } - private static TableRows buildRow(String tableName, // - int columnCount, // - List columnNames, // - List semanticTypes, // - List dataTypes, // - List dataTypeExtensions) { - RowBasedTableRows rows = new RowBasedTableRows(); + private static Table buildRow(String tableName, // + int columnCount, // + List columnNames, // + List semanticTypes, // + List dataTypes, // + List dataTypeExtensions) { + RowBasedTable rows = new RowBasedTable(); rows.tableName = tableName; rows.columnSchemas = new ArrayList<>(columnCount); @@ -134,18 +134,18 @@ private static TableRows buildRow(String tableName, // } } - class RowBasedTableRows implements TableRows, Into { + class RowBasedTable implements Table, Into { private String tableName; private List columnSchemas; private final List rows; - public RowBasedTableRows() { + public RowBasedTable() { this.rows = new ArrayList<>(); } - private RowBasedTableRows(String tableName, List columnSchemas, List rows) { + private RowBasedTable(String tableName, List columnSchemas, List rows) { this.tableName = tableName; this.columnSchemas = columnSchemas; this.rows = rows; @@ -167,7 +167,7 @@ public int columnCount() { } @Override - public TableRows insert(Object... values) { + public Table addRow(Object... values) { checkNumValues(values.length); RowData.Row.Builder rowBuilder = RowData.Row.newBuilder(); @@ -182,9 +182,9 @@ public TableRows insert(Object... values) { } @Override - public TableRows subRange(int fromIndex, int toIndex) { + public Table subRange(int fromIndex, int toIndex) { List rows = this.rows.subList(fromIndex, toIndex); - return new RowBasedTableRows(this.tableName, this.columnSchemas, rows); + return new RowBasedTable(this.tableName, this.columnSchemas, rows); } @Override diff --git a/ingester-protocol/src/main/java/io/greptime/models/TableRowsHelper.java b/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java similarity index 92% rename from ingester-protocol/src/main/java/io/greptime/models/TableRowsHelper.java rename to ingester-protocol/src/main/java/io/greptime/models/TableHelper.java index 16a50a8..14f4295 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/TableRowsHelper.java +++ b/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java @@ -24,16 +24,16 @@ /** * @author jiachun.fjc */ -public class TableRowsHelper { +public class TableHelper { - public static Database.GreptimeRequest toGreptimeRequest(TableRows rows, // + public static Database.GreptimeRequest toGreptimeRequest(Table rows, // WriteOp writeOp, // String database, // AuthInfo authInfo) { return toGreptimeRequest(Collections.singleton(rows), writeOp, database, authInfo); } - public static Database.GreptimeRequest toGreptimeRequest(Collection rows, // + public static Database.GreptimeRequest toGreptimeRequest(Collection
rows, // WriteOp writeOp, // String database, // AuthInfo authInfo) { @@ -48,7 +48,7 @@ public static Database.GreptimeRequest toGreptimeRequest(Collection r switch (writeOp) { case Insert: Database.RowInsertRequests.Builder insertBuilder = Database.RowInsertRequests.newBuilder(); - for (TableRows r : rows) { + for (Table r : rows) { insertBuilder.addInserts(r.intoRowInsertRequest()); } return Database.GreptimeRequest.newBuilder() // @@ -57,7 +57,7 @@ public static Database.GreptimeRequest toGreptimeRequest(Collection r .build(); case Delete: Database.RowDeleteRequests.Builder deleteBuilder = Database.RowDeleteRequests.newBuilder(); - for (TableRows r : rows) { + for (Table r : rows) { deleteBuilder.addDeletes(r.intoRowDeleteRequest()); } return Database.GreptimeRequest.newBuilder() // @@ -69,5 +69,5 @@ public static Database.GreptimeRequest toGreptimeRequest(Collection r } } - private TableRowsHelper() {} + private TableHelper() {} } diff --git a/ingester-protocol/src/main/java/io/greptime/models/WriteTable.java b/ingester-protocol/src/main/java/io/greptime/models/WriteTable.java index b84e08e..4a0eb3a 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/WriteTable.java +++ b/ingester-protocol/src/main/java/io/greptime/models/WriteTable.java @@ -21,15 +21,15 @@ * @author jiachun.fjc */ public class WriteTable { - private final TableRows rows; + private final Table rows; private final WriteOp writeOp; - public WriteTable(TableRows rows, WriteOp writeOp) { + public WriteTable(Table rows, WriteOp writeOp) { this.rows = rows; this.writeOp = writeOp; } - public TableRows getRows() { + public Table getRows() { return rows; } diff --git a/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java b/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java index 3eeca9e..8cefe17 100644 --- a/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java +++ b/ingester-protocol/src/test/java/io/greptime/PojoMapperTest.java @@ -18,7 +18,7 @@ import io.greptime.models.Column; import io.greptime.models.DataType; import io.greptime.models.Metric; -import io.greptime.models.TableRows; +import io.greptime.models.Table; import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; @@ -31,13 +31,13 @@ public class PojoMapperTest { @Test - public void testToTableRows() { + public void testToTable() { List pojos1 = new ArrayList<>(); for (int i = 0; i < 10; i++) { Pojo1Test pojo1 = createNewPojo1Test(); pojos1.add(pojo1); } - TableRows tp1 = new PojoMapper(65536).toTableRows(pojos1); + Table tp1 = new PojoMapper(65536).toTableData(pojos1); Assert.assertEquals("pojo1", tp1.tableName()); Assert.assertEquals(50, tp1.pointCount()); @@ -47,7 +47,7 @@ public void testToTableRows() { Pojo2Test pojo2 = createNewPojo2Test(); pojos2.add(pojo2); } - TableRows tp2 = new PojoMapper(65536).toTableRows(pojos2); + Table tp2 = new PojoMapper(65536).toTableData(pojos2); Assert.assertEquals("pojo2", tp2.tableName()); Assert.assertEquals(30, tp2.pointCount()); } diff --git a/ingester-protocol/src/test/java/io/greptime/TestUtil.java b/ingester-protocol/src/test/java/io/greptime/TestUtil.java index 58eae65..b60e918 100644 --- a/ingester-protocol/src/test/java/io/greptime/TestUtil.java +++ b/ingester-protocol/src/test/java/io/greptime/TestUtil.java @@ -17,7 +17,7 @@ import io.greptime.models.DataType; import io.greptime.models.SemanticType; -import io.greptime.models.TableRows; +import io.greptime.models.Table; import io.greptime.models.TableSchema; import java.util.Collection; import java.util.Collections; @@ -27,16 +27,16 @@ */ public class TestUtil { - public static Collection testTableRows(String tableName, int rowCount) { + public static Collection
testTable(String tableName, int rowCount) { TableSchema tableSchema = TableSchema.newBuilder(tableName) // .addColumn("host", SemanticType.Tag, DataType.String) // .addColumn("ts", SemanticType.Timestamp, DataType.TimestampMillisecond) // .addColumn("cpu", SemanticType.Field, DataType.Float64) // .build(); - TableRows rows = TableRows.from(tableSchema); + Table rows = Table.from(tableSchema); for (int i = 0; i < rowCount; i++) { - rows.insert("127.0.0.1", System.currentTimeMillis(), i); + rows.addRow("127.0.0.1", System.currentTimeMillis(), i); } return Collections.singleton(rows); } diff --git a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java index a0f6217..e0beaff 100644 --- a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java +++ b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java @@ -19,8 +19,7 @@ import io.greptime.models.DataType; import io.greptime.models.Err; import io.greptime.models.Result; -import io.greptime.models.SemanticType; -import io.greptime.models.TableRows; +import io.greptime.models.Table; import io.greptime.models.TableSchema; import io.greptime.models.WriteOk; import io.greptime.options.WriteOptions; @@ -37,23 +36,6 @@ import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; -import static io.greptime.models.DataType.Binary; -import static io.greptime.models.DataType.Bool; -import static io.greptime.models.DataType.Date; -import static io.greptime.models.DataType.DateTime; -import static io.greptime.models.DataType.Float32; -import static io.greptime.models.DataType.Float64; -import static io.greptime.models.DataType.Int16; -import static io.greptime.models.DataType.Int32; -import static io.greptime.models.DataType.Int64; -import static io.greptime.models.DataType.Int8; -import static io.greptime.models.DataType.TimestampMillisecond; -import static io.greptime.models.DataType.TimestampNanosecond; -import static io.greptime.models.DataType.TimestampSecond; -import static io.greptime.models.DataType.UInt16; -import static io.greptime.models.DataType.UInt32; -import static io.greptime.models.DataType.UInt64; -import static io.greptime.models.DataType.UInt8; import static io.greptime.models.SemanticType.Field; import static io.greptime.models.SemanticType.Tag; import static io.greptime.models.SemanticType.Timestamp; @@ -106,12 +88,12 @@ public void testWriteSuccess() throws ExecutionException, InterruptedException { .addColumn("field16", Field, DataType.TimestampMillisecond) // .addColumn("field17", Field, DataType.TimestampNanosecond) // .build(); - TableRows rows = TableRows.from(schema); + Table rows = Table.from(schema); long ts = System.currentTimeMillis(); - rows.insert("tag1", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 11, 12L, 13L, 14L, 15L); - rows.insert("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 0.9, 0.10, false, new byte[0], 11, 12, 13, 14, 15); - rows.insert("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, false, new byte[] {0, 1}, 11, 12, 13, 14, 15); + rows.addRow("tag1", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 11, 12L, 13L, 14L, 15L); + rows.addRow("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 0.9, 0.10, false, new byte[0], 11, 12, 13, 14, 15); + rows.addRow("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, false, new byte[] {0, 1}, 11, 12, 13, 14, 15); Endpoint addr = Endpoint.parse("127.0.0.1:8081"); Database.GreptimeResponse response = Database.GreptimeResponse.newBuilder() // diff --git a/ingester-protocol/src/test/java/io/greptime/WriteLimitTest.java b/ingester-protocol/src/test/java/io/greptime/WriteLimitTest.java index 21fe63e..2e61022 100644 --- a/ingester-protocol/src/test/java/io/greptime/WriteLimitTest.java +++ b/ingester-protocol/src/test/java/io/greptime/WriteLimitTest.java @@ -20,7 +20,7 @@ import io.greptime.limit.WriteLimiter; import io.greptime.models.Err; import io.greptime.models.Result; -import io.greptime.models.TableRows; +import io.greptime.models.Table; import io.greptime.models.WriteOk; import org.junit.Assert; import org.junit.Test; @@ -38,7 +38,7 @@ public class WriteLimitTest { @Test(expected = LimitedException.class) public void abortWriteLimitTest() throws ExecutionException, InterruptedException { WriteLimiter limiter = new WriteClient.DefaultWriteLimiter(1, new LimitedPolicy.AbortPolicy()); - Collection rows = TestUtil.testTableRows("test1", 1); + Collection
rows = TestUtil.testTable("test1", 1); // consume the permits limiter.acquireAndDo(rows, CompletableFuture::new); @@ -49,7 +49,7 @@ public void abortWriteLimitTest() throws ExecutionException, InterruptedExceptio @Test public void discardWriteLimitTest() throws ExecutionException, InterruptedException { WriteLimiter limiter = new WriteClient.DefaultWriteLimiter(1, new LimitedPolicy.DiscardPolicy()); - Collection rows = TestUtil.testTableRows("test1", 1); + Collection
rows = TestUtil.testTable("test1", 1); // consume the permits limiter.acquireAndDo(rows, CompletableFuture::new); @@ -63,7 +63,7 @@ public void discardWriteLimitTest() throws ExecutionException, InterruptedExcept @Test public void blockingWriteLimitTest() throws InterruptedException { WriteLimiter limiter = new WriteClient.DefaultWriteLimiter(1, new LimitedPolicy.BlockingPolicy()); - Collection rows = TestUtil.testTableRows("test1", 1); + Collection
rows = TestUtil.testTable("test1", 1); // consume the permits limiter.acquireAndDo(rows, CompletableFuture::new); @@ -93,7 +93,7 @@ public void blockingTimeoutWriteLimitTest() throws ExecutionException, Interrupt int timeoutSecs = 2; WriteLimiter limiter = new WriteClient.DefaultWriteLimiter(1, new LimitedPolicy.BlockingTimeoutPolicy(timeoutSecs, TimeUnit.SECONDS)); - Collection rows = TestUtil.testTableRows("test1", 1); + Collection
rows = TestUtil.testTable("test1", 1); // consume the permits limiter.acquireAndDo(rows, CompletableFuture::new); @@ -111,7 +111,7 @@ public void abortOnBlockingTimeoutWriteLimitTest() throws ExecutionException, In int timeoutSecs = 2; WriteLimiter limiter = new WriteClient.DefaultWriteLimiter(1, new LimitedPolicy.AbortOnBlockingTimeoutPolicy(timeoutSecs, TimeUnit.SECONDS)); - Collection rows = TestUtil.testTableRows("test1", 1); + Collection
rows = TestUtil.testTable("test1", 1); // consume the permits limiter.acquireAndDo(rows, CompletableFuture::new); diff --git a/ingester-protocol/src/test/java/io/greptime/models/TableRowsTest.java b/ingester-protocol/src/test/java/io/greptime/models/TableTest.java similarity index 79% rename from ingester-protocol/src/test/java/io/greptime/models/TableRowsTest.java rename to ingester-protocol/src/test/java/io/greptime/models/TableTest.java index ddd2057..ed0a6d4 100644 --- a/ingester-protocol/src/test/java/io/greptime/models/TableRowsTest.java +++ b/ingester-protocol/src/test/java/io/greptime/models/TableTest.java @@ -22,20 +22,20 @@ /** * @author jiachun.fjc */ -public class TableRowsTest { +public class TableTest { @Test - public void testWriteRowsNonNull() { + public void testTableNonNull() { TableSchema schema = TableSchema.newBuilder("test_table") // .addColumn("col1", SemanticType.Tag, DataType.String) // .addColumn("col2", SemanticType.Tag, DataType.String) // .addColumn("col3", SemanticType.Field, DataType.Int32) // .build(); - TableRows.RowBasedTableRows rows = (TableRows.RowBasedTableRows) TableRows.from(schema); - rows.insert("1", "11", 111) // - .insert("2", "22", 222) // - .insert("3", "33", 333); + Table.RowBasedTable rows = (Table.RowBasedTable) Table.from(schema); + rows.addRow("1", "11", 111) // + .addRow("2", "22", 222) // + .addRow("3", "33", 333); Assert.assertEquals(3, rows.rowCount()); RowData.Rows rawRows = rows.into(); @@ -45,17 +45,17 @@ public void testWriteRowsNonNull() { } @Test - public void testWriteRowsSomeNull() { + public void testTableSomeNull() { TableSchema schema = TableSchema.newBuilder("test_table") // .addColumn("col1", SemanticType.Tag, DataType.String) // .addColumn("col2", SemanticType.Tag, DataType.String) // .addColumn("col3", SemanticType.Field, DataType.Int32) // .build(); - TableRows.RowBasedTableRows rows = (TableRows.RowBasedTableRows) TableRows.from(schema); - rows.insert("1", "11", 111) // - .insert("2", null, 222) // - .insert("3", "33", null); + Table.RowBasedTable rows = (Table.RowBasedTable) Table.from(schema); + rows.addRow("1", "11", 111) // + .addRow("2", null, 222) // + .addRow("3", "33", null); Assert.assertEquals(3, rows.rowCount()); RowData.Rows rawRows = rows.into();