Skip to content

Commit

Permalink
feat: bind dbname to client
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Dec 21, 2023
1 parent a9e9b7b commit 41d7ee2
Show file tree
Hide file tree
Showing 16 changed files with 112 additions and 243 deletions.
9 changes: 3 additions & 6 deletions ingester-example/src/main/java/io/greptime/QuickStart.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.greptime.models.Column;
import io.greptime.models.DataType;
import io.greptime.models.Database;
import io.greptime.models.Err;
import io.greptime.models.Metric;
import io.greptime.models.Result;
Expand All @@ -39,7 +38,7 @@ public class QuickStart {

public static void main(String[] args) throws Exception {
String endpoint = "127.0.0.1:4001";
GreptimeOptions opts = GreptimeOptions.newBuilder(endpoint) //
GreptimeOptions opts = GreptimeOptions.newBuilder("public", endpoint) //
.writeMaxRetries(1) //
.routeTableRefreshPeriodSeconds(-1) //
.build();
Expand All @@ -60,8 +59,7 @@ public static void main(String[] args) throws Exception {
runInsertWithStream(greptimeDB, now);
}

@Database(name = "public")
@Metric(name = "monitor")
@Metric(name = "monitor1")
static class Monitor {
@Column(name = "host", tag = true, dataType = DataType.String)
String host;
Expand All @@ -75,8 +73,7 @@ static class Monitor {
BigDecimal decimalValue;
}

@Database(name = "public")
@Metric(name = "monitor_cpu")
@Metric(name = "monitor_cpu1")
static class MonitorCpu {
@Column(name = "host", tag = true, dataType = DataType.String)
String host;
Expand Down
19 changes: 1 addition & 18 deletions ingester-protocol/src/main/java/io/greptime/PojoMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import io.greptime.errors.PojoException;
import io.greptime.models.Column;
import io.greptime.models.DataType;
import io.greptime.models.Database;
import io.greptime.models.Metric;
import io.greptime.models.SemanticType;
import io.greptime.models.TableName;
import io.greptime.models.TableRows;
import io.greptime.models.TableSchema;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -57,11 +55,8 @@ public <M> TableRows toTableRows(List<M> pojos) {

Map<String, Field> fieldMap = getAndCacheMetricClass(metricType);

String database = getDatabase(metricType);
String metricName = getMetricName(metricType);

TableName tableName = TableName.with(database, metricName);

String[] columnNames = new String[fieldMap.size()];
DataType[] dataTypes = new DataType[fieldMap.size()];
SemanticType[] semanticTypes = new SemanticType[fieldMap.size()];
Expand All @@ -85,7 +80,7 @@ public <M> TableRows toTableRows(List<M> pojos) {
i++;
}

TableSchema schema = TableSchema.newBuilder(tableName) //
TableSchema schema = TableSchema.newBuilder(metricName) //
.columnNames(columnNames) //
.semanticTypes(semanticTypes) //
.dataTypes(dataTypes) //
Expand Down Expand Up @@ -113,18 +108,6 @@ public <M> TableRows toTableRows(List<M> pojos) {
return tableRows;
}

private <M> String getDatabase(Class<M> metricType) {
Database databaseAnnotation = metricType.getAnnotation(Database.class);
if (databaseAnnotation != null) {
return databaseAnnotation.name();
}

String err =
String.format("Unable to determine Database for '%s'." + " Does it have a @Database annotation?",
metricType);
throw new PojoException(err);
}

private String getMetricName(Class<?> metricType) {
// From @Metirc annotation
Metric metricAnnotation = metricType.getAnnotation(Metric.class);
Expand Down
19 changes: 9 additions & 10 deletions ingester-protocol/src/main/java/io/greptime/WriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import io.greptime.errors.StreamException;
import io.greptime.limit.LimitedPolicy;
import io.greptime.limit.WriteLimiter;
import io.greptime.models.AuthInfo;
import io.greptime.models.Err;
import io.greptime.models.Result;
import io.greptime.models.TableName;
import io.greptime.models.WriteOk;
import io.greptime.models.TableRows;
import io.greptime.models.TableRowsHelper;
Expand All @@ -48,7 +48,6 @@
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

/**
* Default Write API impl.
Expand Down Expand Up @@ -161,10 +160,9 @@ private CompletableFuture<Result<WriteOk, Err>> write0(Collection<TableRows> row
}

private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, Collection<TableRows> rows, WriteOp writeOp, Context ctx, int retries) {
Collection<TableName> tableNames = rows.stream() //
.map(TableRows::tableName) //
.collect(Collectors.toList());
Database.GreptimeRequest req = TableRowsHelper.toGreptimeRequest(tableNames, rows, writeOp, this.opts.getAuthInfo());
String database = this.opts.getDatabase();
AuthInfo authInfo = this.opts.getAuthInfo();
Database.GreptimeRequest req = TableRowsHelper.toGreptimeRequest(rows, writeOp, database, authInfo);
ctx.with("retries", retries);

CompletableFuture<Database.GreptimeResponse> future = this.routerClient.invoke(endpoint, req, ctx);
Expand All @@ -175,7 +173,7 @@ private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, Colle
int statusCode = status.getStatusCode();
if (Status.isSuccess(statusCode)) {
int affectedRows = resp.getAffectedRows().getValue();
return WriteOk.ok(affectedRows, 0, tableNames).mapToResult();
return WriteOk.ok(affectedRows, 0).mapToResult();
} else {
return Err.writeErr(statusCode, new ServerException(status.getErrMsg()), endpoint, rows).mapToResult();
}
Expand All @@ -190,7 +188,7 @@ private Observer<WriteTable> streamWriteTo(Endpoint endpoint, Context ctx, Obser
@Override
public void onNext(Database.GreptimeResponse resp) {
int affectedRows = resp.getAffectedRows().getValue();
Result<WriteOk, Err> ret = WriteOk.ok(affectedRows, 0, null).mapToResult();
Result<WriteOk, Err> ret = WriteOk.ok(affectedRows, 0).mapToResult();
if (ret.isOk()) {
respObserver.onNext(ret.getOk());
} else {
Expand All @@ -215,8 +213,9 @@ public void onCompleted() {
public void onNext(WriteTable writeTable) {
TableRows rows = writeTable.getRows();
WriteOp writeOp = writeTable.getWriteOp();
Database.GreptimeRequest req =
TableRowsHelper.toGreptimeRequest(rows, writeOp, WriteClient.this.opts.getAuthInfo());
String database = WriteClient.this.opts.getDatabase();
AuthInfo authInfo = WriteClient.this.opts.getAuthInfo();
Database.GreptimeRequest req = TableRowsHelper.toGreptimeRequest(rows, writeOp, database, authInfo);
rpcObserver.onNext(req);
}

Expand Down
31 changes: 0 additions & 31 deletions ingester-protocol/src/main/java/io/greptime/models/Database.java

This file was deleted.

80 changes: 0 additions & 80 deletions ingester-protocol/src/main/java/io/greptime/models/TableName.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface TableRows {
/**
* The table name to write.
*/
TableName tableName();
String tableName();

/**
* The rows count to write.
Expand Down Expand Up @@ -89,7 +89,7 @@ public Builder(TableSchema tableSchema) {
}

public TableRows build() {
TableName tableName = this.tableSchema.getTableName();
String tableName = this.tableSchema.getTableName();
List<String> columnNames = this.tableSchema.getColumnNames();
List<Common.SemanticType> semanticTypes = this.tableSchema.getSemanticTypes();
List<Common.ColumnDataType> dataTypes = this.tableSchema.getDataTypes();
Expand All @@ -110,7 +110,7 @@ public TableRows build() {
return buildRow(tableName, columnCount, columnNames, semanticTypes, dataTypes, dataTypeExtensions);
}

private static TableRows buildRow(TableName tableName, //
private static TableRows buildRow(String tableName, //
int columnCount, //
List<String> columnNames, //
List<Common.SemanticType> semanticTypes, //
Expand All @@ -134,13 +134,13 @@ private static TableRows buildRow(TableName tableName, //

class RowBasedTableRows implements TableRows, Into<RowData.Rows> {

private TableName tableName;
private String tableName;

private List<RowData.ColumnSchema> columnSchemas;
private final List<RowData.Row> rows = new ArrayList<>();

@Override
public TableName tableName() {
public String tableName() {
return tableName;
}

Expand Down Expand Up @@ -172,15 +172,15 @@ public TableRows insert(Object... values) {
@Override
public Database.RowInsertRequest intoRowInsertRequest() {
return Database.RowInsertRequest.newBuilder() //
.setTableName(this.tableName.getTableName()) //
.setTableName(this.tableName) //
.setRows(into()) //
.build();
}

@Override
public Database.RowDeleteRequest intoRowDeleteRequest() {
return Database.RowDeleteRequest.newBuilder() //
.setTableName(this.tableName.getTableName()) //
.setTableName(this.tableName) //
.setRows(into()) //
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,25 @@
*/
public class TableRowsHelper {

public static Database.GreptimeRequest toGreptimeRequest(TableRows rows, WriteOp writeOp, AuthInfo authInfo) {
return toGreptimeRequest(Collections.singleton(rows.tableName()), Collections.singleton(rows), writeOp,
authInfo);
public static Database.GreptimeRequest toGreptimeRequest(TableRows rows, //
WriteOp writeOp, //
String database, //
AuthInfo authInfo) {
return toGreptimeRequest(Collections.singleton(rows), writeOp, database, authInfo);
}

public static Database.GreptimeRequest toGreptimeRequest(Collection<TableName> tableNames, //
Collection<TableRows> rows, //
public static Database.GreptimeRequest toGreptimeRequest(Collection<TableRows> rows, //
WriteOp writeOp, //
String database, //
AuthInfo authInfo) {
String dbName = null;
for (TableName t : tableNames) {
if (dbName == null) {
dbName = t.getDatabaseName();
} else if (!dbName.equals(t.getDatabaseName())) {
String errMsg =
String.format("Write to multiple databases is not supported: %s, %s", dbName,
t.getDatabaseName());
throw new IllegalArgumentException(errMsg);
}
}

Common.RequestHeader.Builder headerBuilder = Common.RequestHeader.newBuilder();
if (dbName != null) {
headerBuilder.setDbname(dbName);
if (database != null) {
headerBuilder.setDbname(database);
}
if (authInfo != null) {
headerBuilder.setAuthorization(authInfo.into());
}


switch (writeOp) {
case Insert:
Database.RowInsertRequests.Builder insertBuilder = Database.RowInsertRequests.newBuilder();
Expand Down
Loading

0 comments on commit 41d7ee2

Please sign in to comment.