Skip to content

Commit

Permalink
test: add more integration test for kafka wal (GreptimeTeam#3190)
Browse files Browse the repository at this point in the history
* test: add integration tests for kafka wal

* chore: rebase main

* chore: unify naming convention for wal config

* chore: add register loaders switch

* chore: alter tables by adding a new column

* chore: move rand to dev-dependencies

* chore: update Cargo.lock
  • Loading branch information
niebayes authored Mar 28, 2024
1 parent e3b37ee commit d5ba2fc
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 69 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ impl StartCommand {
table_metadata_manager,
table_meta_allocator,
Arc::new(MemoryRegionKeeper::default()),
true,
)
.context(InitDdlManagerSnafu)?,
);
Expand Down
6 changes: 5 additions & 1 deletion src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl DdlManager {
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
memory_region_keeper: MemoryRegionKeeperRef,
register_loaders: bool,
) -> Result<Self> {
let manager = Self {
procedure_manager,
Expand All @@ -88,7 +89,9 @@ impl DdlManager {
table_metadata_allocator,
memory_region_keeper,
};
manager.register_loaders()?;
if register_loaders {
manager.register_loaders()?;
}
Ok(manager)
}

Expand Down Expand Up @@ -767,6 +770,7 @@ mod tests {
Arc::new(WalOptionsAllocator::default()),
)),
Arc::new(MemoryRegionKeeper::default()),
true,
);

let expected_loaders = vec![
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ fn build_ddl_manager(
table_metadata_manager.clone(),
table_metadata_allocator.clone(),
memory_region_keeper.clone(),
true,
)
.context(error::InitDdlManagerSnafu)?,
))
Expand Down
1 change: 1 addition & 0 deletions tests-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ opentelemetry-proto.workspace = true
partition.workspace = true
paste.workspace = true
prost.workspace = true
rand.workspace = true
script.workspace = true
session = { workspace = true, features = ["testing"] }
store-api.workspace = true
Expand Down
22 changes: 11 additions & 11 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ pub struct GreptimeDbClusterBuilder {
store_config: Option<ObjectStoreConfig>,
store_providers: Option<Vec<StorageType>>,
datanodes: Option<u32>,
wal_config: DatanodeWalConfig,
meta_wal_config: MetaSrvWalConfig,
datanode_wal_config: DatanodeWalConfig,
metasrv_wal_config: MetaSrvWalConfig,
shared_home_dir: Option<Arc<TempDir>>,
meta_selector: Option<SelectorRef>,
}
Expand Down Expand Up @@ -108,8 +108,8 @@ impl GreptimeDbClusterBuilder {
store_config: None,
store_providers: None,
datanodes: None,
wal_config: DatanodeWalConfig::default(),
meta_wal_config: MetaSrvWalConfig::default(),
datanode_wal_config: DatanodeWalConfig::default(),
metasrv_wal_config: MetaSrvWalConfig::default(),
shared_home_dir: None,
meta_selector: None,
}
Expand All @@ -134,14 +134,14 @@ impl GreptimeDbClusterBuilder {
}

#[must_use]
pub fn with_wal_config(mut self, wal_config: DatanodeWalConfig) -> Self {
self.wal_config = wal_config;
pub fn with_datanode_wal_config(mut self, datanode_wal_config: DatanodeWalConfig) -> Self {
self.datanode_wal_config = datanode_wal_config;
self
}

#[must_use]
pub fn with_meta_wal_config(mut self, wal_meta: MetaSrvWalConfig) -> Self {
self.meta_wal_config = wal_meta;
pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetaSrvWalConfig) -> Self {
self.metasrv_wal_config = metasrv_wal_config;
self
}

Expand Down Expand Up @@ -174,7 +174,7 @@ impl GreptimeDbClusterBuilder {
max_retry_times: 5,
retry_delay: Duration::from_secs(1),
},
wal: self.meta_wal_config.clone(),
wal: self.metasrv_wal_config.clone(),
..Default::default()
};

Expand Down Expand Up @@ -249,15 +249,15 @@ impl GreptimeDbClusterBuilder {
store_config.clone(),
vec![],
home_dir,
self.wal_config.clone(),
self.datanode_wal_config.clone(),
)
} else {
let (opts, guard) = create_tmp_dir_and_datanode_opts(
mode,
StorageType::File,
self.store_providers.clone().unwrap_or_default(),
&format!("{}-dn-{}", self.cluster_name, datanode_id),
self.wal_config.clone(),
self.datanode_wal_config.clone(),
);

storage_guards.push(guard.storage_guards);
Expand Down
1 change: 1 addition & 0 deletions tests-integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#![feature(assert_matches)]

pub mod cluster;
mod grpc;
mod influxdb;
Expand Down
26 changes: 14 additions & 12 deletions tests-integration/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ pub struct GreptimeDbStandalone {

pub struct GreptimeDbStandaloneBuilder {
instance_name: String,
wal_config: DatanodeWalConfig,
meta_wal_config: MetaSrvWalConfig,
datanode_wal_config: DatanodeWalConfig,
metasrv_wal_config: MetaSrvWalConfig,
store_providers: Option<Vec<StorageType>>,
default_store: Option<StorageType>,
plugin: Option<Plugins>,
Expand All @@ -64,8 +64,8 @@ impl GreptimeDbStandaloneBuilder {
store_providers: None,
plugin: None,
default_store: None,
wal_config: DatanodeWalConfig::default(),
meta_wal_config: MetaSrvWalConfig::default(),
datanode_wal_config: DatanodeWalConfig::default(),
metasrv_wal_config: MetaSrvWalConfig::default(),
}
}

Expand Down Expand Up @@ -96,23 +96,24 @@ impl GreptimeDbStandaloneBuilder {
}

#[must_use]
pub fn with_wal_config(mut self, wal_config: DatanodeWalConfig) -> Self {
self.wal_config = wal_config;
pub fn with_datanode_wal_config(mut self, datanode_wal_config: DatanodeWalConfig) -> Self {
self.datanode_wal_config = datanode_wal_config;
self
}

#[must_use]
pub fn with_meta_wal_config(mut self, wal_meta: MetaSrvWalConfig) -> Self {
self.meta_wal_config = wal_meta;
pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetaSrvWalConfig) -> Self {
self.metasrv_wal_config = metasrv_wal_config;
self
}

pub async fn build_with(
&self,
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
guard: TestGuard,
mix_options: MixOptions,
procedure_manager: ProcedureManagerRef,
register_procedure_loaders: bool,
) -> GreptimeDbStandalone {
let plugins = self.plugin.clone().unwrap_or_default();

Expand Down Expand Up @@ -153,6 +154,7 @@ impl GreptimeDbStandaloneBuilder {
table_metadata_manager,
table_meta_allocator,
Arc::new(MemoryRegionKeeper::default()),
register_procedure_loaders,
)
.unwrap(),
);
Expand Down Expand Up @@ -194,7 +196,7 @@ impl GreptimeDbStandaloneBuilder {
default_store_type,
store_types,
&self.instance_name,
self.wal_config.clone(),
self.datanode_wal_config.clone(),
);

let kv_backend_config = KvBackendConfig::default();
Expand All @@ -207,7 +209,7 @@ impl GreptimeDbStandaloneBuilder {
.await
.unwrap();

let wal_meta = self.meta_wal_config.clone();
let wal_meta = self.metasrv_wal_config.clone();
let mix_options = MixOptions {
data_home: opts.storage.data_home.to_string(),
procedure: procedure_config,
Expand All @@ -218,7 +220,7 @@ impl GreptimeDbStandaloneBuilder {
wal_meta,
};

self.build_with(kv_backend, procedure_manager, guard, mix_options)
self.build_with(kv_backend, guard, mix_options, procedure_manager, true)
.await
}
}
Loading

0 comments on commit d5ba2fc

Please sign in to comment.