docs: creating steps, context.error() #18

Merged
merged 7 commits on Jan 17, 2024
71 changes: 71 additions & 0 deletions docs/workflows/how-to/fail-pipeline-gently.md
Contributor

Maybe we can add one more section covering the related problem of resource cleanup: remember we had to subscribe to events here to ensure finalization code also runs when an exception is thrown by another step.

Collaborator Author

Please remind me: did we ever figure out why that event handler was necessary? Why wasn't the store.dispose call in the sort function enough?

Contributor

When an exception is thrown by a later step in the pipeline, the async generator is stopped without running its `finally` block; the event handlers were the only way we could find to ensure the finalization code is called. I may try to create a small repro to show the problem and how to address it.
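For the record, the underlying JavaScript behaviour can be shown without barnard59 at all (a quick sketch, not our actual pipeline code):

```javascript
let cleanedUp = false

async function* produce() {
  try {
    yield 1
    yield 2
  } finally {
    // finalization we expect to always run
    cleanedUp = true
  }
}

const gen = produce()
await gen.next() // consume one chunk…
// …then an exception elsewhere abandons the generator: nobody calls
// gen.return(), so the finally block has not run at this point
console.log('cleaned up?', cleanedUp) // → cleaned up? false
```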

@@ -0,0 +1,71 @@
# Fail pipeline without stopping it immediately

The easiest way to stop a pipeline is to throw an exception. However, this will stop the pipeline immediately.
Consider the simple step implementation below.

```js
export default function streamFailingRandomly() {
  return async function * (source) {
    let total = 0;

    for await (const chunk of source) {
      if (Math.random() > 0.5) {
        throw new Error('Random error');
      } else {
        total += chunk.length;
        yield chunk;
      }
    }

    this.logger.info(`Total processed: ${total}`);
  }.bind(this)
}
```

When the error is thrown, the pipeline stops immediately: no further chunks are processed and the code following the
async loop is never reached. This is obvious when the operation is implemented as a generator, but it can come as a
surprise when implementing steps using low-level streams. In the latter case, the flush callback will also not be called.

What's more, an exception thrown in one step affects other steps in the pipeline, preventing their respective flush
callbacks from being called.

:::note
A `try..catch` block will not help here. When an error is thrown inside the async generator, it is caught by the
pipeline and processing stops immediately.
:::

## `this.error()` to the rescue

The solution is to use the `this.error()` method. It informs barnard59 that an error occurred but does not stop
processing. How to handle the rest of the stream is up to the step implementor.

For the example above, the implementation could be changed to break the loop and call `this.error` instead of throwing.

```js
export default function streamFailingRandomly() {
  return async function * (source) {
    let total = 0;

    for await (const chunk of source) {
      if (Math.random() > 0.5) {
        this.error(new Error('Random error'));
        break;
      } else {
        total += chunk.length;
        yield chunk;
      }
    }

    this.logger.info(`Total processed: ${total}`);
  }.bind(this)
}
```

:::caution
When `this.error` is called, the pipeline still continues to the end, even when the implementor breaks the loop.
All chunks which were already processed are passed to the subsequent steps and the pipeline itself does not break.
This matters, for example, when writing to an HTTP endpoint: unlike when throwing an exception, the HTTP request will
be sent, albeit possibly incomplete.
:::
166 changes: 166 additions & 0 deletions docs/workflows/how-to/implement-steps.md
@@ -0,0 +1,166 @@
---
title: Implement steps
sidebar_position: 1
---

# Implementing steps

As explained on the [Pipeline Concepts](../explanations/pipeline.md#step) page, steps are linked from a pipeline using
the `code:link` property. Out of the box, barnard59 supports JavaScript steps. The implementation of a step is a factory
function which returns a stream or an async generator.

This page presents three common ways to implement JavaScript steps.

## Async generators

The recommended way to implement a step is to use an async generator: simply return one from the factory.
It takes a single argument, the upstream stream as an async iterable, and `yield`s values downstream.

```js
import { nanoid } from 'nanoid'

/**
 * @this {import('barnard59-core').Context}
 */
export default function step() {
  // 1. prepare the step
  const id = nanoid()
  const ex = this.rdf.namespace('http://example.org/')

  return async function* (stream) {
    // 2. before first chunk
    let total = 0
    yield this.rdf.quad(this.rdf.blankNode(id), this.rdf.ns.rdf.type, ex.StepSummary)

    for await (const quad of stream) {
      // 3. push chunks down the pipeline
      total++
      yield quad
    }

    // 4. after last chunk
    yield this.rdf.quad(this.rdf.blankNode(id), ex.totalQuads, total)
  }.bind(this)
}
```

Commented are the four phases of a step:

1. The first phase is the preparation of the step. It happens before the pipeline
starts processing data. The factory can be `async`.
2. The second phase is right before the first chunk is processed.
3. The third phase is the processing of the stream. Implementors `yield` values to the stream. Individual chunks can be transformed, or skipped by continuing the loop without yielding.
4. The fourth phase is after the last chunk has been processed. This is the place to clean up resources or push additional data.
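For example, in phase 3 a chunk is skipped simply by not yielding it. A hypothetical filter step (not part of barnard59) illustrating this:

```javascript
// hypothetical example: drop empty chunks by not yielding them
function skipEmpty() {
  return async function* (stream) {
    for await (const chunk of stream) {
      if (chunk.length === 0) continue // skipped: nothing is pushed downstream
      yield chunk
    }
  }
}

export default skipEmpty
```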

:::caution
Operations implemented using async generators always create [Duplex streams](https://nodejs.org/api/stream.html#stream_class_stream_duplex), which means they are used as both readable and writable streams. As a consequence,
they cannot be used as the first step in a pipeline, since the first step must be a readable stream.
:::

## through2

If, for some reason, you cannot use async generators, you can use the [through2](https://npm.im/through2) module as an
easy alternative to using the NodeJS streams API directly.

```js
import through2 from 'through2'
import { nanoid } from 'nanoid'

/**
 * @this {import('barnard59-core').Context}
 */
export default function step() {
  const { rdf } = this

  // 1. prepare the step
  const id = nanoid()
  const ex = rdf.namespace('http://example.org/')
  let total = 0

  return through2.obj(function (chunk, encoding, callback) {
    // 3. push chunks down the pipeline
    total++
    this.push(chunk)
    callback()
  }, function (callback) {
    // 4. after last chunk
    this.push(rdf.quad(rdf.blankNode(id), rdf.ns.rdf.type, ex.StepSummary))
    this.push(rdf.quad(rdf.blankNode(id), ex.totalQuads, total))
    callback()
  })
}
```

Note that there are some important differences between the through2 step and the async generator:

1. When using through2, it is not possible to capture a dedicated `before` stage. Any additional data must be pushed in the `flush` callback.
   - Alternatively, a library like [onetime](https://npm.im/onetime) can be used to ensure the `before` stage is executed only once.
2. The stream transform and flush functions are not bound to the context. This means that the context must be captured in a closure, or they must be implemented as arrow functions.
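A sketch of the once-only `before` idea (onetime is a third-party package; here a minimal hand-rolled once-wrapper stands in for it, and Node's `Transform` for through2):

```javascript
import { Transform } from 'stream'

// minimal stand-in for the onetime package: run fn only on the first call
const once = fn => {
  let called = false
  return (...args) => {
    if (called) return
    called = true
    fn(...args)
  }
}

export default function step() {
  const before = once(stream => {
    // 2. the "before first chunk" stage, executed exactly once
    stream.push('header')
  })

  return new Transform({
    objectMode: true,
    transform (chunk, encoding, callback) {
      before(this)
      this.push(chunk)
      callback()
    }
  })
}
```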
Contributor

I've been tricked by this a few times, glad to see an explanation.


:::caution
Similarly, a through2 step always creates a Duplex stream and cannot be used as the first step in a pipeline.
:::

## NodeJS streams

If you need more control over the stream, you can implement the step using the NodeJS streams API directly. This is the
most verbose but also the most powerful approach.

Below is the same step implemented with the NodeJS streams API. The same principles as with through2 apply here.

```js
import { Transform } from 'stream'
import { nanoid } from 'nanoid'

/**
 * @this {import('barnard59-core').Context}
 */
export default function step() {
  const { rdf } = this

  // 1. prepare the step
  const id = nanoid()
  const ex = rdf.namespace('http://example.org/')
  let total = 0

  return new Transform({
    objectMode: true,
    transform (chunk, encoding, callback) {
      // 3. push chunks down the pipeline
      total++
      this.push(chunk)
      callback()
    },
    flush (callback) {
      // 4. after last chunk
      this.push(rdf.quad(rdf.blankNode(id), rdf.ns.rdf.type, ex.StepSummary))
      this.push(rdf.quad(rdf.blankNode(id), ex.totalQuads, total))
      callback()
    }
  })
}
```

The major difference from the methods above is the possibility to implement streams which are only `Readable` or
only `Writable`. This means that this approach can be used to implement the first or last step of a pipeline.

For example, the following step will create a stream which emits a single quad and then ends.

```js
import { Readable } from 'stream'

/**
 * @this {import('barnard59-core').Context}
 */
export default function step() {
  const { rdf } = this

  return new Readable({
    objectMode: true,
    read () {
      this.push(rdf.quad(rdf.blankNode(), rdf.ns.rdf.type, rdf.ns.rdfs.Resource))
      this.push(null)
    }
  })
}
```
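Symmetrically, a `Writable` can serve as the last step. Below is a sketch of a hypothetical sink step (assuming the same `Context` with a `logger`, as in the examples above):

```javascript
import { Writable } from 'stream'

/**
 * @this {import('barnard59-core').Context}
 */
export default function step() {
  const { logger } = this

  let total = 0
  return new Writable({
    objectMode: true,
    write (chunk, encoding, callback) {
      // consume the chunk; a Writable pushes nothing downstream
      total++
      callback()
    },
    final (callback) {
      logger.info(`Consumed ${total} chunks`)
      callback()
    }
  })
}
```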