Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: init PgElection with candidate registration #5209

Merged
merged 23 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/common/meta/src/kv_backend/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ const METADKV_CREATION: &str =

const FULL_TABLE_SCAN: &str = "SELECT k, v FROM greptime_metakv $1 ORDER BY K";

const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1";
pub const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1";

const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K";
pub const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K";

const RANGE_SCAN_LEFT_BOUNDED: &str = "SELECT k, v FROM greptime_metakv WHERE k >= $1 ORDER BY K";

Expand All @@ -56,7 +56,7 @@ const RANGE_SCAN_FULL_RANGE: &str =

const FULL_TABLE_DELETE: &str = "DELETE FROM greptime_metakv RETURNING k,v";

const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;";
pub const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;";

const PREFIX_DELETE: &str = "DELETE FROM greptime_metakv WHERE k LIKE $1 RETURNING k,v;";

Expand All @@ -65,7 +65,7 @@ const RANGE_DELETE_LEFT_BOUNDED: &str = "DELETE FROM greptime_metakv WHERE k >=
const RANGE_DELETE_FULL_RANGE: &str =
"DELETE FROM greptime_metakv WHERE k >= $1 AND K < $2 RETURNING k,v;";

const CAS: &str = r#"
pub const CAS: &str = r#"
WITH prev AS (
SELECT k,v FROM greptime_metakv WHERE k = $1 AND v = $2
), update AS (
Expand All @@ -79,7 +79,7 @@ WHERE
SELECT k, v FROM prev;
"#;

const PUT_IF_NOT_EXISTS: &str = r#"
pub const PUT_IF_NOT_EXISTS: &str = r#"
WITH prev AS (
select k,v from greptime_metakv where k = $1
), insert AS (
Expand Down
16 changes: 13 additions & 3 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ use tonic::transport::server::{Router, TcpIncoming};

use crate::election::etcd::EtcdElection;
#[cfg(feature = "pg_kvbackend")]
use crate::election::postgres::PgElection;
#[cfg(feature = "pg_kvbackend")]
use crate::error::InvalidArgumentsSnafu;
use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu};
use crate::metasrv::builder::MetasrvBuilder;
Expand Down Expand Up @@ -224,9 +226,17 @@ pub async fn metasrv_builder(
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
let pg_client = create_postgres_client(opts).await?;
let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap();
// TODO(jeremy, weny): implement election for postgres
(kv_backend, None)
let kv_backend = PgStore::with_pg_client(pg_client)
.await
.context(error::KvBackendSnafu)?;
let election_client = create_postgres_client(opts).await?;
let election = PgElection::with_pg_client(
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
)
.await?;
(kv_backend, Some(election))
}
};

Expand Down
19 changes: 15 additions & 4 deletions src/meta-srv/src/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
// limitations under the License.

pub mod etcd;
#[cfg(feature = "pg_kvbackend")]
pub mod postgres;

use std::fmt;
use std::fmt::{self, Debug};
use std::sync::Arc;

use etcd_client::LeaderKey;
use tokio::sync::broadcast::Receiver;

use crate::error::Result;
Expand All @@ -26,10 +27,20 @@ use crate::metasrv::MetasrvNodeInfo;
pub const ELECTION_KEY: &str = "__metasrv_election";
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

const CANDIDATE_LEASE_SECS: u64 = 600;
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;

#[derive(Debug, Clone)]
pub enum LeaderChangeMessage {
Elected(Arc<LeaderKey>),
StepDown(Arc<LeaderKey>),
Elected(Arc<dyn LeaderKey>),
StepDown(Arc<dyn LeaderKey>),
}

pub trait LeaderKey: Send + Sync + Debug {
CookiePieWw marked this conversation as resolved.
Show resolved Hide resolved
fn name(&self) -> &[u8];
fn key(&self) -> &[u8];
fn rev(&self) -> i64;
CookiePieWw marked this conversation as resolved.
Show resolved Hide resolved
fn lease(&self) -> i64;
CookiePieWw marked this conversation as resolved.
Show resolved Hide resolved
}

impl fmt::Display for LeaderChangeMessage {
Expand Down
54 changes: 34 additions & 20 deletions src/meta-srv/src/election/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,41 @@ use std::time::Duration;

use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, info, warn};
use etcd_client::{Client, GetOptions, LeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions};
use etcd_client::{
Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions,
};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio::time::{timeout, MissedTickBehavior};

use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
use crate::election::{
Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY,
KEEP_ALIVE_INTERVAL_SECS,
};
use crate::error;
use crate::error::Result;
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};

impl LeaderKey for EtcdLeaderKey {
fn name(&self) -> &[u8] {
self.name()
}

fn key(&self) -> &[u8] {
self.key()
}

fn rev(&self) -> i64 {
self.rev()
}

fn lease(&self) -> i64 {
self.lease()
}
}

pub struct EtcdElection {
leader_value: String,
client: Client,
Expand Down Expand Up @@ -75,14 +98,14 @@ impl EtcdElection {
LeaderChangeMessage::Elected(key) => {
info!(
"[{leader_ident}] is elected as leader: {:?}, lease: {}",
key.name_str(),
String::from_utf8_lossy(key.name()),
key.lease()
);
}
LeaderChangeMessage::StepDown(key) => {
warn!(
"[{leader_ident}] is stepping down: {:?}, lease: {}",
key.name_str(),
String::from_utf8_lossy(key.name()),
key.lease()
);
}
Expand Down Expand Up @@ -133,9 +156,6 @@ impl Election for EtcdElection {
}

async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
const CANDIDATE_LEASE_SECS: u64 = 600;
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;

let mut lease_client = self.client.lease_client();
let res = lease_client
.grant(CANDIDATE_LEASE_SECS as i64, None)
Expand Down Expand Up @@ -239,7 +259,7 @@ impl Election for EtcdElection {
// The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`.
match timeout(
keep_lease_duration,
self.keep_alive(&mut keeper, &mut receiver, leader),
self.keep_alive(&mut keeper, &mut receiver, leader.clone()),
)
.await
{
Expand All @@ -262,12 +282,9 @@ impl Election for EtcdElection {
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
if let Err(e) = self
.leader_watcher
self.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
.context(error::SendLeaderChangeSnafu)?;
CookiePieWw marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -303,7 +320,7 @@ impl EtcdElection {
&self,
keeper: &mut LeaseKeeper,
receiver: &mut LeaseKeepAliveStream,
leader: &LeaderKey,
leader: EtcdLeaderKey,
) -> Result<()> {
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
Expand All @@ -322,12 +339,9 @@ impl EtcdElection {
{
self.infancy.store(true, Ordering::Relaxed);

if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
self.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader)))
.context(error::SendLeaderChangeSnafu)?;
CookiePieWw marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Loading
Loading