Skip to content

Commit

Permalink
Adding support for mix&match pipelines (#1954)
Browse files Browse the repository at this point in the history
* added tests

* Added test

* Implemented support for named 'pipelines' property in pino.transport

* Updated pino.d.ts and api.md to include pipeline within targets

* - Reverted changes related to the support of named pipelines
- Implemented support for mixed target&pipeline definitions  within `targets` in `transport.js`
- Merged logic from both `worker.js` and `worker-pipeline.js` into `worker.js`
- Fixed `pipeline.test.js`
- Fixed docs to reflect changes above

TODO:
 - Remove `worker-pipeline.js`
 - Fix `transport.js` to use only `worker.js`
 - Fix related docs
 - Fix UTs

* - Removed `worker-pipeline.js`
- Updated docs to remove mentions of `worker-pipeline.js`
- Fixed failing UTs
- Fixed `transport.js` to use only `worker.js` also when `pipeline` is defined
- Fixed `worker.js` to work properly when only `pipeline` is defined

* added a simple flow schema to worker.js

* added a simple flow schema to worker.js

* Added a special case in worker.js to skip the multistream instance when a single target or pipeline is defined

* - Added optional 'level' property to TransportPipelineOptions interface
- A level can now be defined for pipelines defined inside 'targets'
- Added UT in 'pipeline.test.js' to check expected behaviour with 'dedupe'
  • Loading branch information
dbacarel authored May 13, 2024
1 parent 3243b71 commit 8a20d79
Show file tree
Hide file tree
Showing 8 changed files with 312 additions and 109 deletions.
40 changes: 32 additions & 8 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,13 @@ child.info('this will have both `foo: 1` and `bar: 2`')
logger.info('this will still only have `foo: 1`')
```

As of pino 7.x, when the `mixin` is used with the [`nestedKey` option](#opt-nestedkey),
the object returned from the `mixin` method will also be nested. Prior versions would mix
this object into the root.
As of pino 7.x, when the `mixin` is used with the [`nestedKey` option](#opt-nestedkey),
the object returned from the `mixin` method will also be nested. Prior versions would mix
this object into the root.

```js
const logger = pino({
nestedKey: 'payload',
nestedKey: 'payload',
mixin() {
return { requestId: requestId.currentId() }
}
Expand Down Expand Up @@ -590,7 +590,7 @@ when using the `transport` option. In this case, an `Error` will be thrown.
#### `onChild` (Function)
The `onChild` function is a synchronous callback that will be called on each creation of a new child, passing the child instance as its first argument.
The `onChild` function is a synchronous callback that will be called on each creation of a new child, passing the child instance as its first argument.
Any error thrown inside the callback will be uncaught and should be handled inside the callback.
```js
const parent = require('pino')({ onChild: (instance) => {
Expand All @@ -609,7 +609,7 @@ Default: `pino.destination(1)` (STDOUT)
The `destination` parameter can be a file descriptor, a file path, or an
object with `dest` property pointing to a fd or path.
An ordinary Node.js `stream` file descriptor can be passed as the
destination (such as the result
destination (such as the result
of `fs.createWriteStream`) but for peak log writing performance, it is strongly
recommended to use `pino.destination` to create the destination stream.
Note that the `destination` parameter can be the result of `pino.transport()`.
Expand Down Expand Up @@ -1001,7 +1001,7 @@ Adds to the bindings of this logger instance.
**Note:** Does not overwrite bindings. Can potentially result in duplicate keys in
log lines.
* See [`bindings` parameter in `logger.child`](#logger-child-bindings)
* See [`bindings` parameter in `logger.child`](#logger-child-bindings)
<a id="flush"></a>
### `logger.flush([cb])`
Expand Down Expand Up @@ -1239,6 +1239,30 @@ const transport = pino.transport({
pino(transport)
```
Multiple transports can now be defined to include pipelines:
```js
const pino = require('pino')
const transport = pino.transport({
targets: [{
level: 'info',
target: 'pino-pretty' // must be installed separately
}, {
level: 'trace',
target: 'pino/file',
options: { destination: '/path/to/store/logs' }
}, {
pipeline: [{
target: 'pino-syslog' // must be installed separately
}, {
target: 'pino-socket' // must be installed separately
}]
}
]
})
pino(transport)
```
If `WeakRef`, `WeakMap`, and `FinalizationRegistry` are available in the current runtime (v14.5.0+), then the thread
will be automatically terminated in case the stream or logger goes out of scope.
The `transport()` function adds a listener to `process.on('beforeExit')` and `process.on('exit')` to ensure the worker
Expand Down Expand Up @@ -1276,7 +1300,7 @@ For more on transports, how they work, and how to create them see the [`Transpor
* `target`: The transport to pass logs through. This may be an installed module name or an absolute path.
* `options`: An options object which is serialized (see [Structured Clone Algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)), passed to the worker thread, parsed and then passed to the exported transport function.
* `worker`: [Worker thread](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) configuration options. Additionally, the `worker` option supports `worker.autoEnd`. If this is set to `false` logs will not be flushed on process exit. It is then up to the developer to call `transport.end()` to flush logs.
* `targets`: May be specified instead of `target`. Must be an array of transport configurations. Transport configurations include the aforementioned `options` and `target` options plus a `level` option which will send only logs above a specified level to a transport.
* `targets`: May be specified instead of `target`. Must be an array of transport configurations and/or pipelines. Transport configurations include the aforementioned `options` and `target` options plus a `level` option which will send only logs above a specified level to a transport.
* `pipeline`: May be specified instead of `target`. Must be an array of transport configurations. Transport configurations include the aforementioned `options` and `target` options. All intermediate steps in the pipeline _must_ be `Transform` streams and not `Writable`.
* `dedupe`: See [pino.multistream options](#pino-multistream)
Expand Down
4 changes: 1 addition & 3 deletions docs/bundling.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ In particular, a bundler must ensure that the following files are also bundled s
* `lib/worker.js` from the `thread-stream` dependency
* `file.js`
* `lib/worker.js`
* `lib/worker-pipeline.js`
* Any transport used by the user (like `pino-pretty`)

Once the files above have been generated, the bundler must also add information about the files above by injecting a code that sets `__bundlerPathsOverrides` in the `globalThis` object.
Expand All @@ -22,12 +21,11 @@ globalThis.__bundlerPathsOverrides = {
'thread-stream-worker': pinoWebpackAbsolutePath('./thread-stream-worker.js')
'pino/file': pinoWebpackAbsolutePath('./pino-file.js'),
'pino-worker': pinoWebpackAbsolutePath('./pino-worker.js'),
'pino-pipeline-worker': pinoWebpackAbsolutePath('./pino-pipeline-worker.js'),
'pino-pretty': pinoWebpackAbsolutePath('./pino-pretty.js'),
};
```

Note that `pino/file`, `pino-worker`, `pino-pipeline-worker`, and `thread-stream-worker` are required identifiers. Other identifiers are possible based on the user configuration.
Note that `pino/file`, `pino-worker` and `thread-stream-worker` are required identifiers. Other identifiers are possible based on the user configuration.

## Webpack Plugin

Expand Down
17 changes: 13 additions & 4 deletions lib/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,29 @@ function transport (fullOptions) {

if (targets) {
target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker.js')
options.targets = targets.map((dest) => {
options.targets = targets.filter(dest => dest.target).map((dest) => {
return {
...dest,
target: fixTarget(dest.target)
}
})
options.pipelines = targets.filter(dest => dest.pipeline).map((dest) => {
return dest.pipeline.map((t) => {
return {
...t,
level: dest.level, // duplicate the pipeline `level` property defined in the upper level
target: fixTarget(t.target)
}
})
})
} else if (pipeline) {
target = bundlerOverrides['pino-pipeline-worker'] || join(__dirname, 'worker-pipeline.js')
options.targets = pipeline.map((dest) => {
target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker.js')
options.pipelines = [pipeline.map((dest) => {
return {
...dest,
target: fixTarget(dest.target)
}
})
})]
}

if (levels) {
Expand Down
38 changes: 0 additions & 38 deletions lib/worker-pipeline.js

This file was deleted.

188 changes: 164 additions & 24 deletions lib/worker.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict'

const EE = require('events')
const { pipeline, PassThrough } = require('stream')
const pino = require('../pino.js')
const build = require('pino-abstract-transport')
const loadTransportStreamBuilder = require('./transport-stream')
Expand All @@ -9,36 +11,144 @@ const loadTransportStreamBuilder = require('./transport-stream')

/* istanbul ignore file */

module.exports = async function ({ targets, levels, dedupe }) {
targets = await Promise.all(targets.map(async (t) => {
const fn = await loadTransportStreamBuilder(t.target)
const stream = await fn(t.options)
return {
level: t.level,
stream
}
}))
return build(process, {
parse: 'lines',
metadata: true,
close (err, cb) {
let expected = 0
for (const transport of targets) {
expected++
transport.stream.on('close', closeCb)
transport.stream.end()
/*
* > Multiple targets & pipelines
*
*
* ┌─────────────────────────────────────────────────┐ ┌─────┐
* │ │ │ p │
* │ │ │ i │
* │ target │ │ n │
* │ │ ────────────────────────────────┼────┤ o │
* │ targets │ target │ │ . │
* │ ────────────► │ ────────────────────────────────┼────┤ m │ source
* │ │ target │ │ u │ │
* │ │ ────────────────────────────────┼────┤ l │ │write
* │ │ │ │ t │ ▼
* │ │ pipeline ┌───────────────┐ │ │ i │ ┌────────┐
* │ │ ──────────► │ PassThrough ├───┼────┤ s ├──────┤ │
* │ │ └───────────────┘ │ │ t │ write│ Thread │
* │ │ │ │ r │◄─────┤ Stream │
* │ │ pipeline ┌───────────────┐ │ │ e │ │ │
* │ │ ──────────► │ PassThrough ├───┼────┤ a │ └────────┘
* │ └───────────────┘ │ │ m │
* │ │ │ │
* └─────────────────────────────────────────────────┘ └─────┘
*
*
*
* > One single pipeline or target
*
*
* source
* │
* ┌────────────────────────────────────────────────┐ │write
* │ │ ▼
* │ │ ┌────────┐
* │ targets │ target │ │ │
* │ ────────────► │ ──────────────────────────────┤ │ │
* │ │ │ │ │
* │ ├──────┤ │
* │ │ │ │
* │ │ │ │
* │ OR │ │ │
* │ │ │ │
* │ │ │ │
* │ ┌──────────────┐ │ │ │
* │ targets │ pipeline │ │ │ │ Thread │
* │ ────────────► │ ────────────►│ PassThrough ├─┤ │ Stream │
* │ │ │ │ │ │ │
* │ └──────────────┘ │ │ │
* │ │ │ │
* │ OR │ write│ │
* │ │◄─────┤ │
* │ │ │ │
* │ ┌──────────────┐ │ │ │
* │ pipeline │ │ │ │ │
* │ ──────────────►│ PassThrough ├────────────────┤ │ │
* │ │ │ │ │ │
* │ └──────────────┘ │ └────────┘
* │ │
* │ │
* └────────────────────────────────────────────────┘
*/

module.exports = async function ({ targets, pipelines, levels, dedupe }) {
const targetStreams = []

// Process targets
if (targets && targets.length) {
targets = await Promise.all(targets.map(async (t) => {
const fn = await loadTransportStreamBuilder(t.target)
const stream = await fn(t.options)
return {
level: t.level,
stream
}
}))

targetStreams.push(...targets)
}

// Process pipelines
if (pipelines && pipelines.length) {
pipelines = await Promise.all(
pipelines.map(async (p) => {
let level
const pipeDests = await Promise.all(
p.map(async (t) => {
// level assigned to pipeline is duplicated over all its targets, just store it
level = t.level
const fn = await loadTransportStreamBuilder(t.target)
const stream = await fn(t.options)
return stream
}
))

function closeCb () {
if (--expected === 0) {
cb(err)
return {
level,
stream: createPipeline(pipeDests)
}
})
)
targetStreams.push(...pipelines)
}

// Skip building the multistream step if either one single pipeline or target is defined and
// return directly the stream instance back to TreadStream.
// This is equivalent to define either:
//
// pino.transport({ target: ... })
//
// OR
//
// pino.transport({ pipeline: ... })
if (targetStreams.length === 1) {
return targetStreams[0].stream
} else {
return build(process, {
parse: 'lines',
metadata: true,
close (err, cb) {
let expected = 0
for (const transport of targetStreams) {
expected++
transport.stream.on('close', closeCb)
transport.stream.end()
}

function closeCb () {
if (--expected === 0) {
cb(err)
}
}
}
}
})
})
}

// TODO: Why split2 was not used for pipelines?
function process (stream) {
const multi = pino.multistream(targets, { levels, dedupe })
const multi = pino.multistream(targetStreams, { levels, dedupe })
// TODO manage backpressure
stream.on('data', function (chunk) {
const { lastTime, lastMsg, lastObj, lastLevel } = this
Expand All @@ -51,4 +161,34 @@ module.exports = async function ({ targets, levels, dedupe }) {
multi.write(chunk + '\n')
})
}

/**
* Creates a pipeline using the provided streams and return an instance of `PassThrough` stream
* as a source for the pipeline.
*
* @param {(TransformStream|WritableStream)[]} streams An array of streams.
* All intermediate streams in the array *MUST* be `Transform` streams and only the last one `Writable`.
* @returns A `PassThrough` stream instance representing the source stream of the pipeline
*/
function createPipeline (streams) {
const ee = new EE()
const stream = new PassThrough({
autoDestroy: true,
destroy (_, cb) {
ee.on('error', cb)
ee.on('closed', cb)
}
})

pipeline(stream, ...streams, function (err) {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
ee.emit('error', err)
return
}

ee.emit('closed')
})

return stream
}
}
Loading

0 comments on commit 8a20d79

Please sign in to comment.