pageserver: wait for lsn lease duration after transition into AttachedSingle #9024

Merged · 17 commits · Sep 19, 2024
70 changes: 52 additions & 18 deletions pageserver/src/tenant/gc_block.rs
@@ -1,10 +1,26 @@
use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};

use super::remote_timeline_client::index::GcBlockingReason;
use tokio::time::Instant;
use utils::id::TimelineId;

use super::remote_timeline_client::index::GcBlockingReason;
type TimelinesBlocked = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;

type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
#[derive(Default)]
struct Storage {
timelines_blocked: TimelinesBlocked,
/// The deadline before which we are blocked from GC so that
/// leases have a chance to be renewed.
lsn_lease_deadline: Option<Instant>,
}

impl Storage {
fn is_blocked_by_lsn_lease_deadline(&self) -> bool {
self.lsn_lease_deadline
.map(|d| Instant::now() < d)
.unwrap_or(false)
}
}

#[derive(Default)]
pub(crate) struct GcBlock {
@@ -42,6 +58,17 @@ impl GcBlock {
}
}

/// Sets a deadline before which we cannot proceed to GC due to lsn leases.
///
/// We do this because the lease mapping is not persisted to disk. By delaying GC by the lease
/// length, we guarantee that all leases granted earlier have a chance to be renewed when we
/// run GC for the first time after a restart / transition from AttachedMulti to AttachedSingle.
pub(super) fn set_lsn_lease_deadline(&self, lsn_lease_length: Duration) {
let deadline = Instant::now() + lsn_lease_length;
let mut g = self.reasons.lock().unwrap();
g.lsn_lease_deadline = Some(deadline);
}

pub(crate) fn summary(&self) -> Option<BlockingReasons> {
let g = self.reasons.lock().unwrap();

@@ -64,7 +91,7 @@ impl GcBlock {
) -> anyhow::Result<bool> {
let (added, uploaded) = {
let mut g = self.reasons.lock().unwrap();
let set = g.entry(timeline.timeline_id).or_default();
let set = g.timelines_blocked.entry(timeline.timeline_id).or_default();
let added = set.insert(reason);

// LOCK ORDER: intentionally hold the lock, see self.reasons.
@@ -95,7 +122,7 @@

let (remaining_blocks, uploaded) = {
let mut g = self.reasons.lock().unwrap();
match g.entry(timeline.timeline_id) {
match g.timelines_blocked.entry(timeline.timeline_id) {
Entry::Occupied(mut oe) => {
let set = oe.get_mut();
set.remove(reason);
@@ -109,7 +136,7 @@
}
}

let remaining_blocks = g.len();
let remaining_blocks = g.timelines_blocked.len();

// LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
let uploaded = timeline
@@ -134,11 +161,11 @@
pub(crate) fn before_delete(&self, timeline: &super::Timeline) {
let unblocked = {
let mut g = self.reasons.lock().unwrap();
if g.is_empty() {
if g.timelines_blocked.is_empty() {
return;
}

g.remove(&timeline.timeline_id);
g.timelines_blocked.remove(&timeline.timeline_id);

BlockingReasons::clean_and_summarize(g).is_none()
};
@@ -149,10 +176,11 @@
}

/// Initialize with the non-deleted timelines of this tenant.
pub(crate) fn set_scanned(&self, scanned: Storage) {
pub(crate) fn set_scanned(&self, scanned: TimelinesBlocked) {
let mut g = self.reasons.lock().unwrap();
assert!(g.is_empty());
g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
assert!(g.timelines_blocked.is_empty());
g.timelines_blocked
.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));

if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
tracing::info!(summary=?reasons, "initialized with gc blocked");
@@ -166,6 +194,7 @@ pub(super) struct Guard<'a> {

#[derive(Debug)]
pub(crate) struct BlockingReasons {
tenant_blocked_by_lsn_lease_deadline: bool,
timelines: usize,
reasons: enumset::EnumSet<GcBlockingReason>,
}
@@ -174,22 +203,24 @@ impl std::fmt::Display for BlockingReasons {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} timelines block for {:?}",
self.timelines, self.reasons
"tenant_blocked_by_lsn_lease_deadline: {}, {} timelines block for {:?}",
self.tenant_blocked_by_lsn_lease_deadline, self.timelines, self.reasons
)
}
}

impl BlockingReasons {
fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
let mut reasons = enumset::EnumSet::empty();
g.retain(|_key, value| {
g.timelines_blocked.retain(|_key, value| {
reasons = reasons.union(*value);
!value.is_empty()
});
if !g.is_empty() {
let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
if !g.timelines_blocked.is_empty() || blocked_by_lsn_lease_deadline {
Some(BlockingReasons {
timelines: g.len(),
tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
timelines: g.timelines_blocked.len(),
reasons,
})
} else {
@@ -198,14 +229,17 @@ impl BlockingReasons {
}

fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
if g.is_empty() {
let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
if g.timelines_blocked.is_empty() && !blocked_by_lsn_lease_deadline {
None
} else {
let reasons = g
.timelines_blocked
.values()
.fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
Some(BlockingReasons {
timelines: g.len(),
tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
timelines: g.timelines_blocked.len(),
reasons,
})
}
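The gc_block.rs changes above replace the bare timeline map with a `Storage` struct that additionally carries an in-memory lease deadline. The gating logic is small enough to show in isolation; here is a minimal, self-contained sketch (a hypothetical `GcGate` type standing in for `Storage`, and `std::time::Instant` in place of the pageserver's `tokio::time::Instant`):

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the `Storage` struct in the diff above.
#[derive(Default)]
struct GcGate {
    lsn_lease_deadline: Option<Instant>,
}

impl GcGate {
    /// Mirrors `set_lsn_lease_deadline`: arm the deadline one lease length
    /// from now, so every lease granted earlier can be renewed first.
    fn set_lsn_lease_deadline(&mut self, lease_length: Duration) {
        self.lsn_lease_deadline = Some(Instant::now() + lease_length);
    }

    /// Mirrors `is_blocked_by_lsn_lease_deadline`: blocked while the
    /// deadline lies in the future; never blocked if no deadline is armed.
    fn is_blocked(&self) -> bool {
        self.lsn_lease_deadline
            .map(|d| Instant::now() < d)
            .unwrap_or(false)
    }
}

fn main() {
    let mut gate = GcGate::default();
    assert!(!gate.is_blocked()); // no deadline armed: GC may proceed

    gate.set_lsn_lease_deadline(Duration::from_millis(50));
    assert!(gate.is_blocked()); // inside the lease window: GC is blocked

    std::thread::sleep(Duration::from_millis(60));
    assert!(!gate.is_blocked()); // window elapsed: GC may proceed again
}
```

Keeping the deadline in memory only is deliberate: it is re-armed from scratch on every restart, which is exactly the grace window the doc comment above asks for.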
6 changes: 6 additions & 0 deletions pageserver/src/tenant/mgr.rs
@@ -949,6 +949,12 @@ impl TenantManager {
(LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
match attach_conf.generation.cmp(&tenant.generation) {
Ordering::Equal => {
if attach_conf.attach_mode == AttachmentMode::Single {
tenant
.gc_block
.set_lsn_lease_deadline(tenant.get_lsn_lease_length());
}

// A transition from Attached to Attached in the same generation: we may
// take our fast path and just provide the updated configuration
// to the tenant.
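For context, the mgr.rs hunk above arms the deadline only when the tenant stays in the same generation and the new attachment mode is `AttachedSingle`, the mode in which this pageserver is expected to run GC for the tenant. A trivial sketch of that guard (illustrative names, not the real `TenantManager` API):

```rust
/// Illustrative copy of the attachment modes; the real enum lives in the
/// pageserver's location config model.
#[derive(Clone, Copy, PartialEq, Debug)]
enum AttachmentMode {
    Single,
    Multi,
    Stale,
}

/// Arm the lease grace window only on entry to AttachedSingle; the other
/// modes do not run GC here, so no grace window is needed for them.
fn should_arm_lease_deadline(new_mode: AttachmentMode) -> bool {
    new_mode == AttachmentMode::Single
}

fn main() {
    assert!(should_arm_lease_deadline(AttachmentMode::Single));
    assert!(!should_arm_lease_deadline(AttachmentMode::Multi));
    assert!(!should_arm_lease_deadline(AttachmentMode::Stale));
}
```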
18 changes: 1 addition & 17 deletions pageserver/src/tenant/tasks.rs
@@ -346,6 +346,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);

let mut first = true;
tenant.gc_block.set_lsn_lease_deadline(tenant.get_lsn_lease_length());
loop {
tokio::select! {
_ = cancel.cancelled() => {
Expand All @@ -363,7 +364,6 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
first = false;

let delays = async {
delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel).await?;
random_init_delay(period, &cancel).await?;
Ok::<_, Cancelled>(())
};
@@ -538,28 +538,12 @@ pub(crate) async fn random_init_delay(
let mut rng = rand::thread_rng();
rng.gen_range(Duration::ZERO..=period)
};

match tokio::time::timeout(d, cancel.cancelled()).await {
Ok(_) => Err(Cancelled),
Err(_) => Ok(()),
}
}

/// Delays GC by the default lease length at restart.
///
/// We do this as the lease mapping is not persisted to disk. By delaying GC by the default
/// lease length, we guarantee that all the leases we granted before the restart will have
/// expired when we run GC for the first time after the restart.
pub(crate) async fn delay_by_lease_length(
length: Duration,
cancel: &CancellationToken,
) -> Result<(), Cancelled> {
match tokio::time::timeout(length, cancel.cancelled()).await {
Ok(_) => Err(Cancelled),
Err(_) => Ok(()),
}
}

struct Iteration {
started_at: Instant,
period: Duration,
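The tasks.rs hunk is the behavioral counterpart: instead of parking the GC task in `delay_by_lease_length` (a `tokio::time::timeout` sleep, now deleted), the loop arms the deadline once before its first iteration and every GC round merely observes the block. A rough sketch of the resulting control flow, under the same assumptions as the `GcGate` sketch above:

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the tenant's gc_block state.
struct GcGate {
    lsn_lease_deadline: Option<Instant>,
}

/// One GC round under the new scheme: a blocked round returns immediately
/// and shows up in the tenant's `gc_blocking` summary, instead of the whole
/// task sleeping opaquely inside the old `delay_by_lease_length`.
fn gc_round(gate: &GcGate) {
    match gate.lsn_lease_deadline {
        Some(d) if Instant::now() < d => {
            println!("skipping GC: lsn lease deadline not yet reached");
        }
        _ => println!("running GC iteration"),
    }
}

fn main() {
    // Deadline armed once at loop start, e.g. for a 10-minute lease length.
    let gate = GcGate {
        lsn_lease_deadline: Some(Instant::now() + Duration::from_secs(600)),
    };
    gc_round(&gate); // takes the "skipping GC" branch
}
```

The observable difference is the new `tenant_blocked_by_lsn_lease_deadline` field in `BlockingReasons`, which the updated assertion in test_timeline_gc_blocking.py checks below.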
1 change: 1 addition & 0 deletions test_runner/regress/test_branch_and_gc.py
@@ -142,6 +142,7 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
"image_creation_threshold": "1",
# set PITR interval to be small, so we can do GC
"pitr_interval": "0 s",
"lsn_lease_length": "0s",
}
)

4 changes: 3 additions & 1 deletion test_runner/regress/test_branch_behind.py
@@ -11,7 +11,9 @@
#
def test_branch_behind(neon_env_builder: NeonEnvBuilder):
# Disable pitr, because here we want to test branch creation after GC
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
env = neon_env_builder.init_start(
initial_tenant_conf={"pitr_interval": "0 sec", "lsn_lease_length": "0s"}
)

error_regexes = [
".*invalid branch start lsn.*",
2 changes: 1 addition & 1 deletion test_runner/regress/test_branching.py
@@ -419,7 +419,7 @@ def start_creating_timeline():


def test_branching_while_stuck_find_gc_cutoffs(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})

client = env.pageserver.http_client()

1 change: 1 addition & 0 deletions test_runner/regress/test_compaction.py
@@ -240,6 +240,7 @@ def test_uploads_and_deletions(
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"compaction_algorithm": json.dumps({"kind": compaction_algorithm.value}),
"lsn_lease_length": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)

2 changes: 1 addition & 1 deletion test_runner/regress/test_hot_standby.py
@@ -222,7 +222,7 @@ def pgbench_accounts_initialized(ep):
# Without hs feedback enabled we'd see 'User query might have needed to see row
# versions that must be removed.' errors.
def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
agressive_vacuum_conf = [
"log_autovacuum_min_duration = 0",
"autovacuum_naptime = 10s",
1 change: 1 addition & 0 deletions test_runner/regress/test_layer_eviction.py
@@ -173,6 +173,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
# "image_creation_threshold": set at runtime
"compaction_target_size": f"{128 * (1024**2)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
"image_layer_creation_check_threshold": "0", # always check if a new image layer can be created
"lsn_lease_length": "0s",
}

def tenant_update_config(changes):
1 change: 1 addition & 0 deletions test_runner/regress/test_pageserver_generations.py
@@ -53,6 +53,7 @@
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"lsn_lease_length": "0s",
}


2 changes: 2 additions & 0 deletions test_runner/regress/test_remote_storage.py
@@ -244,6 +244,7 @@ def test_remote_storage_upload_queue_retries(
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"lsn_lease_length": "0s",
}
)

@@ -391,6 +392,7 @@ def test_remote_timeline_client_calls_started_metric(
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
"lsn_lease_length": "0s",
}
)

1 change: 1 addition & 0 deletions test_runner/regress/test_sharding.py
@@ -200,6 +200,7 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
# Disable automatic creation of image layers, as we will create them explicitly when we want them
"image_creation_threshold": 9999,
"image_layer_creation_check_threshold": 0,
"lsn_lease_length": "0s",
}

neon_env_builder.storage_controller_config = {
2 changes: 1 addition & 1 deletion test_runner/regress/test_storage_controller.py
@@ -485,7 +485,7 @@ def handler(request: Request):
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)

# Start running
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})

# Initial notification from tenant creation
assert len(notifications) == 1
1 change: 1 addition & 0 deletions test_runner/regress/test_storage_scrubber.py
@@ -204,6 +204,7 @@ def test_scrubber_physical_gc_ancestors(
# No PITR, so that as soon as child shards generate an image layer, it covers ancestor deltas
# and makes them GC'able
"pitr_interval": "0s",
"lsn_lease_length": "0s",
},
)

2 changes: 1 addition & 1 deletion test_runner/regress/test_tenant_detach.py
@@ -272,7 +272,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)

# create new tenant
tenant_id, timeline_id = env.neon_cli.create_tenant()
tenant_id, timeline_id = env.neon_cli.create_tenant(conf={"lsn_lease_length": "0s"})

# assert tenant exists on disk
assert env.pageserver.tenant_dir(tenant_id).exists()
Expand Down
5 changes: 4 additions & 1 deletion test_runner/regress/test_timeline_gc_blocking.py
@@ -45,7 +45,10 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool
tenant_after = http.tenant_status(env.initial_tenant)
assert tenant_before != tenant_after
gc_blocking = tenant_after["gc_blocking"]
assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }"
assert (
gc_blocking
== "BlockingReasons { tenant_blocked_by_lsn_lease_deadline: false, timelines: 1, reasons: EnumSet(Manual) }"
)

wait_for_another_gc_round()
pss.assert_log_contains(gc_skipped_line)