Sometimes get "frame error" on rx/tx, but retry method call can fix it #306

Open
lynxzp opened this issue Jun 2, 2022 · 1 comment

lynxzp commented Jun 2, 2022

Calls to RecvMessage() sometimes return "recv frame error", and calls to SendFrame() sometimes return "send frame error". Simply retrying the call fixes the problem.

Steps to reproduce: run this program (the for loop is the important part):

package main

import (
	"github.com/zeromq/goczmq"
	"log"
	"os"
)

func main() {

	rxUrl := "tcp://*:5555"
	txUrl := "tcp://127.0.0.1:5555"

	if len(os.Args) != 3 {
		log.Println("Usage: proxy rxUrl txUrl")
		log.Println("Using default values: proxy \"tcp://*:5555\" \"tcp://127.0.0.1:5555\"")
	} else {
		rxUrl = os.Args[1]
		txUrl = os.Args[2]
	}

	router, err := goczmq.NewRouter(rxUrl)
	if err != nil {
		log.Fatal(err)
	}
	defer router.Destroy()

	dealer, err := goczmq.NewDealer(txUrl)
	if err != nil {
		log.Fatal(err)
	}
	defer dealer.Destroy()

	err = dealer.SendFrame([]byte("123"), goczmq.FlagNone)
	if err != nil {
		log.Fatal(err)
	}

	seq := 0
	for {
		var data [][]byte
		seq++

		for {
			data, err = router.RecvMessage()
			if err != nil {
				log.Printf("rx error \"%v\" on iteration %v\n", err, seq)
			} else {
				break
			}
		}

		for {
			err = dealer.SendFrame(data[1], goczmq.FlagNone)
			if err != nil {
				log.Printf("tx error \"%v\" on iteration %v\n", err, seq)
			} else {
				break
			}

		}
	}
}

Sample output:

2022/06/02 16:17:18 Usage: proxy rxUrl txUrl
2022/06/02 16:17:18 Using default values: proxy "tcp://*:5555" "tcp://127.0.0.1:5555"
2022/06/02 16:17:18 rx error "recv frame error" on iteration 12506
2022/06/02 16:17:19 rx error "recv frame error" on iteration 32214
2022/06/02 16:17:19 rx error "recv frame error" on iteration 48934
2022/06/02 16:17:19 rx error "recv frame error" on iteration 71562
2022/06/02 16:17:20 rx error "recv frame error" on iteration 77118
2022/06/02 16:17:25 rx error "recv frame error" on iteration 342318
2022/06/02 16:17:25 rx error "recv frame error" on iteration 348342
2022/06/02 16:17:25 rx error "recv frame error" on iteration 351665
2022/06/02 16:17:26 tx error "send frame error" on iteration 365049
2022/06/02 16:17:26 rx error "recv frame error" on iteration 377613
2022/06/02 16:17:26 rx error "recv frame error" on iteration 393592
2022/06/02 16:17:27 rx error "recv frame error" on iteration 414721
2022/06/02 16:17:27 rx error "recv frame error" on iteration 424175
^C
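
The retry workaround used in the program above loops forever on any error. A minimal sketch of a bounded variant follows, assuming the receive call has the same shape as RecvMessage (frames plus an error). The names recvFunc, recvWithRetry and flakyRecv are hypothetical and not part of goczmq; flakyRecv only simulates a socket that fails a few times so the example runs without ZeroMQ installed.

```go
package main

import (
	"errors"
	"fmt"
)

// recvFunc mirrors the shape of a receive call such as RecvMessage:
// it returns the message frames or an error. (Hypothetical type.)
type recvFunc func() ([][]byte, error)

// recvWithRetry calls recv until it succeeds or maxAttempts calls have failed.
// It returns the frames, the number of attempts made, and an error that wraps
// the last failure if every attempt failed.
func recvWithRetry(recv recvFunc, maxAttempts int) ([][]byte, int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		frames, err := recv()
		if err == nil {
			return frames, attempt, nil
		}
		lastErr = err
	}
	return nil, maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

// flakyRecv is a stand-in for a real socket (assumption for illustration):
// it fails the first `failures` calls, then returns a fixed two-frame message.
func flakyRecv(failures int) recvFunc {
	calls := 0
	return func() ([][]byte, error) {
		calls++
		if calls <= failures {
			return nil, errors.New("recv frame error")
		}
		return [][]byte{[]byte("identity"), []byte("123")}, nil
	}
}

func main() {
	frames, attempts, err := recvWithRetry(flakyRecv(2), 5)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("got %q after %d attempts\n", frames[1], attempts)
}
```

With a real socket, the call would presumably look like recvWithRetry(router.RecvMessage, 5). Capping the attempts keeps a persistent failure from spinning the loop indefinitely while still masking the sporadic one reported here.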

nubunto commented Jul 7, 2022

I'm seeing the same behavior. You can reproduce it with this:

Server:

socket, err := goczmq.NewPub("tcp://*:5556,ipc://weather.ipc")
if err != nil {
	log.Fatal(err)
}
defer socket.Destroy()
rand.Seed(time.Now().UnixNano())
for {
	zipcode := 59937 //rand.Intn(100000)
	temperature := rand.Intn(215) - 80
	relhumidity := rand.Intn(50) + 10

	msg := fmt.Sprintf("%d %d %d", zipcode, temperature, relhumidity)
	fmt.Printf("sending %s\n", msg)
	socket.SendFrame([]byte(msg), goczmq.FlagNone)

	time.Sleep(1000 * time.Millisecond)
}

Client:

socket, err := goczmq.NewSub("tcp://127.0.0.1:5556", "59937")
if err != nil {
	log.Fatal(err)
}

defer socket.Destroy()

var totalTemp int64
for i := 0; i < 100; i++ {
	fmt.Println("receiving message...")
	datapt, err := socket.RecvMessage()
	if err != nil {
		log.Fatalf("error: %v", err)
	}

	temps := strings.Split(string(datapt[0]), " ")
	temp, err := strconv.ParseInt(temps[1], 10, 64)
	if err == nil {
		fmt.Printf("received %s\n", datapt[0])
		totalTemp += temp
	}
}

fmt.Printf("average temp was %d", totalTemp/100)

This fails at a random point on every run. But if you use a poller, it never fails. Like this:

socket, err := goczmq.NewSub("tcp://127.0.0.1:5556", "59937")
if err != nil {
	log.Fatal(err)
}

poller, err := goczmq.NewPoller(socket)
if err != nil {
	log.Fatal(err)
}

defer socket.Destroy()

var totalTemp int64
for i := 0; i < 100; i++ {
	fmt.Println("receiving message...")
	s := poller.Wait(2000)
	if s == nil {
		continue
	}
	datapt, err := s.RecvMessage()
	if err != nil {
		log.Fatalf("error: %v", err)
	}

	temps := strings.Split(string(datapt[0]), " ")
	temp, err := strconv.ParseInt(temps[1], 10, 64)
	if err == nil {
		fmt.Printf("received %s\n", datapt[0])
		totalTemp += temp
	}
}

fmt.Printf("average temp was %d", totalTemp/100)

Interestingly enough, poller.Wait sometimes returns nil. I suspect that might be related.
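
One side effect of the nil case is worth noting: the client loop above still advances i when the poller returns nil, so the final division by a fixed 100 no longer matches the number of samples actually received. A minimal sketch of averaging over only the messages that arrived and parsed is shown below. The helpers parseTemp and averageTemp are hypothetical, and an empty string stands in for a poll that returned nil, so the example runs without a socket.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseTemp extracts the temperature from a "zipcode temperature humidity"
// message. It reports false if the message is malformed.
func parseTemp(msg string) (int64, bool) {
	fields := strings.Fields(msg)
	if len(fields) < 2 {
		return 0, false
	}
	t, err := strconv.ParseInt(fields[1], 10, 64)
	if err != nil {
		return 0, false
	}
	return t, true
}

// averageTemp averages the temperatures of the messages that parsed
// successfully and returns the average together with the sample count.
// An empty string represents a poll that returned no socket (assumption
// made for this illustration).
func averageTemp(msgs []string) (avg int64, n int) {
	var total int64
	for _, m := range msgs {
		if m == "" {
			continue // nothing was received on this iteration
		}
		t, ok := parseTemp(m)
		if !ok {
			continue // skip malformed messages instead of panicking
		}
		total += t
		n++
	}
	if n == 0 {
		return 0, 0
	}
	return total / int64(n), n
}

func main() {
	msgs := []string{"59937 10 40", "", "59937 20 35", "59937 30 50"}
	avg, n := averageTemp(msgs)
	fmt.Printf("average temp over %d samples was %d\n", n, avg)
}
```

Checking the field count before indexing also avoids the out-of-range panic that temps[1] would cause on a message with fewer than two fields.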
