[Docs] Refine Flink SQL and Python docs (lakesoul-io#337)
* refine docs

Signed-off-by: chenxu <[email protected]>

* update docs for python

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>
xuchen-plus and dmetasoul01 authored Sep 21, 2023
1 parent 2edb0b5 · commit acbbf80
Showing 15 changed files with 245 additions and 210 deletions.
@@ -35,11 +35,14 @@

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.*;

public class LakeSoulTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite, SupportsRowLevelDelete, SupportsRowLevelUpdate {
public class LakeSoulTableSink implements DynamicTableSink, SupportsPartitioning,
SupportsOverwrite, SupportsRowLevelDelete, SupportsRowLevelUpdate {

private final String summaryName;
private final String tableName;
@@ -153,19 +156,20 @@ public void applyStaticPartition(Map<String, String> map) {

@Override
public RowLevelDeleteInfo applyRowLevelDelete(@Nullable RowLevelModificationScanContext context) {
if(flinkConf.getBoolean(USE_CDC, false)){
flinkConf.set(DMLTYPE,DELETE_CDC);
}else{
flinkConf.set(DMLTYPE,DELETE);
if (flinkConf.getBoolean(USE_CDC, false)) {
flinkConf.set(DMLTYPE, DELETE_CDC);
} else {
flinkConf.set(DMLTYPE, DELETE);
}

return new LakeSoulRowLevelDelete();
}

@Override
public RowLevelUpdateInfo applyRowLevelUpdate(List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context) {
flinkConf.set(DMLTYPE,UPDATE);
return new LakeSoulRowLevelUpdate();
public RowLevelUpdateInfo applyRowLevelUpdate(List<Column> updatedColumns,
@Nullable RowLevelModificationScanContext context) {
flinkConf.set(DMLTYPE, UPDATE);
return new LakeSoulRowLevelUpdate();
}

private class LakeSoulRowLevelDelete implements RowLevelDeleteInfo {
@@ -175,9 +179,9 @@ public Optional<List<Column>> requiredColumns() {
}

public SupportsRowLevelDelete.RowLevelDeleteMode getRowLevelDeleteMode() {
if(flinkConf.getBoolean(USE_CDC, false)){
if (flinkConf.getBoolean(USE_CDC, false)) {
return RowLevelDeleteMode.DELETED_ROWS;
}else{
} else {
return RowLevelDeleteMode.REMAINING_ROWS;
}
}
@@ -190,9 +194,9 @@ public Optional<List<Column>> requiredColumns() {
}

public SupportsRowLevelUpdate.RowLevelUpdateMode getRowLevelUpdateMode() {
if (primaryKeyList.isEmpty()){
if (primaryKeyList.isEmpty()) {
return RowLevelUpdateMode.ALL_ROWS;
}else{
} else {
return SupportsRowLevelUpdate.RowLevelUpdateMode.UPDATED_ROWS;
}
}
@@ -9,18 +9,13 @@
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.lakesoul.handle.LakeSoulTableColumnHandle;
import com.facebook.presto.lakesoul.handle.LakeSoulTableHandle;
import com.facebook.presto.lakesoul.handle.LakeSoulTableLayoutHandle;
import com.facebook.presto.lakesoul.pojo.TableSchema;
import com.facebook.presto.lakesoul.util.JsonUtil;
import com.facebook.presto.lakesoul.util.PrestoUtil;
import com.facebook.presto.spi.*;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.google.common.collect.ImmutableList;
import com.facebook.presto.common.type.Type;
import org.apache.spark.sql.types.StructType;

import java.time.ZoneId;
@@ -52,9 +47,11 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
if (!listSchemaNames(session).contains(tableName.getSchemaName())) {
return null;
}
TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(tableName.getTableName(), tableName.getSchemaName());
TableInfo
tableInfo =
dbManager.getTableInfoByNameAndNamespace(tableName.getTableName(), tableName.getSchemaName());

if(tableInfo == null) {
if (tableInfo == null) {
throw new RuntimeException("no such table: " + tableName);
}

@@ -80,13 +77,15 @@ public List<ConnectorTableLayoutResult> getTableLayouts(
org.apache.spark.sql.arrow.ArrowUtils.toArrowSchema(struct, ZoneId.of("UTC").toString());
HashMap<String, ColumnHandle> allColumns = new HashMap<>();
String cdcChangeColumn = properties.getString(CDC_CHANGE_COLUMN);
for( org.apache.arrow.vector.types.pojo.Field field: arrowSchema.getFields()){
for (org.apache.arrow.vector.types.pojo.Field field : arrowSchema.getFields()) {
// drop cdc change column
if(cdcChangeColumn != null && field.getName().equals(cdcChangeColumn)){
if (cdcChangeColumn != null && field.getName().equals(cdcChangeColumn)) {
continue;
}
LakeSoulTableColumnHandle columnHandle =
new LakeSoulTableColumnHandle(tableHandle, field.getName(), PrestoUtil.convertToPrestoType(field.getType()));
new LakeSoulTableColumnHandle(tableHandle,
field.getName(),
PrestoUtil.convertToPrestoType(field.getType()));
allColumns.put(field.getName(), columnHandle);
}
ConnectorTableLayout layout = new ConnectorTableLayout(
@@ -116,7 +115,7 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
}

TableInfo tableInfo = dbManager.getTableInfoByTableId(handle.getId());
if(tableInfo == null){
if (tableInfo == null) {
throw new RuntimeException("no such table: " + handle.getNames());
}
JSONObject properties = JSON.parseObject(tableInfo.getProperties());
@@ -126,13 +125,13 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect

List<ColumnMetadata> columns = new LinkedList<>();
String cdcChangeColumn = properties.getString(CDC_CHANGE_COLUMN);
for( org.apache.arrow.vector.types.pojo.Field field : arrowSchema.getFields()) {
for (org.apache.arrow.vector.types.pojo.Field field : arrowSchema.getFields()) {
Map<String, Object> props = new HashMap<>();
for(Map.Entry<String, String> entry : field.getMetadata().entrySet()){
for (Map.Entry<String, String> entry : field.getMetadata().entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
// drop cdc change column
if(cdcChangeColumn != null && field.getName().equals(cdcChangeColumn)){
if (cdcChangeColumn != null && field.getName().equals(cdcChangeColumn)) {
continue;
}
ColumnMetadata columnMetadata = new ColumnMetadata(
@@ -152,14 +151,14 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
columns,
properties,
Optional.of("")
);
);
}

@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) {
LakeSoulTableHandle table = (LakeSoulTableHandle) tableHandle;
TableInfo tableInfo = dbManager.getTableInfoByTableId(table.getId());
if(tableInfo == null){
if (tableInfo == null) {
throw new RuntimeException("no such table: " + table.getNames());
}
JSONObject properties = JSON.parseObject(tableInfo.getProperties());
@@ -168,35 +167,39 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
org.apache.spark.sql.arrow.ArrowUtils.toArrowSchema(struct, ZoneId.of("UTC").toString());
HashMap<String, ColumnHandle> map = new HashMap<>();
String cdcChangeColumn = properties.getString(CDC_CHANGE_COLUMN);
for( org.apache.arrow.vector.types.pojo.Field field: arrowSchema.getFields()){
for (org.apache.arrow.vector.types.pojo.Field field : arrowSchema.getFields()) {
// drop cdc change column
if(cdcChangeColumn != null && field.getName().equals(cdcChangeColumn)){
if (cdcChangeColumn != null && field.getName().equals(cdcChangeColumn)) {
continue;
}
LakeSoulTableColumnHandle columnHandle =
new LakeSoulTableColumnHandle(table, field.getName(), PrestoUtil.convertToPrestoType(field.getType()));
new LakeSoulTableColumnHandle(table,
field.getName(),
PrestoUtil.convertToPrestoType(field.getType()));
map.put(field.getName(), columnHandle);
}
return map;
}

@Override
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
public ColumnMetadata getColumnMetadata(ConnectorSession session,
ConnectorTableHandle tableHandle,
ColumnHandle columnHandle) {
LakeSoulTableColumnHandle handle = (LakeSoulTableColumnHandle) columnHandle;
TableInfo tableInfo = dbManager.getTableInfoByTableId(handle.getTableHandle().getId());
if(tableInfo == null){
if (tableInfo == null) {
throw new RuntimeException("no such table: " + handle.getTableHandle().getNames());
}

StructType struct = (StructType) org.apache.spark.sql.types.DataType.fromJson(tableInfo.getTableSchema());
org.apache.arrow.vector.types.pojo.Schema arrowSchema =
org.apache.spark.sql.arrow.ArrowUtils.toArrowSchema(struct, ZoneId.of("UTC").toString());
for( org.apache.arrow.vector.types.pojo.Field field: arrowSchema.getFields()){
for (org.apache.arrow.vector.types.pojo.Field field : arrowSchema.getFields()) {
Map<String, Object> properties = new HashMap<>();
for(Map.Entry<String, String> entry : field.getMetadata().entrySet()){
for (Map.Entry<String, String> entry : field.getMetadata().entrySet()) {
properties.put(entry.getKey(), entry.getValue());
}
if(field.getName().equals(handle.getColumnName())){
if (field.getName().equals(handle.getColumnName())) {
return new ColumnMetadata(
field.getName(),
PrestoUtil.convertToPrestoType(field.getType()),
@@ -213,14 +216,15 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
}

@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) {
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session,
SchemaTablePrefix prefix) {
//prefix: lakesoul.default.table1
String schema = prefix.getSchemaName();
String tableNamePrefix = prefix.getTableName();
List<String> tableNames = dbManager.listTableNamesByNamespace(schema);
Map<SchemaTableName, List<ColumnMetadata>> results = new HashMap<>();
for(String tableName : tableNames){
if(tableName.startsWith(tableNamePrefix)){
for (String tableName : tableNames) {
if (tableName.startsWith(tableNamePrefix)) {
SchemaTableName schemaTableName = new SchemaTableName(schema, tableName);
ConnectorTableHandle tableHandle = getTableHandle(session, schemaTableName);
ConnectorTableMetadata tableMetadata = getTableMetadata(session, tableHandle);
@@ -52,7 +52,8 @@ public LakeSoulRecordCursor(LakeSoulRecordSet recordSet) throws IOException {

List<Field> fields = recordSet.getColumnHandles().stream().map(item -> {
LakeSoulTableColumnHandle columnHandle = (LakeSoulTableColumnHandle) item;
return Field.nullable(columnHandle.getColumnName(), ArrowUtil.convertToArrowType(columnHandle.getColumnType()));
return Field.nullable(columnHandle.getColumnName(),
ArrowUtil.convertToArrowType(columnHandle.getColumnType()));
}).collect(Collectors.toList());
HashMap<String, ColumnHandle> allcolumns = split.getLayout().getAllColumns();
List<String> dataCols = recordSet.getColumnHandles().stream().map(item -> {
@@ -64,12 +65,15 @@ public LakeSoulRecordCursor(LakeSoulRecordSet recordSet) throws IOException {
for (String item : prikeys) {
if (!dataCols.contains(item)) {
LakeSoulTableColumnHandle columnHandle = (LakeSoulTableColumnHandle) allcolumns.get(item);
fields.add(Field.nullable(columnHandle.getColumnName(), ArrowUtil.convertToArrowType(columnHandle.getColumnType())));
fields.add(Field.nullable(columnHandle.getColumnName(),
ArrowUtil.convertToArrowType(columnHandle.getColumnType())));
}
}
// add extra cdc column
String cdcColumn = this.recordSet.getSplit().getLayout().getTableParameters().getString(PrestoUtil.CDC_CHANGE_COLUMN);
if(cdcColumn != null){
String
cdcColumn =
this.recordSet.getSplit().getLayout().getTableParameters().getString(PrestoUtil.CDC_CHANGE_COLUMN);
if (cdcColumn != null) {
fields.add(Field.notNullable(cdcColumn, new ArrowType.Utf8()));
}

Expand All @@ -78,7 +82,11 @@ public LakeSoulRecordCursor(LakeSoulRecordSet recordSet) throws IOException {
for (Map.Entry<String, String> partition : this.partitions.entrySet()) {
reader.setDefaultColumnValue(partition.getKey(), partition.getValue());
}
desiredTypes = recordSet.getColumnHandles().stream().map(item -> ((LakeSoulTableColumnHandle) item).getColumnType()).collect(Collectors.toList());
desiredTypes =
recordSet.getColumnHandles()
.stream()
.map(item -> ((LakeSoulTableColumnHandle) item).getColumnType())
.collect(Collectors.toList());
// set filters
this.recordSet.getSplit().getLayout().getFilters().forEach((filter) -> reader.addFilter(filter.toString()));
// set s3 options
@@ -122,11 +130,13 @@ public Type getType(int field) {

@Override
public boolean advanceNextPosition() {
String cdcColumn = this.recordSet.getSplit().getLayout().getTableParameters().getString(PrestoUtil.CDC_CHANGE_COLUMN);
String
cdcColumn =
this.recordSet.getSplit().getLayout().getTableParameters().getString(PrestoUtil.CDC_CHANGE_COLUMN);
if (cdcColumn != null) {
while(next()){
while (next()) {
FieldVector vector = currentVCR.getVector(cdcColumn);
if(!vector.getObject(curRecordIdx).toString().equals("delete")){
if (!vector.getObject(curRecordIdx).toString().equals("delete")) {
return true;
}
}
@@ -136,7 +146,7 @@
}
}

private boolean next(){
private boolean next() {
if (currentVCR == null) {
return false;
}
@@ -188,7 +198,8 @@ public long getLong(int field) {
if (timeZone.equals("") || !Arrays.asList(TimeZone.getAvailableIDs()).contains(timeZone)) {
timeZone = TimeZone.getDefault().getID();
}
return DateTimeEncoding.packDateTimeWithZone(((TimeStampMicroTZVector) fv).get(curRecordIdx) / 1000, ZoneId.of(timeZone).toString());
return DateTimeEncoding.packDateTimeWithZone(((TimeStampMicroTZVector) fv).get(curRecordIdx) / 1000,
ZoneId.of(timeZone).toString());
}
if (fv instanceof DecimalVector) {
BigDecimal dv = ((DecimalVector) fv).getObject(curRecordIdx);
@@ -231,7 +242,10 @@ public Slice getSlice(int field) {
if (value instanceof BigDecimal) {
return Decimals.encodeScaledValue((BigDecimal) value);
}
throw new IllegalArgumentException("Field " + field + " is not a String, but is a " + value.getClass().getName());
throw new IllegalArgumentException("Field " +
field +
" is not a String, but is a " +
value.getClass().getName());
}

@Override