Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add induceError and induceEnd #310

Closed
wants to merge 1 commit into from
Closed

Conversation

semmel
Copy link

@semmel semmel commented Feb 26, 2024

I needed that in one place, and I guess, this completes the API.

Should be backwards compatible.

What do you think?
Edit:
I saw that #1 already discusses induceEnd and induceError and warns about race conditions, only after writing my own implementation in JS and also completing this PR.

To illustrate my use case:
I have a loop which

  • polls data via fetch,
  • on success or tolerable error (e.g. timeout) the loop waits a certain amount of time
  • before it continues to the next iteration.

The results a processed by a consumer Stream into which I induceValue inside the loop. However, if fetch fails severely (e.g. with HTTP error code 400-499) I'd like to process that using the same consumer Stream. Thus, inside the loop I need to induceError.

@briancavalier
Copy link
Member

Hey @semmel, thanks for updating with more info after you read #1.

Based on the use case, would it work for you to implement polling using a periodic stream that you map to fetch requests? Here's a quick sketch of the idea:

const fetchPolling = awaitPromises(map(() => fetch(...), periodic(5000)))

You can always further map the fetch results after doing the awaitPromises if you need.

If that won't work, maybe implementing @most/types Stream interface will work for your use case. You can either implement it directly using the interface from @most/types, or you can use the newStream convenience function in @most/core.

Either way, you'll implement a function that receives a Sink (and a Scheduler). The Sink has methods you can use to produce events, errors, and end the stream. The Scheduler's currentTime method can (must, in fact) be used to get the Time value you need to pass to the Sink's methods.

The @most/disposable package has Disposable helpers you can use to create a Disposable to return from your Stream's run method.

Let me know if that sounds like any of the above will work for you.

@semmel
Copy link
Author

semmel commented Feb 28, 2024

Thanks @briancavalier for your answer! I did not know about @most/core/newStream — it is not documented, or is it?

I am sorry, but actually the fetch polling has nothing to do with my use of @most/adapter. That was misleading for you and me. (I got that fetch polling problem modelled into a stream just fine w/o createAdapter — even with exponential backoff in error case)

My actual problem is modelling asynchronous state machines (SM) with streams when not all input action streams can run from the beginning.

For example this SM

┌─────────────┐─────run─────▶┌──────────────────┐
│    armed    │              │     running      │
│    -----    │              │    ---------     │
│exit / fetch │              │do / poll status ↻│
└─────────────┘◀───cancel────┴──────────────────┘
                                       │
                 ┌──────────┐          │
                 │ complete │       ready
                 │ -------- │◀─────────┘
                 └──────────┘
const [inducePollEvent, pollStream] = createAdapter();
let disposePolling = identity;
// :: Stream 'armed'|'running'|'complete'
const state = flow(
  mergeArray([    // here go the SM actions
    map(() => "run", fromClick('#run')),  
    map(() => "cancel", fromClick('#cancel')),
    map(() => "ready", pollStream)    // <-- feedback stream <--
  ]),
  flatScan(
    (state, action) => {
      switch(state) {
        case 'armed':
          if (action === 'run') {
            return flow(
              fetch(`${api}/scheduleWork`), 
              fromPromise, 
              map(() => 'running'),
              tap(() => {
                // run the feedback stream imperatively <--
                disposePolling = runStatusPolling(inducePollEvent); 
              })
            );
          else 
            return throwError(new Error(`Illegal action ${action} in state ${state}`));
        case 'running':
          if (action === 'cancel') {
            disposePolling()
            return now('armed');
          }
          else if (action === 'ready') {
            return now('complete');
          }
          // …
        case 'complete':
          // …
      }
    },
    'armed'  //  initial state
  )
)

Not all input action streams can run from the beginning: In this example pollStream with the "ready" action ("running" -> "complete"), can only execute if we are in the state "running". It cannot run already in the initial state "armed". That's why I created such an imperative feedback stream.

In general I find this pattern of modelling a SM with streams and scan or flatScan quite useful.

Does this make sense? How would you model it without createAdapter()?

Btw. the implementation for

// flatScan :: ((acc: S, t: T) -> Stream S, S) -> Stream T -> Stream S  
flatScan = (reducer, seed) => pipe(
  concatMap((() => {
    let state = seed;
    return pipe(
      sourceValue => reducer(state, sourceValue),
      tap_o(next => { state = next; })
    );
  })())
);

@briancavalier
Copy link
Member

Sorry for the delay, @semmel. There are a few ways to model state machines mostjs without going into full imperative mode. For example, there's scan and its slightly more flexible sibling, loop. You can combine those with join, concatMap, or switchLatest to get higher order behavior. That's all fairly similar to what you've done with flatScan.

For your specific situation, though, the it looks like the real snag is that feedback from a later async action (polling) needs to loop back to an earlier node in the stream graph (the map -> "ready"). Most/core doesn't have an elegant way of doing that, and it's usually a situation where you end up needing something like most/adapter. The fact that mostjs Stream is "only" a Monad (thus represents only output with no input) is limiting in that way ... something like a Profunctor can model such things more effectively.

Given all of that, I think your solution is quite reasonable 😄

@semmel
Copy link
Author

semmel commented Mar 6, 2024

@briancavalier Thank you for your analysis, and I am sorry for the long example.
I am glad, you find my solution "quite reasonable" — that means that I might be using @most/core streams in the right way. 👍🏻

So back to my example and this PR:
If @most/adapter would provide induceError I could "loop back" stream failure of the polling into the state machine stream, which when would fail too.

So in conclusion, in general I might have overestimated the usefulness of error events ("stream failure") in the streams?

My intention to add induceError to adapter was to bring the loop-back stream on equal footing with the other source streams in the mergeArray argument: Their failure results directly in a failure of the state machine stream.

I have not made up my mind if caring too much about stream failure makes sense, given that I can always feed back polling application errors in Either.

Btw. I think the distinction between "Stream Failure vs. Application Errors" very valuable.

Conclusion No 2: So if you think induceError and induceEnd is a bad pattern, I'll trust you on that 😉

@briancavalier
Copy link
Member

tl;dr
I think the mostjs-community most-subject could be a good option for your use case. Take a look and let us know if it'll work for you.

Thank you for your analysis, and I am sorry for the long example.

No need to apologize! The example was very helpful.

I am glad, you find my solution "quite reasonable" — that means that I might be using @most/core streams in the right way. 👍🏻

I apologize if "quite reasonable" came across as negative in any way. This is a challenging use case, and I always use case, and I always end up using @most/adapter when I need to create loopbacks like this, too.


I'll try to explain a bit more of my perspective on why I'd like @most/adapter to stick with just induceEvent.

The concepts of "error" and "end" represent statefulness. Mostjs has stateful internals, but tries really hard not to expose that statefulness to developers so they don't accidentally create subtle dependencies on state, timing, ordering, etc. For example, slice is implemented with internal mutable counters, but its API is declarative, and never exposes those counters. There are exceptions, like @most/multicast, which indirectly expose some statefulness. It's always a tradeoff: multicasting is useful, but adding it introduces the possibility of new kinds of races, like between early events and subscribers.

Adding induceEnd and induceError directly exposes that statefulness to callers, and allows some new confusing situations and race conditions. For example, what should this code do?

induceEvent(x)
induceError(e)
induceEvent(y)

There are a few choices, but let's say it produces 1 event, x, and then the error, e, and never produces the event y (remember: error and end are both terminal state transitions).

That sounds reasonable, but now imagine that those lines of code are far apart in your code base, each in a different async operation. New problems arise: for example, you may have created a race between induceError(e) and induceEvent(y). If induceError(e) wins the race, induceEvent(y) will be silent. That could be very hard to understand and debug.

Here's another:

induceEnd()
induceError(e)

Again, imagine these are far apart and end up racing. Now sometimes your app fails and sometimes it doesn't. Or worse yet, important errors may go silent.

I hope that provides a bit more context!

@semmel
Copy link
Author

semmel commented Mar 8, 2024

tl;dr I think the mostjs-community most-subject could be a good option for your use case. Take a look and let us know if it'll work for you.

Yes, that looks like a perfect fit! I'll try it soon.

Btw. in most-subject's package.json is the dependency "@most/core": "0.14.0", a problem?
I could not find a changelog for @most/core v0.x -> v1.0.
Otherwise I'll try with the latest @most/core anyway and will report back.

I am glad, you find my solution "quite reasonable" — that means that I might be using @most/core streams in the right way. 👍🏻

I apologize if "quite reasonable" came across as negative in any way. This is a challenging use case, and I always use case, and I always end up using @most/adapter when I need to create loopbacks like this, too.

Hey, no need to apologise. I did not understand "quite reasonable" in a negative way, on the contrary. I meant the "" just as citations marks. (I am not a native speaker, thus I should phrase my sentences more clearly so they don't come across wrong.)

I'll try to explain a bit more of my perspective on why I'd like @most/adapter to stick with just induceEvent.

I hope that provides a bit more context!

Indeed I understand and I agree that providing such an api would give the wrong idea how to use streams.
It is tempting to make the api more general to benefit a particular current use case, but in the long run it could cause the problems you described. @most/core is exactly to avoid such problems.

Thank you again for your explanations.
I'll close this PR now with the perspective of using most-subject for my state machines.

@semmel semmel closed this Mar 8, 2024
@briancavalier
Copy link
Member

Thanks for the great discussion, @semmel. I'm glad most-subject will work for you.

@semmel semmel deleted the add-error-end branch March 11, 2024 08:38
@semmel
Copy link
Author

semmel commented Apr 16, 2024

Follow up: I've updated most-subject to @most/core v1.
I'd be happy for feedback on the PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants