Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bind dbname to client #5

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions ingester-example/src/main/java/io/greptime/QuickStart.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.greptime.models.Column;
import io.greptime.models.DataType;
import io.greptime.models.Database;
import io.greptime.models.Err;
import io.greptime.models.Metric;
import io.greptime.models.Result;
Expand All @@ -39,7 +38,7 @@ public class QuickStart {

public static void main(String[] args) throws Exception {
String endpoint = "127.0.0.1:4001";
GreptimeOptions opts = GreptimeOptions.newBuilder(endpoint) //
GreptimeOptions opts = GreptimeOptions.newBuilder("public", endpoint) //
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider:

GreptimeOptions.newBuilder(endpoint, "public")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do it on next pr

.writeMaxRetries(1) //
.routeTableRefreshPeriodSeconds(-1) //
.build();
Expand All @@ -60,8 +59,7 @@ public static void main(String[] args) throws Exception {
runInsertWithStream(greptimeDB, now);
}

@Database(name = "public")
@Metric(name = "monitor")
@Metric(name = "monitor1")
static class Monitor {
@Column(name = "host", tag = true, dataType = DataType.String)
String host;
Expand All @@ -75,8 +73,7 @@ static class Monitor {
BigDecimal decimalValue;
}

@Database(name = "public")
@Metric(name = "monitor_cpu")
@Metric(name = "monitor_cpu1")
static class MonitorCpu {
@Column(name = "host", tag = true, dataType = DataType.String)
String host;
Expand Down
19 changes: 1 addition & 18 deletions ingester-protocol/src/main/java/io/greptime/PojoMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import io.greptime.errors.PojoException;
import io.greptime.models.Column;
import io.greptime.models.DataType;
import io.greptime.models.Database;
import io.greptime.models.Metric;
import io.greptime.models.SemanticType;
import io.greptime.models.TableName;
import io.greptime.models.TableRows;
import io.greptime.models.TableSchema;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -57,11 +55,8 @@ public <M> TableRows toTableRows(List<M> pojos) {

Map<String, Field> fieldMap = getAndCacheMetricClass(metricType);

String database = getDatabase(metricType);
String metricName = getMetricName(metricType);

TableName tableName = TableName.with(database, metricName);

String[] columnNames = new String[fieldMap.size()];
DataType[] dataTypes = new DataType[fieldMap.size()];
SemanticType[] semanticTypes = new SemanticType[fieldMap.size()];
Expand All @@ -85,7 +80,7 @@ public <M> TableRows toTableRows(List<M> pojos) {
i++;
}

TableSchema schema = TableSchema.newBuilder(tableName) //
TableSchema schema = TableSchema.newBuilder(metricName) //
.columnNames(columnNames) //
.semanticTypes(semanticTypes) //
.dataTypes(dataTypes) //
Expand Down Expand Up @@ -113,18 +108,6 @@ public <M> TableRows toTableRows(List<M> pojos) {
return tableRows;
}

private <M> String getDatabase(Class<M> metricType) {
Database databaseAnnotation = metricType.getAnnotation(Database.class);
if (databaseAnnotation != null) {
return databaseAnnotation.name();
}

String err =
String.format("Unable to determine Database for '%s'." + " Does it have a @Database annotation?",
metricType);
throw new PojoException(err);
}

private String getMetricName(Class<?> metricType) {
// From @Metirc annotation
Metric metricAnnotation = metricType.getAnnotation(Metric.class);
Expand Down
19 changes: 9 additions & 10 deletions ingester-protocol/src/main/java/io/greptime/WriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import io.greptime.errors.StreamException;
import io.greptime.limit.LimitedPolicy;
import io.greptime.limit.WriteLimiter;
import io.greptime.models.AuthInfo;
import io.greptime.models.Err;
import io.greptime.models.Result;
import io.greptime.models.TableName;
import io.greptime.models.WriteOk;
import io.greptime.models.TableRows;
import io.greptime.models.TableRowsHelper;
Expand All @@ -48,7 +48,6 @@
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

/**
* Default Write API impl.
Expand Down Expand Up @@ -161,10 +160,9 @@ private CompletableFuture<Result<WriteOk, Err>> write0(Collection<TableRows> row
}

private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, Collection<TableRows> rows, WriteOp writeOp, Context ctx, int retries) {
Collection<TableName> tableNames = rows.stream() //
.map(TableRows::tableName) //
.collect(Collectors.toList());
Database.GreptimeRequest req = TableRowsHelper.toGreptimeRequest(tableNames, rows, writeOp, this.opts.getAuthInfo());
String database = this.opts.getDatabase();
AuthInfo authInfo = this.opts.getAuthInfo();
Database.GreptimeRequest req = TableRowsHelper.toGreptimeRequest(rows, writeOp, database, authInfo);
ctx.with("retries", retries);

CompletableFuture<Database.GreptimeResponse> future = this.routerClient.invoke(endpoint, req, ctx);
Expand All @@ -175,7 +173,7 @@ private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, Colle
int statusCode = status.getStatusCode();
if (Status.isSuccess(statusCode)) {
int affectedRows = resp.getAffectedRows().getValue();
return WriteOk.ok(affectedRows, 0, tableNames).mapToResult();
return WriteOk.ok(affectedRows, 0).mapToResult();
} else {
return Err.writeErr(statusCode, new ServerException(status.getErrMsg()), endpoint, rows).mapToResult();
}
Expand All @@ -190,7 +188,7 @@ private Observer<WriteTable> streamWriteTo(Endpoint endpoint, Context ctx, Obser
@Override
public void onNext(Database.GreptimeResponse resp) {
int affectedRows = resp.getAffectedRows().getValue();
Result<WriteOk, Err> ret = WriteOk.ok(affectedRows, 0, null).mapToResult();
Result<WriteOk, Err> ret = WriteOk.ok(affectedRows, 0).mapToResult();
if (ret.isOk()) {
respObserver.onNext(ret.getOk());
} else {
Expand All @@ -215,8 +213,9 @@ public void onCompleted() {
public void onNext(WriteTable writeTable) {
TableRows rows = writeTable.getRows();
WriteOp writeOp = writeTable.getWriteOp();
Database.GreptimeRequest req =
TableRowsHelper.toGreptimeRequest(rows, writeOp, WriteClient.this.opts.getAuthInfo());
String database = WriteClient.this.opts.getDatabase();
AuthInfo authInfo = WriteClient.this.opts.getAuthInfo();
Database.GreptimeRequest req = TableRowsHelper.toGreptimeRequest(rows, writeOp, database, authInfo);
rpcObserver.onNext(req);
}

Expand Down
31 changes: 0 additions & 31 deletions ingester-protocol/src/main/java/io/greptime/models/Database.java

This file was deleted.

80 changes: 0 additions & 80 deletions ingester-protocol/src/main/java/io/greptime/models/TableName.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface TableRows {
/**
* The table name to write.
*/
TableName tableName();
String tableName();

/**
* The rows count to write.
Expand Down Expand Up @@ -89,7 +89,7 @@ public Builder(TableSchema tableSchema) {
}

public TableRows build() {
TableName tableName = this.tableSchema.getTableName();
String tableName = this.tableSchema.getTableName();
List<String> columnNames = this.tableSchema.getColumnNames();
List<Common.SemanticType> semanticTypes = this.tableSchema.getSemanticTypes();
List<Common.ColumnDataType> dataTypes = this.tableSchema.getDataTypes();
Expand All @@ -110,7 +110,7 @@ public TableRows build() {
return buildRow(tableName, columnCount, columnNames, semanticTypes, dataTypes, dataTypeExtensions);
}

private static TableRows buildRow(TableName tableName, //
private static TableRows buildRow(String tableName, //
int columnCount, //
List<String> columnNames, //
List<Common.SemanticType> semanticTypes, //
Expand All @@ -134,13 +134,13 @@ private static TableRows buildRow(TableName tableName, //

class RowBasedTableRows implements TableRows, Into<RowData.Rows> {

private TableName tableName;
private String tableName;

private List<RowData.ColumnSchema> columnSchemas;
private final List<RowData.Row> rows = new ArrayList<>();

@Override
public TableName tableName() {
public String tableName() {
return tableName;
}

Expand Down Expand Up @@ -172,15 +172,15 @@ public TableRows insert(Object... values) {
@Override
public Database.RowInsertRequest intoRowInsertRequest() {
return Database.RowInsertRequest.newBuilder() //
.setTableName(this.tableName.getTableName()) //
.setTableName(this.tableName) //
.setRows(into()) //
.build();
}

@Override
public Database.RowDeleteRequest intoRowDeleteRequest() {
return Database.RowDeleteRequest.newBuilder() //
.setTableName(this.tableName.getTableName()) //
.setTableName(this.tableName) //
.setRows(into()) //
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,25 @@
*/
public class TableRowsHelper {

public static Database.GreptimeRequest toGreptimeRequest(TableRows rows, WriteOp writeOp, AuthInfo authInfo) {
return toGreptimeRequest(Collections.singleton(rows.tableName()), Collections.singleton(rows), writeOp,
authInfo);
public static Database.GreptimeRequest toGreptimeRequest(TableRows rows, //
WriteOp writeOp, //
String database, //
AuthInfo authInfo) {
return toGreptimeRequest(Collections.singleton(rows), writeOp, database, authInfo);
}

public static Database.GreptimeRequest toGreptimeRequest(Collection<TableName> tableNames, //
Collection<TableRows> rows, //
public static Database.GreptimeRequest toGreptimeRequest(Collection<TableRows> rows, //
WriteOp writeOp, //
String database, //
AuthInfo authInfo) {
String dbName = null;
for (TableName t : tableNames) {
if (dbName == null) {
dbName = t.getDatabaseName();
} else if (!dbName.equals(t.getDatabaseName())) {
String errMsg =
String.format("Write to multiple databases is not supported: %s, %s", dbName,
t.getDatabaseName());
throw new IllegalArgumentException(errMsg);
}
}

Common.RequestHeader.Builder headerBuilder = Common.RequestHeader.newBuilder();
if (dbName != null) {
headerBuilder.setDbname(dbName);
if (database != null) {
headerBuilder.setDbname(database);
}
if (authInfo != null) {
headerBuilder.setAuthorization(authInfo.into());
}


switch (writeOp) {
case Insert:
Database.RowInsertRequests.Builder insertBuilder = Database.RowInsertRequests.newBuilder();
Expand Down
Loading
Loading