Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor API #9

Merged
merged 1 commit into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.greptime.models.DataType;
import io.greptime.models.SemanticType;
import io.greptime.models.TableRows;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
import io.greptime.options.GreptimeOptions;
Expand All @@ -32,9 +32,9 @@
/**
* @author jiachun.fjc
*/
public class StreamWriteTableRowsQuickStart {
public class StreamWriteQuickStart {

private static final Logger LOG = LoggerFactory.getLogger(StreamWriteTableRowsQuickStart.class);
private static final Logger LOG = LoggerFactory.getLogger(StreamWriteQuickStart.class);

public static void main(String[] args) throws ExecutionException, InterruptedException {
// GreptimeDB has a default database named "public", we can use it as the test database
Expand Down Expand Up @@ -67,8 +67,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
.addColumn("field2", SemanticType.Field, DataType.Float64) //
.build();

TableRows myMetric3Rows = TableRows.from(myMetric3Schema);
TableRows myMetric4Rows = TableRows.from(myMetric4Schema);
Table myMetric3Rows = Table.from(myMetric3Schema);
Table myMetric4Rows = Table.from(myMetric4Schema);

for (int i = 0; i < 10; i++) {
String tag1v = "tag_value_1_" + i;
Expand All @@ -80,7 +80,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
BigDecimal field3 = new BigDecimal(i);
int field4 = i + 1;

myMetric3Rows.insert(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4);
myMetric3Rows.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4);
}

for (int i = 0; i < 10; i++) {
Expand All @@ -90,10 +90,10 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
Date field1 = Calendar.getInstance().getTime();
double field2 = i + 0.1;

myMetric4Rows.insert(tag1v, tag2v, ts, field1, field2);
myMetric4Rows.addRow(tag1v, tag2v, ts, field1, field2);
}

StreamWriter<TableRows, WriteOk> writer = greptimeDB.streamWriter();
StreamWriter<Table, WriteOk> writer = greptimeDB.streamWriter();

// write data into stream
writer.write(myMetric3Rows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.greptime.models.Err;
import io.greptime.models.Result;
import io.greptime.models.SemanticType;
import io.greptime.models.TableRows;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
import io.greptime.options.GreptimeOptions;
Expand All @@ -37,9 +37,9 @@
/**
* @author jiachun.fjc
*/
public class WriteTableRowsQuickStart {
public class WriteQuickStart {

private static final Logger LOG = LoggerFactory.getLogger(WriteTableRowsQuickStart.class);
private static final Logger LOG = LoggerFactory.getLogger(WriteQuickStart.class);

public static void main(String[] args) throws ExecutionException, InterruptedException {
// GreptimeDB has a default database named "public", we can use it as the test database
Expand Down Expand Up @@ -72,8 +72,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
.addColumn("field2", SemanticType.Field, DataType.Float64) //
.build();

TableRows myMetric3Rows = TableRows.from(myMetric3Schema);
TableRows myMetric4Rows = TableRows.from(myMetric4Schema);
Table myMetric3Rows = Table.from(myMetric3Schema);
Table myMetric4Rows = Table.from(myMetric4Schema);

for (int i = 0; i < 10; i++) {
String tag1v = "tag_value_1_" + i;
Expand All @@ -85,7 +85,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
BigDecimal field3 = new BigDecimal(i);
int field4 = i + 1;

myMetric3Rows.insert(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4);
myMetric3Rows.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4);
}

for (int i = 0; i < 10; i++) {
Expand All @@ -95,10 +95,10 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
Date field1 = Calendar.getInstance().getTime();
double field2 = i + 0.1;

myMetric4Rows.insert(tag1v, tag2v, ts, field1, field2);
myMetric4Rows.addRow(tag1v, tag2v, ts, field1, field2);
}

Collection<TableRows> rows = Arrays.asList(myMetric3Rows, myMetric4Rows);
Collection<Table> rows = Arrays.asList(myMetric3Rows, myMetric4Rows);

// For performance reasons, the SDK is designed to be purely asynchronous.
// The return value is a future object. If you want to immediately obtain
Expand All @@ -113,7 +113,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
LOG.error("Failed to write: {}", result.getErr());
}

List<TableRows> delete_pojos = Arrays.asList(myMetric3Rows.subRange(0, 5), myMetric4Rows.subRange(0, 5));
List<Table> delete_pojos = Arrays.asList(myMetric3Rows.subRange(0, 5), myMetric4Rows.subRange(0, 5));
Result<WriteOk, Err> deletes = greptimeDB.write(delete_pojos, WriteOp.Delete).get();

if (deletes.isOk()) {
Expand Down
14 changes: 7 additions & 7 deletions ingester-protocol/src/main/java/io/greptime/GreptimeDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.greptime.models.Err;
import io.greptime.models.Result;
import io.greptime.models.WriteOk;
import io.greptime.models.TableRows;
import io.greptime.models.Table;
import io.greptime.options.GreptimeOptions;
import io.greptime.options.RouterOptions;
import io.greptime.options.WriteOptions;
Expand Down Expand Up @@ -139,20 +139,20 @@ public void ensureInitialized() {

@Override
public CompletableFuture<Result<WriteOk, Err>> writePOJOs(Collection<List<?>> pojos, WriteOp writeOp, Context ctx) {
List<TableRows> rows = new ArrayList<>(pojos.size());
List<Table> rows = new ArrayList<>(pojos.size());
for (List<?> pojo : pojos) {
rows.add(this.pojoMapper.toTableRows(pojo));
rows.add(this.pojoMapper.toTableData(pojo));
}
return write(rows, writeOp, ctx);
}

@Override
public StreamWriter<List<?>, WriteOk> streamWriterPOJOs(int maxPointsPerSecond, Context ctx) {
StreamWriter<TableRows, WriteOk> delegate = streamWriter(maxPointsPerSecond, ctx);
StreamWriter<Table, WriteOk> delegate = streamWriter(maxPointsPerSecond, ctx);
return new StreamWriter<List<?>, WriteOk>() {
@Override
public StreamWriter<List<?>, WriteOk> write(List<?> val, WriteOp writeOp) {
TableRows rows = pojoMapper.toTableRows(val);
Table rows = pojoMapper.toTableData(val);
delegate.write(rows, writeOp);
return this;
}
Expand All @@ -165,13 +165,13 @@ public CompletableFuture<WriteOk> completed() {
}

@Override
public CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows, WriteOp writeOp, Context ctx) {
public CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows, WriteOp writeOp, Context ctx) {
ensureInitialized();
return this.writeClient.write(rows, writeOp, attachCtx(ctx));
}

@Override
public StreamWriter<TableRows, WriteOk> streamWriter(int maxPointsPerSecond, Context ctx) {
public StreamWriter<Table, WriteOk> streamWriter(int maxPointsPerSecond, Context ctx) {
return this.writeClient.streamWriter(maxPointsPerSecond, attachCtx(ctx));
}

Expand Down
12 changes: 6 additions & 6 deletions ingester-protocol/src/main/java/io/greptime/PojoMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.greptime.models.DataType;
import io.greptime.models.Metric;
import io.greptime.models.SemanticType;
import io.greptime.models.TableRows;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import java.lang.reflect.Field;
import java.util.HashMap;
Expand All @@ -31,7 +31,7 @@
import java.util.concurrent.ConcurrentMap;

/**
* This utility class converts POJO classes into {@link io.greptime.models.TableRows} objects,
* This utility class converts POJO classes into {@link Table} objects,
* inspired by <a href="https://github.com/influxdata/influxdb-client-java/blob/master/client/src/main/java/com/influxdb/client/internal/MeasurementMapper.java">InfluxDB client-java</a>.
*
* @author jiachun.fjc
Expand All @@ -45,7 +45,7 @@ public PojoMapper(int maxCachedPOJOs) {
this.maxCachedPOJOs = maxCachedPOJOs;
}

public <M> TableRows toTableRows(List<M> pojos) {
public <M> Table toTableData(List<M> pojos) {
Ensures.ensureNonNull(pojos, "pojos");
Ensures.ensure(!pojos.isEmpty(), "pojos can not be empty");

Expand All @@ -72,7 +72,7 @@ public <M> TableRows toTableRows(List<M> pojos) {
schemaBuilder.addColumn(name, semanticType, dataType);
}

TableRows tableRows = TableRows.from(schemaBuilder.build());
Table table = Table.from(schemaBuilder.build());
for (M pojo : pojos) {
Class<?> type = pojo.getClass();
if (!type.equals(metricType)) {
Expand All @@ -88,10 +88,10 @@ public <M> TableRows toTableRows(List<M> pojos) {

j++;
}
tableRows.insert(values);
table.addRow(values);
}

return tableRows;
return table;
}

private String getMetricName(Class<?> metricType) {
Expand Down
16 changes: 8 additions & 8 deletions ingester-protocol/src/main/java/io/greptime/Write.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.greptime.models.Err;
import io.greptime.models.Result;
import io.greptime.models.WriteOk;
import io.greptime.models.TableRows;
import io.greptime.models.Table;
import io.greptime.rpc.Context;

import java.util.Collection;
Expand All @@ -34,14 +34,14 @@ public interface Write {
/**
* @see #write(Collection, WriteOp, Context)
*/
default CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows) {
default CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows) {
return write(rows, WriteOp.Insert, Context.newDefault());
}

/**
* @see #write(Collection, WriteOp, Context)
*/
default CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows, WriteOp writeOp) {
default CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows, WriteOp writeOp) {
return write(rows, writeOp, Context.newDefault());
}

Expand All @@ -53,29 +53,29 @@ default CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows
* @param ctx invoke context
* @return write result
*/
CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows, WriteOp writeOp, Context ctx);
CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows, WriteOp writeOp, Context ctx);

/**
* @see #streamWriter(int, Context)
*/
default StreamWriter<TableRows, WriteOk> streamWriter() {
default StreamWriter<Table, WriteOk> streamWriter() {
return streamWriter(-1);
}

/**
* @see #streamWriter(int, Context)
*/
default StreamWriter<TableRows, WriteOk> streamWriter(int maxPointsPerSecond) {
default StreamWriter<Table, WriteOk> streamWriter(int maxPointsPerSecond) {
return streamWriter(maxPointsPerSecond, Context.newDefault());
}

/**
* Create a `Stream` to write `TableRows` data.
* Create a `Stream` to write `Table` data.
*
* @param maxPointsPerSecond The max number of points that can be written per second,
* exceeding which may cause blockage.
* @param ctx invoke context
* @return a stream writer instance
*/
StreamWriter<TableRows, WriteOk> streamWriter(int maxPointsPerSecond, Context ctx);
StreamWriter<Table, WriteOk> streamWriter(int maxPointsPerSecond, Context ctx);
}
30 changes: 15 additions & 15 deletions ingester-protocol/src/main/java/io/greptime/WriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import io.greptime.models.Err;
import io.greptime.models.Result;
import io.greptime.models.WriteOk;
import io.greptime.models.TableRows;
import io.greptime.models.TableRowsHelper;
import io.greptime.models.Table;
import io.greptime.models.TableHelper;
import io.greptime.models.WriteTable;
import io.greptime.options.WriteOptions;
import io.greptime.rpc.Context;
Expand Down Expand Up @@ -79,7 +79,7 @@ public void shutdownGracefully() {
}

@Override
public CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows, WriteOp writeOp, Context ctx) {
public CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows, WriteOp writeOp, Context ctx) {
Ensures.ensureNonNull(rows, "null `rows`");
Ensures.ensure(!rows.isEmpty(), "empty `rows`");

Expand Down Expand Up @@ -108,7 +108,7 @@ public CompletableFuture<Result<WriteOk, Err>> write(Collection<TableRows> rows,
}

@Override
public StreamWriter<TableRows, WriteOk> streamWriter(int maxPointsPerSecond, Context ctx) {
public StreamWriter<Table, WriteOk> streamWriter(int maxPointsPerSecond, Context ctx) {
int permitsPerSecond = maxPointsPerSecond > 0 ? maxPointsPerSecond : this.opts.getDefaultStreamMaxWritePointsPerSecond();

CompletableFuture<WriteOk> respFuture = new CompletableFuture<>();
Expand All @@ -118,7 +118,7 @@ public StreamWriter<TableRows, WriteOk> streamWriter(int maxPointsPerSecond, Con
.thenApply(reqObserver -> new RateLimitingStreamWriter(reqObserver, permitsPerSecond) {

@Override
public StreamWriter<TableRows, WriteOk> write(TableRows rows, WriteOp writeOp) {
public StreamWriter<Table, WriteOk> write(Table rows, WriteOp writeOp) {
if (respFuture.isCompletedExceptionally()) {
respFuture.getNow(null); // throw the exception now
}
Expand All @@ -133,7 +133,7 @@ public CompletableFuture<WriteOk> completed() {
}).join();
}

private CompletableFuture<Result<WriteOk, Err>> write0(Collection<TableRows> rows, WriteOp writeOp, Context ctx, int retries) {
private CompletableFuture<Result<WriteOk, Err>> write0(Collection<Table> rows, WriteOp writeOp, Context ctx, int retries) {
InnerMetricHelper.writeByRetries(retries).mark();

return this.routerClient.route()
Expand All @@ -159,10 +159,10 @@ private CompletableFuture<Result<WriteOk, Err>> write0(Collection<TableRows> row
}, this.asyncPool);
}

private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, Collection<TableRows> rows, WriteOp writeOp, Context ctx, int retries) {
private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, Collection<Table> rows, WriteOp writeOp, Context ctx, int retries) {
String database = this.opts.getDatabase();
AuthInfo authInfo = this.opts.getAuthInfo();
Database.GreptimeRequest req = TableRowsHelper.toGreptimeRequest(rows, writeOp, database, authInfo);
Database.GreptimeRequest req = TableHelper.toGreptimeRequest(rows, writeOp, database, authInfo);
ctx.with("retries", retries);

CompletableFuture<Database.GreptimeResponse> future = this.routerClient.invoke(endpoint, req, ctx);
Expand Down Expand Up @@ -211,11 +211,11 @@ public void onCompleted() {

@Override
public void onNext(WriteTable writeTable) {
TableRows rows = writeTable.getRows();
Table rows = writeTable.getRows();
WriteOp writeOp = writeTable.getWriteOp();
String database = WriteClient.this.opts.getDatabase();
AuthInfo authInfo = WriteClient.this.opts.getAuthInfo();
Database.GreptimeRequest req = TableRowsHelper.toGreptimeRequest(rows, writeOp, database, authInfo);
Database.GreptimeRequest req = TableHelper.toGreptimeRequest(rows, writeOp, database, authInfo);
rpcObserver.onNext(req);
}

Expand Down Expand Up @@ -306,12 +306,12 @@ public DefaultWriteLimiter(int maxInFlight, LimitedPolicy policy) {
}

@Override
public int calculatePermits(Collection<TableRows> in) {
return in.stream().map(TableRows::rowCount).reduce(0, Integer::sum);
public int calculatePermits(Collection<Table> in) {
return in.stream().map(Table::rowCount).reduce(0, Integer::sum);
}

@Override
public Result<WriteOk, Err> rejected(Collection<TableRows> in, RejectedState state) {
public Result<WriteOk, Err> rejected(Collection<Table> in, RejectedState state) {
String errMsg =
String.format("Write limited by client, acquirePermits=%d, maxPermits=%d, availablePermits=%d.", //
state.acquirePermits(), //
Expand All @@ -322,7 +322,7 @@ public Result<WriteOk, Err> rejected(Collection<TableRows> in, RejectedState sta
}

@SuppressWarnings("UnstableApiUsage")
static abstract class RateLimitingStreamWriter implements StreamWriter<TableRows, WriteOk> {
static abstract class RateLimitingStreamWriter implements StreamWriter<Table, WriteOk> {

private final Observer<WriteTable> observer;
private final RateLimiter rateLimiter;
Expand All @@ -337,7 +337,7 @@ static abstract class RateLimitingStreamWriter implements StreamWriter<TableRows
}

@Override
public StreamWriter<TableRows, WriteOk> write(TableRows rows, WriteOp writeOp) {
public StreamWriter<Table, WriteOk> write(Table rows, WriteOp writeOp) {
Ensures.ensureNonNull(rows, "null `rows`");

if (this.rateLimiter != null) {
Expand Down
Loading
Loading