Subscribe seems to work like Request #50

Closed
fgadaleta opened this issue Aug 14, 2023 · 4 comments
Comments

@fgadaleta
Contributor

In `src/node/tcp/idle.rs`, the `subscribe` function does:

```rust
loop {
    let packet_as_bytes: Vec<u8> = to_allocvec(&packet).unwrap();
    send_msg(&mut &stream, packet_as_bytes).await.unwrap();
    let msg = match await_response::<T>(&mut &stream, max_buffer_size).await {
        Ok(msg) => msg,
        Err(e) => {
            error!("Subscription Error: {}", e);
            continue;
        }
    };
    let delta = Utc::now() - msg.timestamp;
    // println!("The time difference between msg tx/rx is: {} us",delta);
    if delta <= chrono::Duration::zero() {
        // println!("Data is not newer, skipping to next subscription iteration");
        continue;
    }

    let mut data = data.lock().await;

    *data = Some(msg);
    sleep(rate).await;
}
```

This makes a GET request and awaits a response for each message.
That's not how a subscribe method works; that's how a request method works.
In a subscribe method, a stream would be made available after a GET request.
@quietlychris
Owner

You make a good point.

The current implementation for "subscribe" is basically just a tokio task on the Node that runs a continuous GET loop, and then whenever a received value has a timestamp newer than the one being held internally by the Node, it overwrites it. The initial reasoning for this "subscribe" implementation was to make the newest value on the Host available to the Node within a given timeframe (defined by the request rate), without necessarily requiring an on-demand request() call, since that might not always be available on high-latency or lossy wireless networks. There was also a sub-thought of trying to stick to a paradigm where the Node is never possibly interrupted by receiving a new message from the Host; this is less important since async became available on the Node as well (not that Meadow is currently making perfect use of that, per #49).
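To illustrate the "overwrite only when newer" rule described above, here is a minimal sketch. It is not Meadow's code: `Stamped`, `timestamp_us`, and `keep_newest` are hypothetical names, and a plain integer timestamp stands in for the `chrono` timestamp carried by the real message type.

```rust
/// Hypothetical message type; the integer timestamp is an assumption
/// made to keep the sketch free of external crates.
#[derive(Debug, Clone, PartialEq)]
struct Stamped {
    timestamp_us: u64,
    value: i32,
}

/// Replace the held value only if the incoming one is strictly newer.
/// Returns true when the held value was overwritten.
fn keep_newest(held: &mut Option<Stamped>, incoming: Stamped) -> bool {
    let is_newer = match held.as_ref() {
        Some(current) => incoming.timestamp_us > current.timestamp_us,
        None => true, // nothing held yet, so anything is newer
    };
    if is_newer {
        *held = Some(incoming);
    }
    is_newer
}
```

Each pass of the polling loop would call something like `keep_newest` on the latest response, so stale or repeated replies leave the held value untouched.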

Ideally, I'd like to switch this to a more traditional subscribe paradigm, where after one SUBSCRIBE message, the Host keeps the stream open and, whenever a new message is added to a topic, pushes it onto the stream to every subscribed Node. I'd toyed around with trying to implement this using sled's watch_prefix() function, but haven't had time to really dig into it yet. I'm not sure if this will happen in the near-term, unless someone else decides to implement it; it's not critical-path for anything I'm currently working on. Also, since sled itself is undergoing a massive re-write right now on its path to a 1.0 release, I'm trying to avoid making too many Host-side changes related to Trees, since I'm unsure what their final API will look like anyway.
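The push-based paradigm described above can be sketched roughly as follows. This is an in-process illustration only, under stated assumptions: `std::sync::mpsc` channels stand in for the open TCP stream, `String` stands in for the message type, and `Host`, `subscribe`, `publish`, and `demo` are hypothetical names rather than Meadow's API. It does not use sled or `watch_prefix()`.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the Host; not Meadow's actual type.
#[derive(Default)]
struct Host {
    /// Topic name -> one sender per subscribed Node.
    subscribers: HashMap<String, Vec<Sender<String>>>,
}

impl Host {
    /// A single SUBSCRIBE: register once, get back a stream that stays open.
    fn subscribe(&mut self, topic: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers
            .entry(topic.to_string())
            .or_default()
            .push(tx);
        rx
    }

    /// Push a new value to every subscriber of the topic.
    /// Subscribers whose receiving end was dropped are removed.
    fn publish(&mut self, topic: &str, value: &str) {
        if let Some(subs) = self.subscribers.get_mut(topic) {
            subs.retain(|tx| tx.send(value.to_string()).is_ok());
        }
    }
}

/// Usage: one subscribe call, then values arrive without further requests.
fn demo() -> Vec<String> {
    let mut host = Host::default();
    let stream = host.subscribe("pose");
    host.publish("pose", "a");
    host.publish("other", "not delivered to the pose subscriber");
    host.publish("pose", "b");
    // Drain everything that has been pushed so far, without blocking.
    stream.try_iter().collect()
}
```

Unlike the polling loop, the Node side sends nothing after the initial subscribe. A real implementation would also need to handle framing, reconnection, and slow subscribers, none of which this sketch covers.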

@quietlychris
Owner

A preliminary commit using Tree structures for data storage is available as 67c96ba. It isn't in main yet and doesn't directly address proper subscribe functionality, but it should open the door to more traditional subscribes at some point.

@quietlychris
Owner

(Also related to #44)

@quietlychris
Owner

This was addressed in #62


2 participants