Skip to content

Commit

Permalink
feat(core): the restarting policy can be overridden via configuration…
Browse files Browse the repository at this point in the history
… for each actor.

`RestartParams` have been added for `RestartPolicy::always(..)` and `RestartPolicy::on_failure(..)`. The linear backoff has been replaced with an exponential approach, with a configurable limit for restarts.

BREAKING CHANGE: The default RestartPolicy is set to `RestartPolicy::never()`.
  • Loading branch information
sargarass committed Dec 15, 2023
1 parent 63bc111 commit ca1f3f1
Show file tree
Hide file tree
Showing 24 changed files with 601 additions and 141 deletions.
6 changes: 5 additions & 1 deletion elfo-configurer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use elfo_core::{
},
msg, scope,
signal::{Signal, SignalKind},
ActorGroup, ActorStatus, Addr, Blueprint, Context, Topology,
ActorGroup, ActorStatus, Addr, Blueprint, Context, RestartParams, RestartPolicy, Topology,
};

pub use self::protocol::*;
Expand All @@ -48,6 +48,10 @@ fn blueprint(topology: &Topology, source: ConfigSource) -> Blueprint {
let topology = topology.clone();
ActorGroup::new()
.stop_order(100)
.restart_policy(RestartPolicy::on_failure(RestartParams::new(
Duration::from_secs(5),
Duration::from_secs(30),
)))
.exec(move |ctx| Configurer::new(ctx, topology.clone(), source.clone()).main())
}

Expand Down
1 change: 1 addition & 0 deletions elfo-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ regex = "1.6.0"
thread_local = { version = "1.1.3", optional = true }
unicycle = "0.9.3"
rmp-serde = { version = "1.1.0", optional = true }
humantime-serde = "1"

[dev-dependencies]
elfo-utils = { version = "0.2.3", path = "../elfo-utils", features = ["test-util"] }
Expand Down
53 changes: 52 additions & 1 deletion elfo-core/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use tracing::{error, info, warn};
use crate::{
envelope::Envelope,
errors::{SendError, TrySendError},
group::{RestartPolicy, TerminationPolicy},
group::TerminationPolicy,
mailbox::{Mailbox, RecvResult},
messages::{ActorStatusReport, Terminate},
msg,
request_table::RequestTable,
restarting::RestartPolicy,
scope,
subscription::SubscriptionManager,
Addr,
Expand Down Expand Up @@ -117,6 +118,49 @@ impl ActorStatusKind {
}
}

// === ActorStartInfo ===

/// A struct holding information related to an actor start.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct ActorStartInfo {
/// The cause for the actor start, indicating why the actor is being
/// initialized.
pub cause: ActorStartCause,
}

#[derive(Debug, Clone, PartialEq, Eq)]
/// An enum representing various causes for an actor to start.
#[non_exhaustive]
pub enum ActorStartCause {
/// The actor started because its group was mounted.
GroupMounted,
/// The actor started in response to a message.
OnMessage,
/// The actor started due to the restart policy.
Restarted,
}

impl ActorStartInfo {
pub(crate) fn on_group_mounted() -> Self {
Self {
cause: ActorStartCause::GroupMounted,
}
}

pub(crate) fn on_message() -> Self {
Self {
cause: ActorStartCause::OnMessage,
}
}

pub(crate) fn on_restart() -> Self {
Self {
cause: ActorStartCause::Restarted,
}
}
}

// === Actor ===

pub(crate) struct Actor {
Expand All @@ -127,6 +171,7 @@ pub(crate) struct Actor {
control: RwLock<ControlBlock>,
finished: ManualResetEvent, // TODO: remove in favor of `status_subscription`?
status_subscription: Arc<SubscriptionManager>,
start_info: ActorStartInfo,
}

struct ControlBlock {
Expand All @@ -139,6 +184,7 @@ impl Actor {
pub(crate) fn new(
meta: Arc<ActorMeta>,
addr: Addr,
start_info: ActorStartInfo,
termination_policy: TerminationPolicy,
status_subscription: Arc<SubscriptionManager>,
) -> Self {
Expand All @@ -153,6 +199,7 @@ impl Actor {
}),
finished: ManualResetEvent::new(false),
status_subscription,
start_info,
}
}

Expand Down Expand Up @@ -209,6 +256,10 @@ impl Actor {
&self.request_table
}

pub(crate) fn start_info(&self) -> ActorStartInfo {
self.start_info.clone()
}

pub(crate) fn restart_policy(&self) -> Option<RestartPolicy> {
self.control.read().restart_policy.clone()
}
Expand Down
1 change: 1 addition & 0 deletions elfo-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ pub(crate) struct SystemConfig {
pub(crate) logging: crate::logging::LoggingConfig,
pub(crate) dumping: crate::dumping::DumpingConfig,
pub(crate) telemetry: crate::telemetry::TelemetryConfig,
pub(crate) restarting: crate::restarting::RestartingConfig,
}

// === Secret ===
Expand Down
39 changes: 37 additions & 2 deletions elfo-core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ use tracing::{info, trace};
use elfo_utils::unlikely;

use crate::{
actor::{Actor, ActorStatus},
actor::{Actor, ActorStartInfo, ActorStatus},
addr::Addr,
address_book::AddressBook,
config::AnyConfig,
demux::Demux,
dumping::{Direction, Dump, Dumper, INTERNAL_CLASS},
envelope::{AnyMessageBorrowed, AnyMessageOwned, Envelope, EnvelopeOwned, MessageKind},
errors::{RequestError, SendError, TryRecvError, TrySendError},
group::RestartPolicy,
mailbox::RecvResult,
message::{Message, Request},
messages, msg,
object::ObjectArc,
request_table::ResponseToken,
restarting::RestartPolicy,
routers::Singleton,
scope,
source::{SourceHandle, Sources, UnattachedSource},
Expand Down Expand Up @@ -624,6 +624,41 @@ impl<C, K> Context<C, K> {
}
}

/// Retrieves information related to the start of the actor.
///
/// # Panics
///
/// This method will panic if the context is pruned, indicating that the
/// required information is no longer available.
///
/// # Examples
///
/// ```
/// # use elfo_core as elfo;
/// # use elfo_core::{ActorStartCause, ActorStartInfo};
/// # async fn exec(mut ctx: elfo::Context) {
/// match ctx.start_info().cause {
/// ActorStartCause::GroupMounted => {
/// // The actor started because its group was mounted.
/// }
/// ActorStartCause::OnMessage => {
/// // The actor started in response to a message.
/// }
/// ActorStartCause::Restarted => {
/// // The actor started due to the restart policy.
/// }
/// _ => {}
/// }
/// # }
/// ```
pub fn start_info(&self) -> ActorStartInfo {
self.actor
.as_ref()
.and_then(|o| o.as_actor())
.map(|a| a.start_info())
.expect("pruned context is not supported")
}

fn pre_recv(&mut self) {
self.stats.on_recv();

Expand Down
42 changes: 2 additions & 40 deletions elfo-core/src/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
envelope::Envelope,
exec::{Exec, ExecResult},
object::{GroupHandle, GroupVisitor, Object},
restarting::RestartPolicy,
routers::Router,
runtime::RuntimeManager,
supervisor::Supervisor,
Expand Down Expand Up @@ -47,7 +48,7 @@ impl<R, C> ActorGroup<R, C> {
}

/// The behaviour on actor termination.
/// `RestartPolicy::on_failures` is used by default.
/// `RestartPolicy::never` is used by default.
pub fn restart_policy(mut self, policy: RestartPolicy) -> Self {
self.restart_policy = policy;
self
Expand Down Expand Up @@ -183,42 +184,3 @@ impl TerminationPolicy {

// TODO: add `stop_spawning`?
}

/// The behaviour on actor termination.
#[derive(Debug, Clone)]
pub struct RestartPolicy {
pub(crate) mode: RestartMode,
}

impl Default for RestartPolicy {
fn default() -> Self {
Self::on_failures()
}
}

#[derive(Debug, Clone)]
pub(crate) enum RestartMode {
Always,
OnFailures,
Never,
}

impl RestartPolicy {
pub fn always() -> Self {
Self {
mode: RestartMode::Always,
}
}

pub fn on_failures() -> Self {
Self {
mode: RestartMode::OnFailures,
}
}

pub fn never() -> Self {
Self {
mode: RestartMode::Never,
}
}
}
3 changes: 2 additions & 1 deletion elfo-core/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use elfo_utils::time::Instant;
use crate::{memory_tracker::MemoryTracker, time::Interval};

use crate::{
actor::{Actor, ActorMeta, ActorStatus},
actor::{Actor, ActorMeta, ActorStartInfo, ActorStatus},
addr::{Addr, GroupNo},
config::SystemConfig,
context::Context,
Expand Down Expand Up @@ -173,6 +173,7 @@ pub async fn do_start<F: Future>(
let actor = Actor::new(
meta.clone(),
addr,
ActorStartInfo::on_group_mounted(),
Default::default(),
Arc::new(SubscriptionManager::new(ctx.clone())),
);
Expand Down
6 changes: 4 additions & 2 deletions elfo-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ extern crate self as elfo_core;

// TODO: revise this list
pub use crate::{
actor::{ActorMeta, ActorStatus, ActorStatusKind},
actor::{ActorMeta, ActorStartCause, ActorStartInfo, ActorStatus, ActorStatusKind},
addr::Addr,
config::Config,
context::{Context, RequestBuilder},
envelope::Envelope,
group::{ActorGroup, Blueprint, RestartPolicy, TerminationPolicy},
group::{ActorGroup, Blueprint, TerminationPolicy},
local::{Local, MoveOwnership},
message::{Message, Request},
request_table::ResponseToken,
restarting::{RestartParams, RestartPolicy},
source::{SourceHandle, UnattachedSource},
topology::Topology,
};
Expand Down Expand Up @@ -65,6 +66,7 @@ pub mod remote;
#[cfg(all(feature = "network", not(feature = "unstable")))]
mod remote;
mod request_table;
mod restarting;
mod runtime;
mod source;
mod subscription;
Expand Down
1 change: 1 addition & 0 deletions elfo-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ impl From<u16> for NodeNo {

static NODE_NO: AtomicU16 = AtomicU16::new(0);

#[stability::unstable]
/// Returns the current `node_no`.
pub fn node_no() -> Option<crate::addr::NodeNo> {
crate::addr::NodeNo::from_bits(NODE_NO.load(Ordering::Relaxed))
Expand Down
69 changes: 69 additions & 0 deletions elfo-core/src/restarting/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::{num::NonZeroU64, time::Duration};

use serde::{Deserialize, Deserializer};

use crate::restarting::restart_policy::{RestartParams, RestartPolicy};

#[derive(Debug, Default)]
pub(crate) struct RestartingConfig {
pub(crate) overriding_policy: Option<RestartPolicy>,
}

impl<'de> Deserialize<'de> for RestartingConfig {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
Option::<RestartPolicyConfig>::deserialize(deserializer).map(|p| RestartingConfig {
overriding_policy: p.map(Into::into),
})
}
}

#[derive(Debug, Deserialize)]
struct RestartParamsConfig {
#[serde(with = "humantime_serde")]
min_backoff: Duration,
#[serde(with = "humantime_serde")]
max_backoff: Duration,
#[serde(with = "humantime_serde", default)]
auto_reset: Option<Duration>,
max_retries: Option<NonZeroU64>,
#[serde(default)]
factor: Option<f32>,
}

#[derive(Debug, Deserialize)]
#[serde(tag = "when")]
enum RestartPolicyConfig {
Always(RestartParamsConfig),
OnFailure(RestartParamsConfig),
Never,
}

impl From<RestartPolicyConfig> for RestartPolicy {
fn from(p: RestartPolicyConfig) -> Self {
match p {
RestartPolicyConfig::Always(p) => RestartPolicy::always(p.into()),
RestartPolicyConfig::OnFailure(p) => RestartPolicy::on_failure(p.into()),
RestartPolicyConfig::Never => RestartPolicy::never(),
}
}
}

impl From<RestartParamsConfig> for RestartParams {
fn from(p: RestartParamsConfig) -> Self {
let restart_params = RestartParams::new(p.min_backoff, p.max_backoff);
let restart_params = p
.factor
.map(|f| restart_params.factor(f))
.unwrap_or(restart_params);
let restart_params = p
.max_retries
.map(|max_retries| restart_params.max_retries(max_retries))
.unwrap_or(restart_params);
p.auto_reset
.map(|auto_reset| restart_params.auto_reset(auto_reset))
.unwrap_or(restart_params)
}
}
5 changes: 5 additions & 0 deletions elfo-core/src/restarting/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod config;
mod restart_policy;

pub(crate) use self::config::RestartingConfig;
pub use restart_policy::{RestartParams, RestartPolicy};
Loading

0 comments on commit ca1f3f1

Please sign in to comment.