Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Sep 19, 2024
1 parent 2bd18b7 commit f5bb39b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
26 changes: 22 additions & 4 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,9 @@ impl EnvelopeBufferService {
"EnvelopeBufferService: received project not ready message for project key {}",
&project_key
);
buffer.mark_ready(&project_key, false);
relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesReturned) += 1);
self.push(buffer, envelope).await;
buffer.mark_ready(&project_key, false);
}
EnvelopeBuffer::Ready(project_key) => {
relay_log::trace!(
Expand Down Expand Up @@ -637,7 +637,7 @@ mod tests {
#[tokio::test]
async fn test_update_project() {
tokio::time::pause();
let (service, global_tx, project_cache_rx, _) = buffer_service();
let (service, global_tx, mut project_cache_rx, _) = buffer_service();

let addr = service.start();

Expand All @@ -646,17 +646,35 @@ mod tests {
)));

let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();

addr.send(EnvelopeBuffer::Push(envelope.clone()));

tokio::time::sleep(Duration::from_secs(1)).await;

// We expect the project update request to be sent.
let Some(ProjectCache::HandleDequeuedEnvelope(envelope, _)) = project_cache_rx.recv().await
else {
panic!();
};

addr.send(EnvelopeBuffer::NotReady(project_key, envelope));

tokio::time::sleep(Duration::from_millis(100)).await;

assert_eq!(project_cache_rx.len(), 1);
let message = project_cache_rx.recv().await;
assert!(matches!(
message,
Some(ProjectCache::UpdateProject(key)) if key == project_key
));

tokio::time::sleep(Duration::from_secs(1)).await;

// We expect the project update request to be sent again because 1 second passed.
assert_eq!(project_cache_rx.len(), 2);
assert_eq!(project_cache_rx.len(), 1);
assert!(matches!(
message,
Some(ProjectCache::UpdateProject(key)) if key == project_key
))
}
}
3 changes: 3 additions & 0 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ impl ValidateEnvelope {
}
}

#[derive(Debug)]
pub struct UpdateRateLimits {
project_key: ProjectKey,
rate_limits: RateLimits,
Expand Down Expand Up @@ -233,6 +234,7 @@ pub struct AddMetricMeta {
/// This message is sent from the project buffer in case of the error while fetching the data from
/// the persistent buffer, ensuring that we still have the index pointing to the keys, which could be found in the
/// persistent storage.
#[derive(Debug)]
pub struct UpdateSpoolIndex(pub HashSet<QueueKey>);

impl UpdateSpoolIndex {
Expand Down Expand Up @@ -276,6 +278,7 @@ pub struct UpdateProject(pub ProjectKey);
/// associated with a project.
///
/// See the enumerated variants for a full list of available messages for this service.
#[derive(Debug)]
pub enum ProjectCache {
RequestUpdate(RequestUpdate),
Get(GetProjectState, ProjectSender),
Expand Down

0 comments on commit f5bb39b

Please sign in to comment.