Skip to content

Commit

Permalink
CreatedAt filter, runnig worker filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
justcoon committed Mar 29, 2024
1 parent 2319a6a commit 21d67c1
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 87 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ poem-openapi = { version = "4.0.0", features = [
prometheus = { version = "0.13.3", features = ["process"] }
proptest = "1.4.0"
prost = "0.12.3"
prost-types = "0.12.3"
rustls = { version = "0.22.2" }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
Expand Down
1 change: 1 addition & 0 deletions golem-api-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ bytes = { workspace = true }
futures-core = { workspace = true }
golem-wasm-rpc = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true, features = [] }
serde = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
Expand Down
22 changes: 21 additions & 1 deletion golem-api-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,27 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.file_descriptor_set_path(out_dir.join("services.bin"))
.extern_path(".wasm.rpc", "::golem_wasm_rpc::protobuf")
.type_attribute(
".",
"golem.worker.LogEvent",
"#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"golem.worker.LogEvent.event",
"#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"golem.worker.StdOutLog",
"#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"golem.worker.StdErrLog",
"#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"golem.worker.Level",
"#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"golem.worker.Log",
"#[derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize)]",
)
.include_file("mod.rs")
Expand Down
15 changes: 11 additions & 4 deletions golem-api-grpc/proto/golem/worker/worker_filter.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import "golem/common/string_filter_comparator.proto";
import "golem/common/filter_comparator.proto";
import "golem/worker/worker_id.proto";
import "golem/worker/worker_status.proto";
import "google/protobuf/timestamp.proto";

message WorkerFilter {
oneof filter {
WorkerNameFilter name = 1;
WorkerVersionFilter version = 2;
WorkerStatusFilter status = 3;
WorkerEnvFilter env = 4;
WorkerAndFilter and = 5;
WorkerOrFilter or = 6;
WorkerNotFilter not = 7;
WorkerCreatedAtFilter created_at = 4;
WorkerEnvFilter env = 5;
WorkerAndFilter and = 6;
WorkerOrFilter or = 7;
WorkerNotFilter not = 8;
}
}

Expand All @@ -42,6 +44,11 @@ message WorkerVersionFilter {
int32 value = 2;
}

message WorkerCreatedAtFilter {
golem.common.FilterComparator comparator = 1;
google.protobuf.Timestamp value = 2;
}

message WorkerStatusFilter {
WorkerStatus value = 1;
}
Expand Down
1 change: 1 addition & 0 deletions golem-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ poem = { workspace = true }
poem-openapi = { workspace = true }
prometheus = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
rand = "0.8.5"
range-set-blaze = "0.1.16"
serde = { workspace = true }
Expand Down
117 changes: 69 additions & 48 deletions golem-common/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::ops::Add;
use std::str::FromStr;
Expand All @@ -27,7 +27,6 @@ use bincode::enc::Encoder;
use bincode::error::{DecodeError, EncodeError};
use bincode::{BorrowDecode, Decode, Encode};
use derive_more::FromStr;
use golem_api_grpc::proto::golem;
use poem_openapi::registry::{MetaSchema, MetaSchemaRef};
use poem_openapi::types::{ParseFromJSON, ParseFromParameter, ParseResult, ToJSON};
use poem_openapi::{Enum, Object};
Expand Down Expand Up @@ -119,6 +118,27 @@ impl<'de> bincode::BorrowDecode<'de> for Timestamp {
}
}

impl From<Timestamp> for prost_types::Timestamp {
fn from(value: Timestamp) -> Self {
let d = value
.0
.duration_since(iso8601_timestamp::Timestamp::UNIX_EPOCH);
Self {
seconds: d.whole_seconds(),
nanos: d.subsec_nanoseconds(),
}
}
}

impl From<prost_types::Timestamp> for Timestamp {
fn from(value: prost_types::Timestamp) -> Self {
Timestamp(
iso8601_timestamp::Timestamp::UNIX_EPOCH
.add(Duration::new(value.seconds as u64, value.nanos as u32)),
)
}
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
pub struct VersionedWorkerId {
#[serde(rename = "instance_id")]
Expand Down Expand Up @@ -521,21 +541,6 @@ impl WorkerMetadata {
}
}

impl From<WorkerMetadata> for golem_api_grpc::proto::golem::worker::WorkerMetadata {
fn from(value: WorkerMetadata) -> Self {
golem_api_grpc::proto::golem::worker::WorkerMetadata {
worker_id: Some(value.worker_id.worker_id.into_proto()),
account_id: Some(value.account_id.into()),
args: value.args,
env: HashMap::from_iter(value.env.iter().cloned()),
template_version: value.worker_id.template_version,
status: Into::<golem::worker::WorkerStatus>::into(value.last_known_status.status)
.into(),
retry_count: 0, // FIXME
}
}
}

/// Contains status information about a worker according to a given oplog index.
/// This status is just cached information, all fields must be computable by the oplog alone.
/// By having an associated oplog_idx, the cached information can be used together with the
Expand Down Expand Up @@ -793,31 +798,6 @@ pub fn parse_function_name(name: &str) -> ParsedFunctionName {
}
}

// #[derive(Clone, Debug, Serialize, Deserialize, Encode, Decode)]
// pub struct WorkerNameFilter {
// pub comparator: StringFilterComparator,
// pub value: String,
// }
//
// #[derive(Clone, Debug, Serialize, Deserialize, Encode, Decode)]
// pub struct WorkerStatusFilter {
// pub value: WorkerStatus
// }
//
//
// #[derive(Clone, Debug, Serialize, Deserialize, Encode, Decode)]
// pub struct WorkerVersionFilter {
// pub comparator: FilterComparator,
// pub value: i32,
// }
//
// #[derive(Clone, Debug, Serialize, Deserialize, Encode, Decode)]
// pub struct WorkerVersionEnvFilter {
// pub name: String,
// pub comparator: StringFilterComparator,
// pub value: String
// }

#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum WorkerFilter {
Name {
Expand All @@ -831,10 +811,10 @@ pub enum WorkerFilter {
comparator: FilterComparator,
value: i32,
},
// CreatedAt {
// comparator: FilterComparator,
// value: String,
// },
CreatedAt {
comparator: FilterComparator,
value: Timestamp,
},
Env {
name: String,
comparator: StringFilterComparator,
Expand Down Expand Up @@ -878,15 +858,22 @@ impl WorkerFilter {
value,
} => {
let mut result = false;
let name = name.to_lowercase();
for env_value in metadata.env.clone() {
if env_value.0 == name {
if env_value.0.to_lowercase() == name {
result = comparator.matches(&env_value.1, &value);

break;
}
}
result
}
WorkerFilter::CreatedAt {
comparator: _,
value: _,
} => {
true // TODO implement when we will have timestamp in metadata
}
WorkerFilter::Status { value } => metadata.last_known_status.status == value,
WorkerFilter::Not(filter) => !filter.matches(metadata),
WorkerFilter::And(filters) => {
Expand Down Expand Up @@ -946,6 +933,10 @@ impl WorkerFilter {
pub fn new_status(value: WorkerStatus) -> Self {
WorkerFilter::Status { value }
}

pub fn new_created_at(comparator: FilterComparator, value: Timestamp) -> Self {
WorkerFilter::CreatedAt { comparator, value }
}
}

impl TryFrom<golem_api_grpc::proto::golem::worker::WorkerFilter> for WorkerFilter {
Expand All @@ -965,6 +956,16 @@ impl TryFrom<golem_api_grpc::proto::golem::worker::WorkerFilter> for WorkerFilte
golem_api_grpc::proto::golem::worker::worker_filter::Filter::Status(filter) => {
Ok(WorkerFilter::new_status(filter.value.try_into()?))
}
golem_api_grpc::proto::golem::worker::worker_filter::Filter::CreatedAt(filter) => {
let value = filter
.value
.map(|t| t.into())
.ok_or_else(|| "Missing value".to_string())?;
Ok(WorkerFilter::new_created_at(
filter.comparator.try_into()?,
value,
))
}
golem_api_grpc::proto::golem::worker::worker_filter::Filter::Env(filter) => Ok(
WorkerFilter::new_env(filter.name, filter.comparator.try_into()?, filter.value),
),
Expand All @@ -991,6 +992,7 @@ impl TryFrom<golem_api_grpc::proto::golem::worker::WorkerFilter> for WorkerFilte
String,
>>(
)?;

Ok(WorkerFilter::new_or(filters))
}
},
Expand Down Expand Up @@ -1036,6 +1038,14 @@ impl From<WorkerFilter> for golem_api_grpc::proto::golem::worker::WorkerFilter {
},
)
}
WorkerFilter::CreatedAt { comparator, value } => {
golem_api_grpc::proto::golem::worker::worker_filter::Filter::CreatedAt(
golem_api_grpc::proto::golem::worker::WorkerCreatedAtFilter {
value: Some(value.into()),
comparator: comparator.into(),
},
)
}
WorkerFilter::Not(filter) => {
let f: golem_api_grpc::proto::golem::worker::WorkerFilter = (*filter).into();
golem_api_grpc::proto::golem::worker::worker_filter::Filter::Not(Box::new(
Expand Down Expand Up @@ -1200,10 +1210,21 @@ mod tests {

use crate::model::{
parse_function_name, AccountId, FilterComparator, StringFilterComparator, TemplateId,
VersionedWorkerId, WorkerFilter, WorkerId, WorkerMetadata, WorkerStatus,
Timestamp, VersionedWorkerId, WorkerFilter, WorkerId, WorkerMetadata, WorkerStatus,
WorkerStatusRecord,
};

#[test]
fn timestamp_conversion() {
let ts: Timestamp = Timestamp::now_utc();

let prost_ts: prost_types::Timestamp = ts.into();

let ts2: Timestamp = prost_ts.into();

assert_eq!(ts2, ts);
}

#[test]
fn parse_function_name_global() {
let parsed = parse_function_name("run-example");
Expand Down
25 changes: 21 additions & 4 deletions golem-worker-executor-base/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,8 +694,10 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
.get(&template_id, filter)
.await?;

let result: Vec<golem::worker::WorkerMetadata> =
workers.iter().map(|worker| worker.clone().into()).collect();
let result: Vec<golem::worker::WorkerMetadata> = workers
.iter()
.map(|worker| self.to_proto_metadata(worker.clone()))
.collect();

Ok(result)
}
Expand Down Expand Up @@ -725,11 +727,26 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
)
.await?;

let result: Vec<golem::worker::WorkerMetadata> =
workers.iter().map(|worker| worker.clone().into()).collect();
let result: Vec<golem::worker::WorkerMetadata> = workers
.iter()
.map(|worker| self.to_proto_metadata(worker.clone()))
.collect();

Ok((new_cursor, result))
}

fn to_proto_metadata(&self, value: WorkerMetadata) -> golem::worker::WorkerMetadata {
golem::worker::WorkerMetadata {
worker_id: Some(value.worker_id.worker_id.into_proto()),
account_id: Some(value.account_id.into()),
args: value.args,
env: HashMap::from_iter(value.env.iter().cloned()),
template_version: value.worker_id.template_version,
status: Into::<golem::worker::WorkerStatus>::into(value.last_known_status.status)
.into(),
retry_count: 0,
}
}
}

impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync + 'static> UsesAllDeps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ impl<Ctx: WorkerCtx> RunningWorkerEnumerationService
let mut template_workers: Vec<WorkerMetadata> = vec![];
for worker in active_workers {
if worker.0.template_id == *template_id
&& (worker.1.metadata.last_known_status.status == WorkerStatus::Running
|| worker.1.metadata.last_known_status.status == WorkerStatus::Idle)
&& worker.1.metadata.last_known_status.status == WorkerStatus::Running
&& filter
.clone()
.map_or(true, |f| f.matches(&worker.1.metadata))
Expand Down
Loading

0 comments on commit 21d67c1

Please sign in to comment.