FlatMap does not seem to run in parallel with WithPool or WithCPUPool #273

Open
arlor opened this issue Nov 25, 2020 · 2 comments
Labels
bug report Reported bug.


arlor commented Nov 25, 2020

FlatMap does not run its callback in parallel when WithPool or WithCPUPool is passed.

See:

package main

import (
        "fmt"
        "time"

        "github.com/reactivex/rxgo/v2"
)

func main() {
        obs := rxgo.Just(1, 2, 3)().FlatMap(
                func(i rxgo.Item) rxgo.Observable {
                        time.Sleep(time.Second)
                        return rxgo.Just(i.V.(int)*10, i.V.(int)*100)()
                },
                rxgo.WithCPUPool(),
        )

        start := time.Now().Unix()
        for item := range obs.Observe() {
                fmt.Printf("item: %+v\n", item)
        }
        end := time.Now().Unix()
        fmt.Printf("cost: %d seconds\n", end-start)
}

Expected:
cost: 1 seconds

Actual:
cost: 3 seconds
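
For comparison, here is a minimal sketch of the behavior the report expects, written with the standard library only and not with RxGo. The names `expand`, `flatMapConcurrent`, and `delay` are hypothetical and exist only for this illustration; the delay is shortened from one second so the sketch runs quickly. Each input is expanded in its own goroutine, so the total time should be close to one delay rather than three.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// delay stands in for the one-second sleep in the report (shortened here).
const delay = 100 * time.Millisecond

// expand is a hypothetical stand-in for the FlatMap callback:
// it blocks for a while, then yields two values per input.
func expand(i int) []int {
	time.Sleep(delay)
	return []int{i * 10, i * 100}
}

// flatMapConcurrent runs expand for every input in its own goroutine and
// merges the results. Completion order is not deterministic, so the merged
// slice is sorted before it is returned.
func flatMapConcurrent(inputs []int) ([]int, time.Duration) {
	start := time.Now()
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		out []int
	)
	for _, i := range inputs {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			vals := expand(i)
			mu.Lock()
			out = append(out, vals...)
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	sort.Ints(out)
	return out, time.Since(start)
}

func main() {
	out, elapsed := flatMapConcurrent([]int{1, 2, 3})
	fmt.Println("items:", out)
	fmt.Println("finished in under 3x delay:", elapsed < 3*delay)
}
```

If the callbacks ran one after another, as the reported 3 seconds suggests, the elapsed time would be at least three times the delay.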

arlor added the "validated bug" label (Bug validated by the RxGo team.) on Nov 25, 2020
teivah added the "bug report" label (Reported bug.) and removed the "validated bug" label on Apr 7, 2021
@nitedani

#279
Is this fix ready? This works as expected in the other ReactiveX implementations I tried (RxJS, RxJava/Reactor).

	rxgo.Just(urls)().FlatMap(func(i rxgo.Item) rxgo.Observable {
		return doRequest(i.V.(string))
	}, rxgo.WithPool(32)).ForEach(out, func(e error) {}, func() {})
	rxgo.Just(urls)().Map(func(c context.Context, i interface{}) (interface{}, error) {
		val, err := <-doRequest(i.(string)).Observe()
		return val, err
	}, rxgo.WithPool(32)).ForEach(out, func(e error) {}, func() {})

These two should do the same thing: both should issue up to 32 requests concurrently.
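
To make the expected pool semantics concrete, here is a rough standard-library sketch, not RxGo code and not the change proposed in #279. `fetch` is a hypothetical stand-in for `doRequest`, and `flatMapPool` is a made-up helper: it applies a function to every input on at most `poolSize` goroutines, merges the results, and reports the peak number of calls that were in flight at once.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fetch is a hypothetical stand-in for doRequest: it blocks briefly
// and returns one result per URL.
func fetch(url string) string {
	time.Sleep(20 * time.Millisecond)
	return "response for " + url
}

// flatMapPool applies fn to every input using at most poolSize goroutines.
// It returns the merged results in completion order and the peak number of
// fn calls that were running at the same time.
func flatMapPool(inputs []string, poolSize int, fn func(string) string) ([]string, int) {
	jobs := make(chan string)
	results := make(chan string)

	var (
		mu       sync.Mutex
		inFlight int
		peak     int
		wg       sync.WaitGroup
	)

	for w := 0; w < poolSize; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for in := range jobs {
				mu.Lock()
				inFlight++
				if inFlight > peak {
					peak = inFlight
				}
				mu.Unlock()

				r := fn(in)

				mu.Lock()
				inFlight--
				mu.Unlock()
				results <- r
			}
		}()
	}

	// Feed the workers, then close results once every worker has exited.
	go func() {
		for _, in := range inputs {
			jobs <- in
		}
		close(jobs)
	}()
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []string
	for r := range results {
		out = append(out, r)
	}

	mu.Lock()
	defer mu.Unlock()
	return out, peak
}

func main() {
	urls := []string{"a", "b", "c", "d", "e", "f", "g", "h"}
	out, peak := flatMapPool(urls, 4, fetch)
	fmt.Println("results:", len(out))
	fmt.Println("peak within pool size:", peak <= 4)
}
```

With a pool of this kind, the order of the merged results depends on which call finishes first, which is the usual trade-off for a concurrent flat map.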


dalianzhu commented Aug 31, 2022

#279 Is this fix ready? This works as expected in the other ReactiveX implementations I tried (RxJS, RxJava/Reactor).

	rxgo.Just(urls)().FlatMap(func(i rxgo.Item) rxgo.Observable {
		return doRequest(i.V.(string))
	}, rxgo.WithPool(32)).ForEach(out, func(e error) {}, func() {})
	rxgo.Just(urls)().Map(func(c context.Context, i interface{}) (interface{}, error) {
		val, err := <-doRequest(i.(string)).Observe()
		return val, err
	}, rxgo.WithPool(32)).ForEach(out, func(e error) {}, func() {})

These two should do the same thing

That code has not been merged, so the issue still exists.
