Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client HandleNext Behavior #11

Open
charles-d-burton opened this issue Sep 12, 2024 · 2 comments
Open

Client HandleNext Behavior #11

charles-d-burton opened this issue Sep 12, 2024 · 2 comments

Comments

@charles-d-burton
Copy link

I ran into a bug with your client.HandleNext() function. When I setup a connection like you have in your examples after about a minute client.HandleNext() starts blasting my terminal with EOF errors. According to the function docs if it has an error it should disconnect and go into a reconnect, but that's not the case. The code in question is:

	go func() {
		for {
			if !client.IsConnected() {
				time.Sleep(time.Second)
				tryconnect()
				continue
			}

			err := client.HandleNext()
			if err != nil {
				slog.Error(err.Error())
			}
		}
	}()

After about a minute it hits that if err != nil block and just spins out with EOF

2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF
2024/09/12 13:36:40 ERROR EOF

Comments here indicate that it should fail and try to reconnect

// HandleNext reads from the wire and decodes MQTT packets.

Here's the full code for reference:

func startMQTT(send chan bool, received chan []byte, topics ...string) {

	client := mqtt.NewClient(mqtt.ClientConfig{
		Decoder: mqtt.DecoderNoAlloc{UserBuffer: make([]byte, 1500)},
		OnPub: func(_ mqtt.Header, _ mqtt.VariablesPublish, r io.Reader) error {
			message, _ := io.ReadAll(r)
			if len(message) > 0 {
				select {
				case received <- message:
				default:
					//Ignores message if buffer full
				}
			}
			println("received message")
			return nil
		},
	})

	var varConn mqtt.VariablesConnect
	varConn.SetDefaultMQTT([]byte("meetmon"))
	varConn.Username = []byte("<redacted>")
	varConn.Password = []byte("<redacted>")

	tryconnect := func() error {
		conn, err := net.Dial("tcp", "192.168.1.121:1883")
		if err != nil {
			return err
		}
		ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
		defer cancel()
		return client.Connect(ctx, conn, &varConn)
	}

	err := tryconnect()
	if err != nil {
		slog.Error("connect attempt failed")
		panic(err)
	}
	slog.Info("connected to mqtt broker")

	go func() {
		for {
			if !client.IsConnected() {
				time.Sleep(time.Second)
				tryconnect()
				continue
			}

			err := client.HandleNext()
			if err != nil {
				slog.Error(err.Error())
			}
		}
	}()

	slog.Info("setting config state in broker")
	config, err := (&MeetingConfig{}).GetConfig()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(config))

	configPflags, err := mqtt.NewPublishFlags(mqtt.QoS0, false, true)
	if err != nil {
		panic(err)
	}

	vpub := mqtt.VariablesPublish{
		TopicName: []byte("homeassistant/binary_sensor/meeting/config"),
	}

	vpub.PacketIdentifier = randInt16()
	slog.Info("publishing config")
	err = client.PublishPayload(configPflags, vpub, config)
	if err != nil {
		panic(err)
	}

	go func() {
		pflags, err := mqtt.NewPublishFlags(mqtt.QoS0, false, false)
		if err != nil {
			panic(err)
		}
		vpub.TopicName = []byte("homeassistant/binary_sensor/meeting/state")

		for {
			if !client.IsConnected() {
				slog.Info("client is disconnected, sleeping for 1 second and retrying")
				time.Sleep(time.Second)
				continue
			}

			//TODO: Maybe this should be a select
			msg := <-send
			for {
				if msg {
					slog.Info("setting meeting state to ON")
					vpub.PacketIdentifier = randInt16()
					err := client.PublishPayload(pflags, vpub, []byte("ON"))
					if err == nil {
						break //retry until message is sent
					}
				} else {
          //Default to the not in a meeting state
          slog.Info("setting meeting state to OFF")
          err := client.PublishPayload(pflags, vpub, []byte("OFF"))
          if err == nil {
            break
          }
        }
        time.Sleep(time.Second)
			}
		}
	}()
	ctx := context.Background()

	var vsub mqtt.VariablesSubscribe
	vsub.TopicFilters = []mqtt.SubscribeRequest{mqtt.SubscribeRequest{TopicFilter: []byte("homeassistant/binary_sensor/meeting/state"), QoS: mqtt.QoS0}}
	vsub.PacketIdentifier = randInt16()
	err = client.Subscribe(ctx, vsub)
	if err != nil {
		println(err.Error())
		panic(err)
	}
}

@charles-d-burton
Copy link
Author

Maybe this behavior is expected? If I add another goroutine that has a ticker to ping the server i stop receiving error messages. I also added a client.Disconnect() if the handler throws an error. That seems to handle the problem of the disconnect not showing up in the system. All of this is an easy enough fix.

soypat added a commit that referenced this issue Sep 12, 2024
@soypat
Copy link
Owner

soypat commented Sep 12, 2024

io.EOF and net.ErrClosed are special errors. We should always disconnect on receiving them since there is no more data to be expected after receiving them. Changes in #12. Feel free to test :)

soypat added a commit that referenced this issue Oct 9, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants