Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can't get a second message with my subscriber #232

Open
EvilWatermelon opened this issue Jun 28, 2024 · 0 comments
Open

Can't get a second message with my subscriber #232

EvilWatermelon opened this issue Jun 28, 2024 · 0 comments

Comments

@EvilWatermelon
Copy link

EvilWatermelon commented Jun 28, 2024

The first message comes without any problem but after that I'm getting a StreamError: Closed(..) and no other error messages.
What does this mean and how can I fix this problem?

pub async fn mqtt_connect() -> Result<AsyncClient, paho_mqtt::errors::Error> {

    let mqtt_url: String = std::env::var("MQTT_CON").expect("MQTT_CON must be set.");
    let host: String = std::env::args()
        .nth(1)
        .unwrap_or_else(|| mqtt_url);
    let id: &str = "isumis_backend";
    /*
     Create the client. Use an ID for a persistent session.
     A real system should try harder to use a unique ID.
    */
    let create_opts: CreateOptions = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(id)
        .finalize();

    // Create the client connection
    let cli: AsyncClient = AsyncClient::new(create_opts).unwrap_or_else(|e| {
        error!("Error creating the client: {}", e);
        process::exit(1);
    });

    // Define the set of options for the connection
    let lwt: Message = Message::new("test", "Async subscriber lost connection", mqtt::QOS_1);

    let conn_opts: ConnectOptions = mqtt::ConnectOptionsBuilder::new()
        .clean_session(false)
        .properties(mqtt::properties![mqtt::PropertyCode::SessionExpiryInterval => 3600])
        //.keep_alive_interval(Duration::from_secs(15))
        .will_message(lwt)
        .finalize();

    // Make the connection to the broker
    info!("Connecting to the MQTT server...");
    cli.connect(conn_opts).await?;

    match cli.is_connected() {
        true => {
            warn!("Connection unsuccessful, reconnecting...");
            cli.reconnect();
        },
        false => info!("Connection to MQTT successful.")
    };

    Ok(cli)
}

pub fn subscriber() -> () {

    if let Err(err) = block_on(async {

        let mut cli: AsyncClient = mqtt_connect().await?;

        // Get message stream before connecting.
        let strm: mqtt::AsyncReceiver<Option<Message>> = cli.get_stream(25);

        info!("Subscribing to topics: {:?}", TOPICS);
        //let sub_opts: Vec<mqtt::SubscribeOptions> = vec![mqtt::SubscribeOptions::with_retain_as_published(); TOPICS.len()];
        //cli.subscribe_many_with_options(TOPICS, QOS, &sub_opts, None)
        cli.subscribe_many(TOPICS, QOS).await?;

        // Just loop on incoming messages.
        info!("Waiting for messages...");

        /*
         Note that we're not providing a way to cleanly shut down and
         disconnect. Therefore, when you kill this app (with a ^C or
         whatever) the server will get an unexpected drop and then
         should emit the LWT message.
        */
        message_loop(cli, strm).await;

        // Explicit return type for the async block
        Ok::<(), mqtt::Error>(())
    }) {
        error!("{}", err);
    }
}

async fn message_loop(cli: AsyncClient, mut strm: mqtt::AsyncReceiver<Option<Message>>) -> () {

    while let Some(msg_opt) = strm.next().await {
        if let Some(sub_msg) = msg_opt {

            if sub_msg.retained() {
                info!("(R) ");
            }

            let msg: &str = match std::str::from_utf8(sub_msg.payload()) {
                Ok(v) => v,
                Err(e) => panic!("Invalid UTF-8 sequence: {e}"),
            };

            let payload_json: Value = serde_json::from_str(msg).unwrap();

            info!("New message: {}", &payload_json);

            match sub_msg.topic() {
                TRAFFIC => network_traffic(payload_json)
                    .await
                    .expect("Could not handle network traffic"),
                FILE_INFO => handle_file_info(msg)
                    .await
                    .expect("Could not handle scanned app results"),
                BLACKLIGHT => handle_blacklight(payload_json)
                    .await
                    .expect("Could not handle blacklight results"), 
                &_ => error!("No suitable topic name found")
            };

        } else {
            // A "None" means we were disconnected. Try to reconnect...
            warn!("Lost connection. Attempting reconnect.");
            while let Err(err) = cli.reconnect().await {
                error!("Error reconnecting: {}", err);
                actix_web::rt::time::sleep(Duration::from_millis(1000)).await;
            }
        }
    }
}

I recieve a part of the message and then I get the error

2024-06-28T12:49:07.915945141+02:00 DEBUG paho_mqtt_c - 20240628 124906.261 m->c->connect_state = 0
2024-06-28T12:49:07.916177041+02:00 DEBUG paho_mqtt_c - 20240628 124906.261 11 my_backend <- PUBLISH msgid: 0 qos: 0 retained: 0 payload len(1172): {"host":"example.com
2024-06-28T12:49:07.916257531+02:00 DEBUG paho_mqtt_c - 20240628 124906.261 Calling messageArrived for client isumis_backend, queue depth 0
2024-06-28T12:49:07.916308994+02:00 DEBUG paho_mqtt::async_client - Message arrived. Client: 0x7fb9f0024160, topic: 0x7fb9f0026c10 len 31 cmsg: 0x7fb9f00240a0: MQTTAsync_message { struct_id: [77, 81, 84, 77], struct_version: 1, payloadlen: 1172, payload: 0x7fb9ec002610, qos: 0, retained: 0, dup: 0, msgid: 0, properties: MQTTProperties { count: 0, max_count: 0, length: 0, array: 0x0 } }
2024-06-28T12:49:07.916444203+02:00 ERROR paho_mqtt::async_client - Stream error: Closed(..)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant