Skip to content

Commit

Permalink
storage: support raw value encode for API V2 (tikv#11139)
Browse files Browse the repository at this point in the history
* implement api v2 raw value

Signed-off-by: Andy Lok <[email protected]>

* storage: support raw value encode for API V2

ref tikv#10938 close tikv#11041
Signed-off-by: Andy Lok <[email protected]>

* fix clippy

Signed-off-by: Andy Lok <[email protected]>

* fix test

Signed-off-by: Andy Lok <[email protected]>

* fix test

Signed-off-by: Andy Lok <[email protected]>

* improve

Signed-off-by: Andy Lok <[email protected]>

* fix clippy

Signed-off-by: Andy Lok <[email protected]>

* remove println

Signed-off-by: Andy Lok <[email protected]>

* improve test

Signed-off-by: Andy Lok <[email protected]>

* fix test

Signed-off-by: Andy Lok <[email protected]>

* add test

Signed-off-by: Andy Lok <[email protected]>

* fix

Signed-off-by: Andy Lok <[email protected]>

* fix

Signed-off-by: Andy Lok <[email protected]>

* disable test

Signed-off-by: Andy Lok <[email protected]>

* fix

Signed-off-by: Andy Lok <[email protected]>

* fix

Signed-off-by: Andy Lok <[email protected]>

* fix

Signed-off-by: Andy Lok <[email protected]>

* fix test

Signed-off-by: Andy Lok <[email protected]>

* fix test

Signed-off-by: Andy Lok <[email protected]>

* fix test

Signed-off-by: Andy Lok <[email protected]>

* fix test again

Signed-off-by: Andy Lok <[email protected]>

* fix test again

Signed-off-by: Andy Lok <[email protected]>

* fix test again

Signed-off-by: Andy Lok <[email protected]>

* fix test

Signed-off-by: Andy Lok <[email protected]>

* fix overflow

Signed-off-by: Andy Lok <[email protected]>

* fix overflow

Signed-off-by: Andy Lok <[email protected]>

* improve

Signed-off-by: Andy Lok <[email protected]>

* improve comment

Signed-off-by: Andy Lok <[email protected]>

* add case

Signed-off-by: Andy Lok <[email protected]>

* improve comment

Signed-off-by: Andy Lok <[email protected]>

* fix clippy

Signed-off-by: Andy Lok <[email protected]>

* improve

Signed-off-by: Andy Lok <[email protected]>

* fix test

Signed-off-by: Andy Lok <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
andylokandy and ti-chi-bot authored Nov 26, 2021
1 parent 1da634c commit 225bbdd
Show file tree
Hide file tree
Showing 50 changed files with 1,921 additions and 1,267 deletions.
9 changes: 2 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ toml = "0.5"
txn_types = { path = "components/txn_types", default-features = false }
url = "2"
uuid = { version = "0.8.1", features = ["serde", "v4"] }
unsigned-varint = "0.7"
walkdir = "2"
yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" }
resource_metering = { path = "components/resource_metering" }
Expand Down
2 changes: 1 addition & 1 deletion cmd/tikv-ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ fn new_debug_executor(
kv_db_opts.set_paranoid_checks(!skip_paranoid_checks);
let kv_cfs_opts = cfg
.rocksdb
.build_cf_opts(&cache, None, cfg.storage.enable_ttl);
.build_cf_opts(&cache, None, cfg.storage.api_version());
let kv_path = PathBuf::from(kv_path).canonicalize().unwrap();
let kv_path = kv_path.to_str().unwrap();
let kv_db = match new_engine_opt(kv_path, kv_db_opts, kv_cfs_opts) {
Expand Down
4 changes: 2 additions & 2 deletions components/coprocessor_plugin_api/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub enum PluginError {
///
/// If such an error appears, plugins can run some cleanup code and return early from the
/// request. The error will be passed to the client and the client might retry the request.
Other(Box<dyn Any>),
Other(String, Box<dyn Any>),
}

impl fmt::Display for PluginError {
Expand All @@ -41,7 +41,7 @@ impl fmt::Display for PluginError {
}
PluginError::Timeout(d) => write!(f, "timeout after {:?}", d),
PluginError::Canceled => write!(f, "request canceled"),
PluginError::Other(e) => write!(f, "{:?}", e),
PluginError::Other(s, _) => write!(f, "{}", s),
}
}
}
Expand Down
109 changes: 75 additions & 34 deletions components/engine_rocks/src/ttl_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use std::collections::HashMap;

use crate::decode_properties::DecodeProperties;
use crate::{RocksEngine, UserProperties};
use engine_traits::util::get_expire_ts;
use engine_traits::key_prefix;
use engine_traits::raw_value::RawValue;
use engine_traits::{Range, Result, TtlProperties, TtlPropertiesExt};
use kvproto::kvrpcpb::ApiVersion;
use rocksdb::{DBEntryType, TablePropertiesCollector, TablePropertiesCollectorFactory};
use tikv_util::error;

Expand Down Expand Up @@ -56,9 +58,9 @@ impl TtlPropertiesExt for RocksEngine {
}
}

#[derive(Default)]
/// Can only be used for default CF.
pub struct TtlPropertiesCollector {
api_version: ApiVersion,
prop: TtlProperties,
}

Expand All @@ -67,31 +69,45 @@ impl TablePropertiesCollector for TtlPropertiesCollector {
if entry_type != DBEntryType::Put {
return;
}
// only consider data keys
// Only consider data keys.
if !key.starts_with(keys::DATA_PREFIX_KEY) {
return;
}
// Only consider raw keys.
match self.api_version {
// TTL is not enabled in V1.
ApiVersion::V1 => unreachable!(),
// In V1TTL, txnkv is disabled, so all data keys are raw keys.
ApiVersion::V1ttl => (),
ApiVersion::V2 => {
let origin_key = &key[keys::DATA_PREFIX_KEY.len()..];
if !key_prefix::is_raw_key(origin_key) {
return;
}
}
}

let expire_ts = match get_expire_ts(value) {
Ok(ts) => ts,
Err(e) => {
error!("failed to get expire ts";
match RawValue::from_bytes(value, self.api_version) {
Ok(RawValue {
expire_ts: Some(expire_ts),
..
}) => {
self.prop.max_expire_ts = std::cmp::max(self.prop.max_expire_ts, expire_ts);
if self.prop.min_expire_ts == 0 {
self.prop.min_expire_ts = expire_ts;
} else {
self.prop.min_expire_ts = std::cmp::min(self.prop.min_expire_ts, expire_ts);
}
}
Err(err) => {
error!(
"failed to get expire ts";
"key" => log_wrappers::Value::key(key),
"value" => log_wrappers::Value::value(value),
"err" => %e,
"err" => %err,
);
0
}
};
if expire_ts == 0 {
return;
}

self.prop.max_expire_ts = std::cmp::max(self.prop.max_expire_ts, expire_ts);
if self.prop.min_expire_ts == 0 {
self.prop.min_expire_ts = expire_ts;
} else {
self.prop.min_expire_ts = std::cmp::min(self.prop.min_expire_ts, expire_ts);
_ => {}
}
}

Expand All @@ -103,28 +119,48 @@ impl TablePropertiesCollector for TtlPropertiesCollector {
}
}

pub struct TtlPropertiesCollectorFactory {}
pub struct TtlPropertiesCollectorFactory {
pub api_version: ApiVersion,
}

impl TablePropertiesCollectorFactory<TtlPropertiesCollector> for TtlPropertiesCollectorFactory {
fn create_table_properties_collector(&mut self, _: u32) -> TtlPropertiesCollector {
TtlPropertiesCollector::default()
TtlPropertiesCollector {
api_version: self.api_version,
prop: Default::default(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use engine_traits::util::append_expire_ts;
use tikv_util::time::UnixSecs;

#[test]
fn test_ttl_properties() {
test_ttl_properties_impl(ApiVersion::V1ttl);
test_ttl_properties_impl(ApiVersion::V2);
}

fn test_ttl_properties_impl(api_version: ApiVersion) {
let get_properties = |case: &[(&'static str, u64)]| -> Result<TtlProperties> {
let mut collector = TtlPropertiesCollector::default();
let mut collector = TtlPropertiesCollector {
api_version,
prop: Default::default(),
};
for &(k, ts) in case {
let mut v = vec![0; 10];
append_expire_ts(&mut v, ts);
collector.add(k.as_bytes(), &v, DBEntryType::Put, 0, 0);
let v = RawValue {
user_value: &[0; 10][..],
expire_ts: Some(ts),
};
collector.add(
k.as_bytes(),
&v.to_bytes(api_version),
DBEntryType::Put,
0,
0,
);
}
for &(k, _) in case {
let v = vec![0; 10];
Expand All @@ -135,23 +171,28 @@ mod tests {
};

let case1 = [
("za", 0),
("zb", UnixSecs::now().into_inner()),
("zc", 1),
("zd", u64::MAX),
("ze", 0),
("zr\0a", 0),
("zr\0b", UnixSecs::now().into_inner()),
("zr\0c", 1),
("zr\0d", u64::MAX),
("zr\0e", 0),
];
let props = get_properties(&case1).unwrap();
assert_eq!(props.max_expire_ts, u64::MAX);
assert_eq!(props.min_expire_ts, 1);
match api_version {
ApiVersion::V1 => unreachable!(),
ApiVersion::V1ttl => assert_eq!(props.min_expire_ts, 1),
// expire_ts = 0 is no longer a special case in API V2
ApiVersion::V2 => assert_eq!(props.min_expire_ts, 0),
}

let case2 = [("za", 0)];
let case2 = [("zr\0a", 0)];
assert!(get_properties(&case2).is_err());

let case3 = [];
assert!(get_properties(&case3).is_err());

let case4 = [("za", 1)];
let case4 = [("zr\0a", 1)];
let props = get_properties(&case4).unwrap();
assert_eq!(props.max_expire_ts, 1);
assert_eq!(props.min_expire_ts, 1);
Expand Down
2 changes: 2 additions & 0 deletions components/engine_traits/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ publish = false
failpoints = ["fail/failpoints"]

[dependencies]
bitflags = "1.3"
codec = { path = "../codec", default-features = false }
error_code = { path = "../error_code", default-features = false }
file_system = { path = "../file_system", default-features = false }
log_wrappers = { path = "../log_wrappers" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

//! Key prefix definistions and utils for API V2.
use codec::number::NumberDecoder;

pub const TIDB_RANGES: &[(&[u8], &[u8])] = &[(&[b'm'], &[b'm' + 1]), (&[b't'], &[b't' + 1])];
pub const TIDB_RANGES_COMPLEMENT: &[(&[u8], &[u8])] =
&[(&[], &[b'm']), (&[b'm' + 1], &[b't']), (&[b't' + 1], &[])];
Expand Down Expand Up @@ -33,9 +35,9 @@ pub fn is_txn_key(key: &[u8]) -> bool {
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum KeyPrefix {
/// Raw key prefix.
Raw { keyspace_id: usize },
Raw { keyspace_id: u64 },
/// Transaction key prefix.
Txn { keyspace_id: usize },
Txn { keyspace_id: u64 },
/// TiDB key prefix.
TiDB,
/// Unrecognised key prefix.
Expand All @@ -50,12 +52,20 @@ impl KeyPrefix {
}

match key[0] {
RAW_KEY_PREFIX => unsigned_varint::decode::usize(&key[1..])
.map(|(keyspace_id, rest)| (KeyPrefix::Raw { keyspace_id }, rest))
.unwrap_or((KeyPrefix::Unknown, key)),
TXN_KEY_PREFIX => unsigned_varint::decode::usize(&key[1..])
.map(|(keyspace_id, rest)| (KeyPrefix::Txn { keyspace_id }, rest))
.unwrap_or((KeyPrefix::Unknown, key)),
RAW_KEY_PREFIX => {
let mut slice = &key[1..];
match slice.read_var_u64() {
Ok(keyspace_id) => (KeyPrefix::Raw { keyspace_id }, slice),
Err(_) => (KeyPrefix::Unknown, key),
}
}
TXN_KEY_PREFIX => {
let mut slice = &key[1..];
match slice.read_var_u64() {
Ok(keyspace_id) => (KeyPrefix::Txn { keyspace_id }, slice),
Err(_) => (KeyPrefix::Unknown, key),
}
}
b'm' | b't' => {
// TiDB prefix is also a part of the user key, so don't strip the prefix.
(KeyPrefix::TiDB, key)
Expand All @@ -68,14 +78,15 @@ impl KeyPrefix {
#[cfg(test)]
mod tests {
use super::*;
use codec::number::NumberEncoder;

const KEYSPACE_ID_500: &[u8] = &[244, 3];

#[test]
fn test_keyspace_id() {
let mut buf = [0; 10];
let slice = unsigned_varint::encode::usize(500, &mut buf);
assert_eq!(slice, KEYSPACE_ID_500);
let mut buf = Vec::new();
buf.write_var_u64(500).unwrap();
assert_eq!(&buf, KEYSPACE_ID_500);
}

#[test]
Expand Down
3 changes: 3 additions & 0 deletions components/engine_traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@
//! `engine_traits` and reexported from `engine` to ease the transition.
//! Likewise `engine_rocks` can temporarily call code from inside `engine`.
#![feature(min_specialization)]
#![feature(assert_matches)]

#[cfg(test)]
extern crate serde_derive;
Expand Down Expand Up @@ -336,6 +337,8 @@ pub use raft_engine::{CacheStats, RaftEngine, RaftEngineReadOnly, RaftLogBatch,
// These modules need further scrutiny

pub mod compaction_job;
pub mod key_prefix;
pub mod raw_value;
pub mod util;
pub use compaction_job::*;

Expand Down
Loading

0 comments on commit 225bbdd

Please sign in to comment.