Skip to content

Commit

Permalink
Removed redundant WorkerId and PromiseId (#944)
Browse files Browse the repository at this point in the history
  • Loading branch information
vigoo authored Sep 18, 2024
1 parent 736e2b2 commit e8409f9
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 245 deletions.
25 changes: 7 additions & 18 deletions golem-common/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ impl From<u64> for Timestamp {

pub type ComponentVersion = u64;

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
#[derive(Clone, Debug, Eq, PartialEq, Hash, Encode, Decode, Serialize, Deserialize, Object)]
#[serde(rename_all = "camelCase")]
#[oai(rename_all = "camelCase")]
pub struct WorkerId {
pub component_id: ComponentId,
pub worker_name: String,
Expand All @@ -240,11 +242,6 @@ impl WorkerId {
format!("{}/{}", self.component_id, self.worker_name)
}

pub fn to_json_string(&self) -> String {
serde_json::to_string(self)
.unwrap_or_else(|_| panic!("failed to serialize worker id {self}"))
}

pub fn to_redis_key(&self) -> String {
format!("{}:{}", self.component_id.0, self.worker_name)
}
Expand Down Expand Up @@ -305,7 +302,7 @@ impl TryFrom<golem_api_grpc::proto::golem::worker::WorkerId> for WorkerId {
}

/// Associates a worker-id with its owner account
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
#[derive(Clone, Debug, Eq, PartialEq, Hash, Encode, Decode)]
pub struct OwnedWorkerId {
pub account_id: AccountId,
pub worker_id: WorkerId,
Expand Down Expand Up @@ -342,23 +339,15 @@ impl Display for OwnedWorkerId {
}
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Encode, Decode)]
#[derive(Clone, Debug, Eq, PartialEq, Hash, Encode, Decode, Serialize, Deserialize, Object)]
#[serde(rename_all = "camelCase")]
#[oai(rename_all = "camelCase")]
pub struct PromiseId {
pub worker_id: WorkerId,
pub oplog_idx: OplogIndex,
}

impl PromiseId {
pub fn from_json_string(s: &str) -> PromiseId {
serde_json::from_str(s)
.unwrap_or_else(|err| panic!("failed to deserialize promise id: {s}: {err}"))
}

pub fn to_json_string(&self) -> String {
serde_json::to_string(self)
.unwrap_or_else(|err| panic!("failed to serialize promise id {self}: {err}"))
}

pub fn to_redis_key(&self) -> String {
format!("{}:{}", self.worker_id.to_redis_key(), self.oplog_idx)
}
Expand Down
2 changes: 2 additions & 0 deletions golem-common/src/model/oplog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use bincode::enc::write::Writer;
use bincode::enc::Encoder;
use bincode::error::{DecodeError, EncodeError};
use bincode::{BorrowDecode, Decode, Encode};
use poem_openapi::NewType;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::sync::atomic::AtomicU64;
Expand All @@ -30,6 +31,7 @@ use crate::model::{
Encode,
Decode,
Default,
NewType,
)]
pub struct OplogIndex(u64);

Expand Down
150 changes: 12 additions & 138 deletions golem-service-base/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
use bincode::{Decode, Encode};
use golem_common::model::component_metadata::ComponentMetadata;
use golem_common::model::{
ComponentId, ComponentType, ComponentVersion, ScanCursor, ShardId, Timestamp, WorkerFilter,
WorkerStatus,
ComponentId, ComponentType, ComponentVersion, PromiseId, ScanCursor, ShardId, Timestamp,
WorkerFilter, WorkerId, WorkerStatus,
};
use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue;
use poem_openapi::{Enum, NewType, Object, Union};
Expand Down Expand Up @@ -105,108 +105,21 @@ impl std::fmt::Display for VersionedComponentId {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Object)]
pub struct Empty {}

// NOTE: different from golem_common::model::WorkerId because of field name annotations
#[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize, Object)]
#[serde(rename_all = "camelCase")]
#[oai(rename_all = "camelCase")]
pub struct WorkerId {
pub component_id: ComponentId,
pub worker_name: Id,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize, NewType)]
pub struct Id(String);

impl TryFrom<String> for Id {
type Error = &'static str;

fn try_from(value: String) -> Result<Self, Self::Error> {
let _ = valid_id(value.as_str())?;
Ok(Self(value))
}
}

impl Display for Id {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.clone())
}
}

impl WorkerId {
pub fn new(component_id: ComponentId, worker_name: String) -> Result<Self, &'static str> {
Ok(Self {
component_id,
worker_name: worker_name.try_into()?,
})
}
}

fn valid_id(identifier: &str) -> Result<&str, &'static str> {
let length = identifier.len();
pub fn validate_worker_name(name: &str) -> Result<(), &'static str> {
let length = name.len();
if !(1..=100).contains(&length) {
Err("Identifier must be between 1 and 100 characters")
} else if identifier.contains(' ') {
Err("Identifier must not contain spaces")
} else if !identifier
Err("Worker name must be between 1 and 100 characters")
} else if name.contains(' ') {
Err("Worker name must not contain spaces")
} else if !name
.chars()
.all(|c| c.is_alphanumeric() || c == '_' || c == '-')
{
Err("Identifier must contain only alphanumeric characters, underscores, and dashes")
} else if identifier.starts_with('-') {
Err("Identifier must not start with a dash")
Err("Worker name must contain only alphanumeric characters, underscores, and dashes")
} else if name.starts_with('-') {
Err("Worker name must not start with a dash")
} else {
Ok(identifier)
}
}

impl From<golem_common::model::WorkerId> for WorkerId {
fn from(value: golem_common::model::WorkerId) -> Self {
Self {
component_id: value.component_id,
worker_name: Id(value.worker_name),
}
}
}

impl From<WorkerId> for golem_common::model::WorkerId {
fn from(value: WorkerId) -> Self {
Self {
component_id: value.component_id,
worker_name: value.worker_name.0,
}
}
}

impl TryFrom<golem_api_grpc::proto::golem::worker::WorkerId> for WorkerId {
type Error = String;

fn try_from(
value: golem_api_grpc::proto::golem::worker::WorkerId,
) -> Result<Self, Self::Error> {
let worker_name: Id = value.name.try_into().map_err(String::from)?;

Ok(Self {
component_id: value
.component_id
.ok_or("Missing component_id")?
.try_into()?,
worker_name,
})
}
}

impl From<WorkerId> for golem_api_grpc::proto::golem::worker::WorkerId {
fn from(value: WorkerId) -> Self {
Self {
component_id: Some(value.component_id.into()),
name: value.worker_name.0,
}
}
}

impl std::fmt::Display for WorkerId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.component_id, self.worker_name.0)
Ok(())
}
}

Expand All @@ -227,45 +140,6 @@ impl From<CompleteParameters> for golem_api_grpc::proto::golem::worker::Complete
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize, Object)]
#[serde(rename_all = "camelCase")]
#[oai(rename_all = "camelCase")]
pub struct PromiseId {
pub worker_id: WorkerId,
pub oplog_idx: u64,
}

impl TryFrom<golem_api_grpc::proto::golem::worker::PromiseId> for PromiseId {
type Error = String;

fn try_from(
value: golem_api_grpc::proto::golem::worker::PromiseId,
) -> Result<Self, Self::Error> {
Ok(Self {
worker_id: value
.worker_id
.ok_or("Missing field: worker_id")?
.try_into()?,
oplog_idx: value.oplog_idx,
})
}
}

impl From<PromiseId> for golem_api_grpc::proto::golem::worker::PromiseId {
fn from(value: PromiseId) -> Self {
Self {
worker_id: Some(value.worker_id.into()),
oplog_idx: value.oplog_idx,
}
}
}

impl Display for PromiseId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.worker_id, self.oplog_idx)
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Object, thiserror::Error)]
#[error("Invalid request: {details}")]
pub struct GolemErrorInvalidRequest {
Expand Down
8 changes: 3 additions & 5 deletions golem-worker-executor-base/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@ use std::error::Error;
use std::fmt::{Display, Formatter};

use bincode::{Decode, Encode};
use golem_wasm_rpc::wasmtime::EncodingError;
use serde::{Deserialize, Serialize};
use tonic::Status;

use golem_api_grpc::proto::golem;
use golem_common::metrics::api::TraceErrorKind;
use golem_common::model::{ComponentId, PromiseId, ShardId, WorkerId};
use golem_wasm_rpc::wasmtime::EncodingError;
use tonic::Status;

use crate::model::InterruptKind;

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
pub enum GolemError {
InvalidRequest {
details: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use std::{

use futures::{Sink, SinkExt, Stream, StreamExt};
use golem_api_grpc::proto::golem::worker::LogEvent;
use golem_common::model::WorkerEvent;
use golem_service_base::model::WorkerId;
use golem_common::model::{WorkerEvent, WorkerId};
use poem::web::websocket::Message;
use tonic::Status;
use tracing::{error, info};
Expand Down
25 changes: 14 additions & 11 deletions golem-worker-service-base/src/service/worker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ use golem_api_grpc::proto::golem::workerexecutor::v1::{
};
use golem_common::client::MultiTargetGrpcClient;
use golem_common::config::RetryConfig;
use golem_common::model::oplog::OplogIndex;
use golem_common::model::{
AccountId, ComponentId, ComponentVersion, FilterComparator, IdempotencyKey, ScanCursor,
Timestamp, WorkerFilter, WorkerStatus,
};
use golem_service_base::model::{
GolemErrorUnknown, PromiseId, ResourceLimits, WorkerId, WorkerMetadata,
};
use golem_common::model::{PromiseId, WorkerId};
use golem_service_base::model::{GolemErrorUnknown, ResourceLimits, WorkerMetadata};
use golem_service_base::routing_table::HasRoutingTableService;
use golem_service_base::{
model::{Component, GolemError},
Expand Down Expand Up @@ -292,7 +292,7 @@ where
_auth_ctx: &AuthCtx,
) -> WorkerResult<ConnectWorkerStream> {
let worker_id = worker_id.clone();
let worker_id_err: golem_common::model::WorkerId = worker_id.clone().into();
let worker_id_err: golem_common::model::WorkerId = worker_id.clone();
let stream = self
.call_worker_executor(
worker_id.clone(),
Expand Down Expand Up @@ -420,7 +420,7 @@ where
}
}
},
WorkerServiceError::internal
WorkerServiceError::internal,
).await?;

Ok(invoke_response)
Expand Down Expand Up @@ -480,7 +480,7 @@ where
}
}
},
WorkerServiceError::internal
WorkerServiceError::internal,
).await?;

Ok(invoke_response)
Expand Down Expand Up @@ -589,7 +589,7 @@ where
) -> WorkerResult<bool> {
let promise_id = PromiseId {
worker_id: worker_id.clone(),
oplog_idx: oplog_id,
oplog_idx: OplogIndex::from_u64(oplog_id),
};

let result = self
Expand Down Expand Up @@ -627,7 +627,7 @@ where
}
}
},
WorkerServiceError::internal
WorkerServiceError::internal,
)
.await?;
Ok(result)
Expand Down Expand Up @@ -710,7 +710,7 @@ where
}
}
},
WorkerServiceError::internal
WorkerServiceError::internal,
).await?;

Ok(metadata)
Expand Down Expand Up @@ -906,7 +906,7 @@ where
}
}).collect::<Result<Vec<_>, ResponseMapResult>>()
},
WorkerServiceError::internal
WorkerServiceError::internal,
).await?;

Ok(result.into_iter().flatten().collect())
Expand Down Expand Up @@ -1011,7 +1011,10 @@ where
_metadata: WorkerRequestMetadata,
_auth_ctx: &AuthCtx,
) -> WorkerResult<WorkerId> {
Ok(WorkerId::new(ComponentId::new_v4(), "no-op".to_string()).unwrap())
Ok(WorkerId {
component_id: ComponentId::new_v4(),
worker_name: "no-op".to_string(),
})
}

async fn connect(
Expand Down
Loading

0 comments on commit e8409f9

Please sign in to comment.