How to correctly read from the tail of a stream? #410

Open
junglie85 opened this issue Mar 23, 2023 · 1 comment

@junglie85

I want to read from the tail of a stream, but when I run the program below, that doesn't appear to happen.

- 1st run: no output; I'm expecting a b c d e f
- 2nd run: the output is a b c d e f
- 3rd run: the output is a b c d e f a b c d e f; I'm expecting a b c d e f

I suspect I've misunderstood a concept; could someone clarify how to correctly read only new data?

use std::error::Error;

use pravega_client::{
    client_factory::ClientFactoryAsync,
    event::{reader_group::ReaderGroupConfigBuilder, EventWriter},
};
use pravega_client_config::ClientConfigBuilder;
use pravega_client_shared::{
    Retention, ScaleType, Scaling, Scope, ScopedStream, Stream, StreamConfiguration,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = ClientConfigBuilder::default()
        .controller_uri("localhost:9090")
        .build()?;
    let handle = tokio::runtime::Handle::try_current()?;
    let factory = ClientFactoryAsync::new(config, handle);

    let scope_name = "tutorial";
    let stream_name = "parallel-key-numbers";

    let scoped_stream = ScopedStream {
        scope: Scope::from(scope_name.to_string()),
        stream: Stream::from(stream_name.to_string()),
    };

    setup_server(&factory, scoped_stream.clone(), 5).await?;

    let mut event_writer = factory.create_event_writer(scoped_stream.clone());
    write_event(&mut event_writer, "a", "my_key").await?;
    write_event(&mut event_writer, "b", "my_key").await?;
    write_event(&mut event_writer, "c", "my_key").await?;
    write_event(&mut event_writer, "d", "my_key").await?;
    write_event(&mut event_writer, "e", "my_key").await?;
    write_event(&mut event_writer, "f", "my_key").await?;
    event_writer.flush().await?;

    let event_reader_group = factory
        .create_reader_group_with_config(
            scoped_stream.scope.clone(),
            "parallel_numbers_reader_group".to_string(),
            ReaderGroupConfigBuilder::default()
                .read_from_tail_of_stream(scoped_stream.clone())
                .build(),
        )
        .await;

    let mut event_reader = event_reader_group
        .create_reader("reader_1".to_string())
        .await;

    // Keep acquiring segment slices and print each event's payload as UTF-8.
    while let Some(mut slice) = event_reader.acquire_segment().await? {
        while let Some(event) = slice.next() {
            let text = String::from_utf8(event.value)?;
            println!("{}", text);
        }
    }

    Ok(())
}

async fn setup_server(
    factory: &ClientFactoryAsync,
    scoped_stream: ScopedStream,
    min_num_segments: i32,
) -> Result<(), Box<dyn Error>> {
    let controller_client = factory.controller_client();

    if !controller_client
        .check_scope_exists(&scoped_stream.scope)
        .await?
    {
        controller_client.create_scope(&scoped_stream.scope).await?;
    }

    if !controller_client
        .check_stream_exists(&scoped_stream)
        .await?
    {
        let scaling = Scaling {
            scale_type: ScaleType::FixedNumSegments,
            target_rate: 0,
            scale_factor: 0,
            min_num_segments,
        };
        let retention = Retention {
            ..Default::default()
        };
        let stream_config = StreamConfiguration::new(scoped_stream, scaling, retention, None);
        controller_client.create_stream(&stream_config).await?;
    }

    Ok(())
}

async fn write_event(
    event_writer: &mut EventWriter,
    data: &str,
    routing_key: &str,
) -> Result<(), Box<dyn Error>> {
    // The first await hands the event to the writer; the second awaits the
    // oneshot acknowledgement from the server, hence the double `await??`.
    event_writer
        .write_event_by_routing_key(routing_key.to_string(), data.as_bytes().to_vec())
        .await
        .await??;

    Ok(())
}
@junglie85 (Author)

I've been experimenting with this some more, and the behaviour I'm seeing is that reads start from wherever the tail offset was when I first created the reader group (which would explain run 3: runs 2 and 3 each wrote six events after that point, giving twelve). Subsequent runs of the same program don't fail even though a group with the same name already exists, which I find surprising. I was also expecting that, after reading a segment, the reader group would be updated with the new stream offset. What are the mechanics this process is supposed to follow?
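
For reference, here's a sketch of what I expected the read loop to look like, based on my reading of the EventReader API: hand each exhausted slice back via release_segment so the group's tracked position can move past it. I'm assuming release_segment returns a Result in 0.3.2, as acquire_segment does, and I don't know whether that position is actually persisted across runs.

while let Some(mut slice) = event_reader.acquire_segment().await? {
    while let Some(event) = slice.next() {
        println!("{}", String::from_utf8(event.value)?);
    }
    // Assumption: releasing the slice lets the reader group advance its
    // position past the events just consumed.
    event_reader.release_segment(slice).await?;
}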

To stop subsequent runs from treating that earlier point in the stream as the tail, I tried deleting the reader group. However, the next run panicked (but only after apparently re-creating the deleted group successfully), with a message that the reader group had already been deleted:

thread 'main' panicked at 'should compute segments: ReaderAlreadyOfflineError { error_msg: "the ReaderGroup is deleted", source: SyncPreconditionError { error_msg: "Precondition failure" } }', C:\Users\junglie85\.cargo\registry\src\github.com-1ecc6299db9ec823\pravega-client-0.3.2\src\event\reader.rs:158:14
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Should it be possible to reuse deleted reader groups?
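
In the meantime, the workaround I've landed on is a sketch of my own rather than anything from the docs: give the reader group a unique name per run, so read_from_tail_of_stream is evaluated against the tail as it is at creation time instead of reusing the stored state of an existing group. The timestamp suffix is just an illustrative choice.

use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical per-run name so a fresh group is created each run,
// positioned at the current tail rather than a stored earlier offset.
let millis = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
let group_name = format!("parallel_numbers_reader_group_{}", millis);

let event_reader_group = factory
    .create_reader_group_with_config(
        scoped_stream.scope.clone(),
        group_name,
        ReaderGroupConfigBuilder::default()
            .read_from_tail_of_stream(scoped_stream.clone())
            .build(),
    )
    .await;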
