Skip to content

Commit

Permalink
[Native] use pool for pg connections (#558)
Browse files Browse the repository at this point in the history
* [Native] use pool for pg connections

Signed-off-by: chenxu <[email protected]>

* fix free

Signed-off-by: chenxu <[email protected]>

* add statement cache

Signed-off-by: chenxu <[email protected]>

* fix statement set with connection level cache

Signed-off-by: chenxu <[email protected]>

* fix compilation

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>
  • Loading branch information
xuchen-plus and dmetasoul01 authored Nov 10, 2024
1 parent d25ac74 commit 54e7198
Show file tree
Hide file tree
Showing 13 changed files with 628 additions and 399 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,10 @@ public static LinkedHashMap<String, String> parsePartitionDesc(String partitionD

public static void fillDataSourceConfig(HikariConfig config) {
config.setConnectionTimeout(10000);
config.setIdleTimeout(30000);
config.setIdleTimeout(10000);
config.setMaximumPoolSize(2);
config.setKeepaliveTime(10000);
config.setMinimumIdle(1);
config.setKeepaliveTime(5000);
config.setMinimumIdle(0);
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,25 @@ public interface LibLakeSoulMetaData {

void free_tokio_runtime(Pointer runtime);

Pointer create_prepared_statement();

void free_prepared_statement(Pointer prepared);

Pointer create_tokio_postgres_client(BooleanCallback booleanCallback, String config, Pointer runtime);

void free_tokio_postgres_client(Pointer client);

Pointer execute_query(IntegerCallback integerCallback, Pointer runtime, Pointer client, Pointer prepared, Integer type, String texts);
Pointer execute_query(IntegerCallback integerCallback, Pointer runtime, Pointer client, Integer type, String texts);

void export_bytes_result(BooleanCallback booleanCallback, Pointer bytes, Integer len, @LongLong long addr);

void free_bytes_result(Pointer bytes);

void execute_update(IntegerCallback integerCallback, Pointer runtime, Pointer client, Pointer prepared, Integer type, String texts);
void execute_update(IntegerCallback integerCallback, Pointer runtime, Pointer client, Integer type, String texts);

void execute_query_scalar(StringCallback stringCallback, Pointer runtime, Pointer client, Pointer prepared, Integer type, String texts);
void execute_query_scalar(StringCallback stringCallback, Pointer runtime, Pointer client, Integer type, String texts);

void execute_insert(IntegerCallback integerCallback, Pointer runtime, Pointer client, Pointer prepared, Integer type, @LongLong long addr, int length);
void execute_insert(IntegerCallback integerCallback, Pointer runtime, Pointer client, Integer type, @LongLong long addr, int length);

void clean_meta_for_test(IntegerCallback integerCallback, Pointer runtime, Pointer client);

Pointer create_split_desc_array(BooleanCallback booleanCallback, Pointer client, Pointer prepared, Pointer runtime, String tableName, String namespace);
Pointer create_split_desc_array(BooleanCallback booleanCallback, Pointer client, Pointer runtime, String tableName, String namespace);

void free_split_desc_array(Pointer json);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package com.dmetasoul.lakesoul.meta.jnr;

import com.alibaba.fastjson.JSON;
import com.dmetasoul.lakesoul.meta.DBConnector;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.DataBaseProperty;
import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
Expand Down Expand Up @@ -39,9 +38,6 @@ public class NativeMetadataJavaClient implements AutoCloseable {
private Pointer tokioPostgresClient = null;
private Pointer tokioRuntime = null;

private Pointer preparedStatement = null;


protected final LibLakeSoulMetaData libLakeSoulMetaData;

protected final ObjectReferenceManager<LibLakeSoulMetaData.BooleanCallback> booleanCallbackObjectReferenceManager;
Expand Down Expand Up @@ -193,7 +189,7 @@ private void initialize() {
tokioRuntime = libLakeSoulMetaData.create_tokio_runtime();

String config = String.format(
"host=%s port=%s dbname=%s user=%s password=%s",
"host=%s port=%s dbname=%s user=%s password=%s connect_timeout=10 ",
dataBaseProperty.getHost(),
dataBaseProperty.getPort(),
dataBaseProperty.getDbName(),
Expand All @@ -212,7 +208,6 @@ private void initialize() {
config,
tokioRuntime
);
preparedStatement = libLakeSoulMetaData.create_prepared_statement();
try {
future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
Expand Down Expand Up @@ -241,7 +236,6 @@ public JniWrapper executeQuery(Integer queryType, List<String> params) {
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
queryType,
String.join(PARAM_DELIM, params)
);
Expand Down Expand Up @@ -340,7 +334,6 @@ public Integer executeInsert(Integer insertType, JniWrapper jniWrapper) {
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
insertType,
buffer.address(),
bytes.length
Expand Down Expand Up @@ -388,7 +381,6 @@ public Integer executeUpdate(Integer updateType, List<String> params) {
}, getIntegerCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
updateType,
String.join(PARAM_DELIM, params)
);
Expand Down Expand Up @@ -435,7 +427,6 @@ public List<String> executeQueryScalar(Integer queryScalarType, List<String> par
}, getStringCallbackObjectReferenceManager()),
tokioRuntime,
tokioPostgresClient,
preparedStatement,
queryScalarType,
String.join(PARAM_DELIM, params)
);
Expand Down Expand Up @@ -529,10 +520,6 @@ public void close() {
libLakeSoulMetaData.free_tokio_postgres_client(tokioPostgresClient);
tokioPostgresClient = null;
}
if (preparedStatement != null) {
libLakeSoulMetaData.free_prepared_statement(preparedStatement);
preparedStatement = null;
}
}

public static void closeAll() {
Expand Down Expand Up @@ -566,7 +553,6 @@ public List<SplitDesc> createSplitDescArray(String tableName, String namespace)
future.complete(result);
}, getbooleanCallbackObjectReferenceManager()),
tokioPostgresClient,
preparedStatement,
tokioRuntime,
tableName,
namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ object LakeSoulSQLConf {
|If NATIVE_IO_ENABLE=true, NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE of rows will be used to write a new row group
""".stripMargin)
.intConf
.createWithDefault(1000000)
.createWithDefault(100000)

val NATIVE_IO_THREAD_NUM: ConfigEntry[Int] =
buildConf("native.io.thread.num")
Expand Down
26 changes: 26 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 23 additions & 58 deletions rust/lakesoul-metadata-c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use prost::Message;

use lakesoul_metadata::error::LakeSoulMetaDataError;
use lakesoul_metadata::transfusion::SplitDesc;
use lakesoul_metadata::{Builder, Client, MetaDataClient, PreparedStatementMap, Runtime};
use lakesoul_metadata::{Builder, MetaDataClient, PooledClient, Runtime};
use proto::proto::entity;

#[allow(non_camel_case_types)]
Expand All @@ -34,13 +34,13 @@ impl<OpaqueT> CResult<OpaqueT> {
pub fn new<T>(obj: T) -> Self {
CResult {
ptr: convert_to_opaque_raw::<T, OpaqueT>(obj),
err: std::ptr::null(),
err: null(),
}
}

pub fn error(err_msg: &str) -> Self {
CResult {
ptr: std::ptr::null_mut(),
ptr: null_mut(),
err: CString::new(err_msg).unwrap().into_raw(),
}
}
Expand Down Expand Up @@ -86,20 +86,6 @@ fn _call_integer_result_callback(callback: IntegerResultCallBack, status: i32, e
}
}

// #[repr(C)]
// struct CVoid {
// data: *const c_void,
// }

// unsafe impl Send for CVoid {}

// unsafe impl Sync for CVoid {}

#[repr(C)]
pub struct PreparedStatement {
private: [u8; 0],
}

#[repr(C)]
pub struct TokioPostgresClient {
private: [u8; 0],
Expand Down Expand Up @@ -140,21 +126,19 @@ pub extern "C" fn execute_insert(
callback: extern "C" fn(i32, *const c_char),
runtime: NonNull<CResult<TokioRuntime>>,
client: NonNull<CResult<TokioPostgresClient>>,
prepared: NonNull<CResult<PreparedStatement>>,
insert_type: i32,
addr: c_ptrdiff_t,
len: i32,
) {
let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut Client).as_mut() };
let prepared = unsafe { NonNull::new_unchecked(prepared.as_ref().ptr as *mut PreparedStatementMap).as_mut() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut PooledClient).as_mut() };

let raw_parts = unsafe { std::slice::from_raw_parts(addr as *const u8, len as usize) };
let wrapper = entity::JniWrapper::decode(prost::bytes::Bytes::from(raw_parts)).unwrap();
let result =
runtime.block_on(async { lakesoul_metadata::execute_insert(client, prepared, insert_type, wrapper).await });
runtime.block_on(async { lakesoul_metadata::execute_insert(client, insert_type, wrapper).await });
match result {
Ok(count) => call_result_callback(callback, count, std::ptr::null()),
Ok(count) => call_result_callback(callback, count, null()),
Err(e) => call_result_callback(callback, -1, CString::new(e.to_string().as_str()).unwrap().into_raw()),
}
}
Expand All @@ -164,19 +148,17 @@ pub extern "C" fn execute_update(
callback: extern "C" fn(i32, *const c_char),
runtime: NonNull<CResult<TokioRuntime>>,
client: NonNull<CResult<TokioPostgresClient>>,
prepared: NonNull<CResult<PreparedStatement>>,
update_type: i32,
joined_string: *const c_char,
) {
let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut Client).as_mut() };
let prepared = unsafe { NonNull::new_unchecked(prepared.as_ref().ptr as *mut PreparedStatementMap).as_mut() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut PooledClient).as_mut() };

let result = runtime.block_on(async {
lakesoul_metadata::execute_update(client, prepared, update_type, string_from_ptr(joined_string)).await
lakesoul_metadata::execute_update(client, update_type, string_from_ptr(joined_string)).await
});
match result {
Ok(count) => call_result_callback(callback, count, std::ptr::null()),
Ok(count) => call_result_callback(callback, count, null()),
Err(e) => call_result_callback(callback, -1, CString::new(e.to_string().as_str()).unwrap().into_raw()),
}
}
Expand All @@ -186,20 +168,18 @@ pub extern "C" fn execute_query_scalar(
callback: extern "C" fn(*const c_char, *const c_char),
runtime: NonNull<CResult<TokioRuntime>>,
client: NonNull<CResult<TokioPostgresClient>>,
prepared: NonNull<CResult<PreparedStatement>>,
update_type: i32,
joined_string: *const c_char,
) {
let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut Client).as_mut() };
let prepared = unsafe { NonNull::new_unchecked(prepared.as_ref().ptr as *mut PreparedStatementMap).as_mut() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut PooledClient).as_mut() };

let result = runtime.block_on(async {
lakesoul_metadata::execute_query_scalar(client, prepared, update_type, string_from_ptr(joined_string)).await
lakesoul_metadata::execute_query_scalar(client, update_type, string_from_ptr(joined_string)).await
});
let (result, err): (*mut c_char, *const c_char) = match result {
Ok(Some(result)) => (CString::new(result.as_str()).unwrap().into_raw(), std::ptr::null()),
Ok(None) => (CString::new("").unwrap().into_raw(), std::ptr::null()),
Ok(Some(result)) => (CString::new(result.as_str()).unwrap().into_raw(), null()),
Ok(None) => (CString::new("").unwrap().into_raw(), null()),
Err(e) => (
CString::new("").unwrap().into_raw(),
CString::new(e.to_string().as_str()).unwrap().into_raw(),
Expand All @@ -216,21 +196,19 @@ pub extern "C" fn execute_query(
callback: extern "C" fn(i32, *const c_char),
runtime: NonNull<CResult<TokioRuntime>>,
client: NonNull<CResult<TokioPostgresClient>>,
prepared: NonNull<CResult<PreparedStatement>>,
query_type: i32,
joined_string: *const c_char,
) -> NonNull<CResult<BytesResult>> {
let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut Client).as_ref() };
let prepared = unsafe { NonNull::new_unchecked(prepared.as_ref().ptr as *mut PreparedStatementMap).as_mut() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut PooledClient).as_ref() };

let result = runtime.block_on(async {
lakesoul_metadata::execute_query(client, prepared, query_type, string_from_ptr(joined_string)).await
lakesoul_metadata::execute_query(client, query_type, string_from_ptr(joined_string)).await
});
match result {
Ok(u8_vec) => {
let len = u8_vec.len();
call_result_callback(callback, len as i32, std::ptr::null());
call_result_callback(callback, len as i32, null());
convert_to_nonnull(CResult::<BytesResult>::new::<Vec<u8>>(u8_vec))
}
Err(e) => {
Expand Down Expand Up @@ -267,7 +245,7 @@ pub extern "C" fn export_bytes_result(
let mut writer = dst.writer();
let _ = writer.write_all(bytes.as_slice());

call_result_callback(callback, true, std::ptr::null());
call_result_callback(callback, true, null());
}

#[no_mangle]
Expand All @@ -282,10 +260,10 @@ pub extern "C" fn clean_meta_for_test(
client: NonNull<CResult<TokioPostgresClient>>,
) {
let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut Client).as_ref() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut PooledClient).as_ref() };
let result = runtime.block_on(async { lakesoul_metadata::clean_meta_for_test(client).await });
match result {
Ok(count) => call_result_callback(callback, count, std::ptr::null()),
Ok(count) => call_result_callback(callback, count, null()),
Err(e) => call_result_callback(callback, -1, CString::new(e.to_string().as_str()).unwrap().into_raw()),
}
}
Expand Down Expand Up @@ -319,7 +297,7 @@ pub extern "C" fn create_tokio_postgres_client(

let result = match result {
Ok(client) => {
call_result_callback(callback, true, std::ptr::null());
call_result_callback(callback, true, null());
CResult::<TokioPostgresClient>::new(client)
}
Err(e) => {
Expand All @@ -336,18 +314,7 @@ pub extern "C" fn create_tokio_postgres_client(

#[no_mangle]
pub extern "C" fn free_tokio_postgres_client(client: NonNull<CResult<TokioPostgresClient>>) {
from_nonnull(client).free::<Client>();
}

#[no_mangle]
pub extern "C" fn create_prepared_statement() -> NonNull<CResult<PreparedStatement>> {
let prepared = PreparedStatementMap::new();
convert_to_nonnull(CResult::<PreparedStatement>::new(prepared))
}

#[no_mangle]
pub extern "C" fn free_prepared_statement(prepared: NonNull<CResult<PreparedStatement>>) {
from_nonnull(prepared).free::<PreparedStatementMap>();
from_nonnull(client).free::<PooledClient>();
}

#[no_mangle]
Expand Down Expand Up @@ -376,18 +343,16 @@ fn c_char2str<'a>(ptr: *const c_char) -> &'a str {
pub extern "C" fn create_split_desc_array(
callback: ResultCallback,
client: NonNull<CResult<TokioPostgresClient>>,
prepared: NonNull<CResult<PreparedStatement>>,
runtime: NonNull<CResult<TokioRuntime>>,
table_name: *const c_char,
namespace: *const c_char,
) -> *mut c_char {
let runtime = unsafe { NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut Client).as_ref() };
let prepared = unsafe { NonNull::new_unchecked(prepared.as_ref().ptr as *mut PreparedStatementMap).as_mut() };
let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut PooledClient).as_ref() };
let table_name = c_char2str(table_name);
let namespace = c_char2str(namespace);
let result: Result<*mut c_char, LakeSoulMetaDataError> = runtime.block_on(async {
let ret = lakesoul_metadata::transfusion::split_desc_array(client, prepared, table_name, namespace).await?;
let ret = lakesoul_metadata::transfusion::split_desc_array(client, table_name, namespace).await?;
let v = serde_json::to_vec(&ret)?;
Ok(CString::new(v)
.map_err(|e| LakeSoulMetaDataError::Internal(e.to_string()))?
Expand Down
Loading

0 comments on commit 54e7198

Please sign in to comment.