diff --git a/docs/api.md b/docs/api.md
index 1fc438423..80fa249d8 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -241,13 +241,13 @@ child.info('this will have both `foo: 1` and `bar: 2`')
logger.info('this will still only have `foo: 1`')
```
-As of pino 7.x, when the `mixin` is used with the [`nestedKey` option](#opt-nestedkey),
-the object returned from the `mixin` method will also be nested. Prior versions would mix
-this object into the root.
+As of pino 7.x, when the `mixin` is used with the [`nestedKey` option](#opt-nestedkey),
+the object returned from the `mixin` method will also be nested. Prior versions would mix
+this object into the root.
```js
const logger = pino({
- nestedKey: 'payload',
+ nestedKey: 'payload',
mixin() {
return { requestId: requestId.currentId() }
}
@@ -590,7 +590,7 @@ when using the `transport` option. In this case, an `Error` will be thrown.
#### `onChild` (Function)
-The `onChild` function is a synchronous callback that will be called on each creation of a new child, passing the child instance as its first argument.
+The `onChild` function is a synchronous callback that will be called on each creation of a new child, passing the child instance as its first argument.
Any error thrown inside the callback will be uncaught and should be handled inside the callback.
```js
const parent = require('pino')({ onChild: (instance) => {
@@ -609,7 +609,7 @@ Default: `pino.destination(1)` (STDOUT)
The `destination` parameter can be a file descriptor, a file path, or an
object with `dest` property pointing to a fd or path.
An ordinary Node.js `stream` can be passed as the
-destination (such as the result
+destination (such as the result
of `fs.createWriteStream`) but for peak log writing performance, it is strongly
recommended to use `pino.destination` to create the destination stream.
Note that the `destination` parameter can be the result of `pino.transport()`.
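+
+For example, a minimal sketch of the recommended setup (the log file path is a placeholder):
+
+```js
+const pino = require('pino')
+// pino.destination returns a fast SonicBoom-backed stream; '/var/log/app.log' is a hypothetical path
+const logger = pino(pino.destination('/var/log/app.log'))
+logger.info('written through pino.destination')
+```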
@@ -1001,7 +1001,7 @@ Adds to the bindings of this logger instance.
**Note:** Does not overwrite bindings. Can potentially result in duplicate keys in
log lines.
-* See [`bindings` parameter in `logger.child`](#logger-child-bindings)
+* See [`bindings` parameter in `logger.child`](#logger-child-bindings)
### `logger.flush([cb])`
@@ -1239,6 +1239,30 @@ const transport = pino.transport({
pino(transport)
```
+The `targets` array can now mix plain transport targets with pipelines:
+
+```js
+const pino = require('pino')
+const transport = pino.transport({
+ targets: [{
+ level: 'info',
+ target: 'pino-pretty' // must be installed separately
+ }, {
+ level: 'trace',
+ target: 'pino/file',
+ options: { destination: '/path/to/store/logs' }
+ }, {
+ pipeline: [{
+ target: 'pino-syslog' // must be installed separately
+ }, {
+ target: 'pino-socket' // must be installed separately
+ }]
+ }
+ ]
+})
+pino(transport)
+```
+
If `WeakRef`, `WeakMap`, and `FinalizationRegistry` are available in the current runtime (v14.5.0+), then the thread
will be automatically terminated in case the stream or logger goes out of scope.
The `transport()` function adds a listener to `process.on('beforeExit')` and `process.on('exit')` to ensure the worker
@@ -1276,7 +1300,7 @@ For more on transports, how they work, and how to create them see the [`Transpor
* `target`: The transport to pass logs through. This may be an installed module name or an absolute path.
* `options`: An options object which is serialized (see [Structured Clone Algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)), passed to the worker thread, parsed and then passed to the exported transport function.
* `worker`: [Worker thread](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) configuration options. Additionally, the `worker` option supports `worker.autoEnd`. If this is set to `false` logs will not be flushed on process exit. It is then up to the developer to call `transport.end()` to flush logs.
-* `targets`: May be specified instead of `target`. Must be an array of transport configurations. Transport configurations include the aforementioned `options` and `target` options plus a `level` option which will send only logs above a specified level to a transport.
+* `targets`: May be specified instead of `target`. Must be an array of transport configurations and/or pipelines. Transport configurations include the aforementioned `options` and `target` options plus a `level` option which will send only logs above a specified level to a transport.
* `pipeline`: May be specified instead of `target`. Must be an array of transport configurations. Transport configurations include the aforementioned `options` and `target` options. All intermediate steps in the pipeline _must_ be `Transform` streams and not `Writable`. See the sketch after this list.
* `dedupe`: See [pino.multistream options](#pino-multistream)
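+
+For example, the `pipeline` option might be used as in the following sketch (both
+transports must be installed separately and their configuration is omitted):
+
+```js
+const pino = require('pino')
+const transport = pino.transport({
+  pipeline: [{
+    target: 'pino-syslog' // transform step: rewrites each log line
+  }, {
+    target: 'pino-socket' // final writable step: ships the result over a socket
+  }]
+})
+pino(transport)
+```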
diff --git a/docs/bundling.md b/docs/bundling.md
index 6467b8e52..1c326a936 100644
--- a/docs/bundling.md
+++ b/docs/bundling.md
@@ -7,7 +7,6 @@ In particular, a bundler must ensure that the following files are also bundled s
* `lib/worker.js` from the `thread-stream` dependency
* `file.js`
* `lib/worker.js`
-* `lib/worker-pipeline.js`
* Any transport used by the user (like `pino-pretty`)
Once the files above have been generated, the bundler must also add information about the files above by injecting a code that sets `__bundlerPathsOverrides` in the `globalThis` object.
@@ -22,12 +21,11 @@ globalThis.__bundlerPathsOverrides = {
'thread-stream-worker': pinoWebpackAbsolutePath('./thread-stream-worker.js'),
'pino/file': pinoWebpackAbsolutePath('./pino-file.js'),
'pino-worker': pinoWebpackAbsolutePath('./pino-worker.js'),
- 'pino-pipeline-worker': pinoWebpackAbsolutePath('./pino-pipeline-worker.js'),
'pino-pretty': pinoWebpackAbsolutePath('./pino-pretty.js'),
};
```
-Note that `pino/file`, `pino-worker`, `pino-pipeline-worker`, and `thread-stream-worker` are required identifiers. Other identifiers are possible based on the user configuration.
+Note that `pino/file`, `pino-worker`, and `thread-stream-worker` are required identifiers. Other identifiers are possible based on the user configuration.
## Webpack Plugin
diff --git a/lib/transport.js b/lib/transport.js
index 6be075fdf..b8ce347f2 100644
--- a/lib/transport.js
+++ b/lib/transport.js
@@ -87,20 +87,29 @@ function transport (fullOptions) {
if (targets) {
target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker.js')
- options.targets = targets.map((dest) => {
+ options.targets = targets.filter(dest => dest.target).map((dest) => {
return {
...dest,
target: fixTarget(dest.target)
}
})
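+ // entries that declare a `pipeline` are collected separately below; each becomes an array of its step configurations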
+ options.pipelines = targets.filter(dest => dest.pipeline).map((dest) => {
+ return dest.pipeline.map((t) => {
+ return {
+ ...t,
+ level: dest.level, // duplicate the pipeline's `level`, defined one level up, onto each of its targets
+ target: fixTarget(t.target)
+ }
+ })
+ })
} else if (pipeline) {
- target = bundlerOverrides['pino-pipeline-worker'] || join(__dirname, 'worker-pipeline.js')
- options.targets = pipeline.map((dest) => {
+ target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker.js')
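+ // a single top-level `pipeline` is wrapped as a one-element pipelines list so worker.js treats both shapes uniformly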
+ options.pipelines = [pipeline.map((dest) => {
return {
...dest,
target: fixTarget(dest.target)
}
- })
+ })]
}
if (levels) {
diff --git a/lib/worker-pipeline.js b/lib/worker-pipeline.js
deleted file mode 100644
index 76cb3b888..000000000
--- a/lib/worker-pipeline.js
+++ /dev/null
@@ -1,38 +0,0 @@
-'use strict'
-
-const EE = require('events')
-const loadTransportStreamBuilder = require('./transport-stream')
-const { pipeline, PassThrough } = require('stream')
-
-// This file is not checked by the code coverage tool,
-// as it is not reliable.
-
-/* istanbul ignore file */
-
-module.exports = async function ({ targets }) {
- const streams = await Promise.all(targets.map(async (t) => {
- const fn = await loadTransportStreamBuilder(t.target)
- const stream = await fn(t.options)
- return stream
- }))
- const ee = new EE()
-
- const stream = new PassThrough({
- autoDestroy: true,
- destroy (_, cb) {
- ee.on('error', cb)
- ee.on('closed', cb)
- }
- })
-
- pipeline(stream, ...streams, function (err) {
- if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
- ee.emit('error', err)
- return
- }
-
- ee.emit('closed')
- })
-
- return stream
-}
diff --git a/lib/worker.js b/lib/worker.js
index c20c19add..dcbd5d391 100644
--- a/lib/worker.js
+++ b/lib/worker.js
@@ -1,5 +1,7 @@
'use strict'
+const EE = require('events')
+const { pipeline, PassThrough } = require('stream')
const pino = require('../pino.js')
const build = require('pino-abstract-transport')
const loadTransportStreamBuilder = require('./transport-stream')
@@ -9,36 +11,144 @@ const loadTransportStreamBuilder = require('./transport-stream')
/* istanbul ignore file */
-module.exports = async function ({ targets, levels, dedupe }) {
- targets = await Promise.all(targets.map(async (t) => {
- const fn = await loadTransportStreamBuilder(t.target)
- const stream = await fn(t.options)
- return {
- level: t.level,
- stream
- }
- }))
- return build(process, {
- parse: 'lines',
- metadata: true,
- close (err, cb) {
- let expected = 0
- for (const transport of targets) {
- expected++
- transport.stream.on('close', closeCb)
- transport.stream.end()
+/*
+ * > Multiple targets & pipelines
+ *
+ *
+ * ┌─────────────────────────────────────────────────┐ ┌─────┐
+ * │ │ │ p │
+ * │ │ │ i │
+ * │ target │ │ n │
+ * │ │ ────────────────────────────────┼────┤ o │
+ * │ targets │ target │ │ . │
+ * │ ────────────► │ ────────────────────────────────┼────┤ m │ source
+ * │ │ target │ │ u │ │
+ * │ │ ────────────────────────────────┼────┤ l │ │write
+ * │ │ │ │ t │ ▼
+ * │ │ pipeline ┌───────────────┐ │ │ i │ ┌────────┐
+ * │ │ ──────────► │ PassThrough ├───┼────┤ s ├──────┤ │
+ * │ │ └───────────────┘ │ │ t │ write│ Thread │
+ * │ │ │ │ r │◄─────┤ Stream │
+ * │ │ pipeline ┌───────────────┐ │ │ e │ │ │
+ * │ │ ──────────► │ PassThrough ├───┼────┤ a │ └────────┘
+ * │ └───────────────┘ │ │ m │
+ * │ │ │ │
+ * └─────────────────────────────────────────────────┘ └─────┘
+ *
+ *
+ *
+ * > One single pipeline or target
+ *
+ *
+ * source
+ * │
+ * ┌────────────────────────────────────────────────┐ │write
+ * │ │ ▼
+ * │ │ ┌────────┐
+ * │ targets │ target │ │ │
+ * │ ────────────► │ ──────────────────────────────┤ │ │
+ * │ │ │ │ │
+ * │ ├──────┤ │
+ * │ │ │ │
+ * │ │ │ │
+ * │ OR │ │ │
+ * │ │ │ │
+ * │ │ │ │
+ * │ ┌──────────────┐ │ │ │
+ * │ targets │ pipeline │ │ │ │ Thread │
+ * │ ────────────► │ ────────────►│ PassThrough ├─┤ │ Stream │
+ * │ │ │ │ │ │ │
+ * │ └──────────────┘ │ │ │
+ * │ │ │ │
+ * │ OR │ write│ │
+ * │ │◄─────┤ │
+ * │ │ │ │
+ * │ ┌──────────────┐ │ │ │
+ * │ pipeline │ │ │ │ │
+ * │ ──────────────►│ PassThrough ├────────────────┤ │ │
+ * │ │ │ │ │ │
+ * │ └──────────────┘ │ └────────┘
+ * │ │
+ * │ │
+ * └────────────────────────────────────────────────┘
+ */
+
+module.exports = async function ({ targets, pipelines, levels, dedupe }) {
+ const targetStreams = []
+
+ // Process targets
+ if (targets && targets.length) {
+ targets = await Promise.all(targets.map(async (t) => {
+ const fn = await loadTransportStreamBuilder(t.target)
+ const stream = await fn(t.options)
+ return {
+ level: t.level,
+ stream
}
+ }))
+
+ targetStreams.push(...targets)
+ }
+
+ // Process pipelines
+ if (pipelines && pipelines.length) {
+ pipelines = await Promise.all(
+ pipelines.map(async (p) => {
+ let level
+ const pipeDests = await Promise.all(
+ p.map(async (t) => {
+ // the level assigned to the pipeline is duplicated onto all its targets, so capturing it from any one is enough
+ level = t.level
+ const fn = await loadTransportStreamBuilder(t.target)
+ const stream = await fn(t.options)
+ return stream
+ }
+ ))
- function closeCb () {
- if (--expected === 0) {
- cb(err)
+ return {
+ level,
+ stream: createPipeline(pipeDests)
+ }
+ })
+ )
+ targetStreams.push(...pipelines)
+ }
+
+ // Skip building the multistream step when only a single pipeline or target is defined, and
+ // return the stream instance directly back to ThreadStream.
+ // This is equivalent to defining either:
+ //
+ // pino.transport({ target: ... })
+ //
+ // OR
+ //
+ // pino.transport({ pipeline: ... })
+ if (targetStreams.length === 1) {
+ return targetStreams[0].stream
+ } else {
+ return build(process, {
+ parse: 'lines',
+ metadata: true,
+ close (err, cb) {
+ let expected = 0
+ for (const transport of targetStreams) {
+ expected++
+ transport.stream.on('close', closeCb)
+ transport.stream.end()
+ }
+
+ function closeCb () {
+ if (--expected === 0) {
+ cb(err)
+ }
}
}
- }
- })
+ })
+ }
+ // TODO: why wasn't split2 used for pipelines?
function process (stream) {
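+ // multistream routes each line by level; with `dedupe: true` only the stream(s) with the highest matching level receive it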
- const multi = pino.multistream(targets, { levels, dedupe })
+ const multi = pino.multistream(targetStreams, { levels, dedupe })
// TODO manage backpressure
stream.on('data', function (chunk) {
const { lastTime, lastMsg, lastObj, lastLevel } = this
@@ -51,4 +161,34 @@ module.exports = async function ({ targets, levels, dedupe }) {
multi.write(chunk + '\n')
})
}
+
+ /**
+ * Creates a pipeline using the provided streams and returns a `PassThrough` instance acting
+ * as a source for the pipeline.
+ *
+ * @param {(TransformStream|WritableStream)[]} streams An array of streams.
+ * All intermediate streams in the array *MUST* be `Transform` streams and only the last one `Writable`.
+ * @returns A `PassThrough` stream instance representing the source stream of the pipeline
+ */
+ function createPipeline (streams) {
+ const ee = new EE()
+ const stream = new PassThrough({
+ autoDestroy: true,
+ destroy (_, cb) {
+ ee.on('error', cb)
+ ee.on('closed', cb)
+ }
+ })
+
+ pipeline(stream, ...streams, function (err) {
+ if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
+ ee.emit('error', err)
+ return
+ }
+
+ ee.emit('closed')
+ })
+
+ return stream
+ }
}
diff --git a/pino.d.ts b/pino.d.ts
index b6709a59d..3efe72276 100644
--- a/pino.d.ts
+++ b/pino.d.ts
@@ -259,10 +259,11 @@ declare namespace pino {
interface TransportPipelineOptions<TransportOptions = Record<string, any>> extends TransportBaseOptions<TransportOptions>{
pipeline: TransportSingleOptions<TransportOptions>[]
+ level?: LevelWithSilentOrString
}

interface TransportMultiOptions<TransportOptions = Record<string, any>> extends TransportBaseOptions<TransportOptions>{
- targets: readonly TransportTargetOptions<TransportOptions>[],
+ targets: readonly (TransportTargetOptions<TransportOptions>|TransportPipelineOptions<TransportOptions>)[],
levels?: Record<string, number>
dedupe?: boolean
}
diff --git a/test/transport/bundlers-support.test.js b/test/transport/bundlers-support.test.js
index efe8a5edc..cb10221c0 100644
--- a/test/transport/bundlers-support.test.js
+++ b/test/transport/bundlers-support.test.js
@@ -95,33 +95,3 @@ test('pino.transport with worker destination overridden by bundler and mjs trans
globalThis.__bundlerPathsOverrides = undefined
})
-
-test('pino.transport with worker-pipeline destination overridden by bundler', async ({ same, teardown }) => {
- globalThis.__bundlerPathsOverrides = {
- 'pino-pipeline-worker': join(__dirname, '..', '..', 'lib/worker-pipeline.js')
- }
-
- const destination = file()
- const transport = pino.transport({
- pipeline: [
- {
- target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'),
- options: { destination }
- }
- ]
- })
- teardown(transport.end.bind(transport))
- const instance = pino(transport)
- instance.info('hello')
- await watchFileCreated(destination)
- const result = JSON.parse(await readFile(destination))
- delete result.time
- same(result, {
- pid,
- hostname,
- level: 30,
- msg: 'hello'
- })
-
- globalThis.__bundlerPathsOverrides = undefined
-})
diff --git a/test/transport/pipeline.test.js b/test/transport/pipeline.test.js
index 25b244e06..a845d4425 100644
--- a/test/transport/pipeline.test.js
+++ b/test/transport/pipeline.test.js
@@ -6,6 +6,7 @@ const { readFile } = require('fs').promises
const { watchFileCreated, file } = require('../helper')
const { test } = require('tap')
const pino = require('../../')
+const { DEFAULT_LEVELS } = require('../../lib/constants')
const { pid } = process
const hostname = os.hostname()
@@ -29,8 +30,106 @@ test('pino.transport with a pipeline', async ({ same, teardown }) => {
same(result, {
pid,
hostname,
- level: 30,
+ level: DEFAULT_LEVELS.info,
msg: 'hello',
service: 'pino' // this property was added by the transform
})
})
+
+test('pino.transport with targets containing pipelines', async ({ same, teardown }) => {
+ const destinationA = file()
+ const destinationB = file()
+ const transport = pino.transport({
+ targets: [
+ {
+ target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'),
+ options: { destination: destinationA }
+ },
+ {
+ pipeline: [
+ {
+ target: join(__dirname, '..', 'fixtures', 'transport-transform.js')
+ },
+ {
+ target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'),
+ options: { destination: destinationB }
+ }
+ ]
+ }
+ ]
+ })
+
+ teardown(transport.end.bind(transport))
+ const instance = pino(transport)
+ instance.info('hello')
+ await watchFileCreated(destinationA)
+ await watchFileCreated(destinationB)
+ const resultA = JSON.parse(await readFile(destinationA))
+ const resultB = JSON.parse(await readFile(destinationB))
+ delete resultA.time
+ delete resultB.time
+ same(resultA, {
+ pid,
+ hostname,
+ level: DEFAULT_LEVELS.info,
+ msg: 'hello'
+ })
+ same(resultB, {
+ pid,
+ hostname,
+ level: DEFAULT_LEVELS.info,
+ msg: 'hello',
+ service: 'pino' // this property was added by the transform
+ })
+})
+
+test('pino.transport with targets containing pipelines with levels defined and dedupe', async ({ same, teardown }) => {
+ const destinationA = file()
+ const destinationB = file()
+ const transport = pino.transport({
+ targets: [
+ {
+ target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'),
+ options: { destination: destinationA },
+ level: DEFAULT_LEVELS.info
+ },
+ {
+ pipeline: [
+ {
+ target: join(__dirname, '..', 'fixtures', 'transport-transform.js')
+ },
+ {
+ target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'),
+ options: { destination: destinationB }
+ }
+ ],
+ level: DEFAULT_LEVELS.error
+ }
+ ],
+ dedupe: true
+ })
+
+ teardown(transport.end.bind(transport))
+ const instance = pino(transport)
+ instance.info('hello info')
+ instance.error('hello error')
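+ // with dedupe enabled, the info line should reach only destinationA while the error line flows only through the pipeline to destinationB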
+ await watchFileCreated(destinationA)
+ await watchFileCreated(destinationB)
+ const resultA = JSON.parse(await readFile(destinationA))
+ const resultB = JSON.parse(await readFile(destinationB))
+ delete resultA.time
+ delete resultB.time
+ same(resultA, {
+ pid,
+ hostname,
+ level: DEFAULT_LEVELS.info,
+ msg: 'hello info'
+ })
+ same(resultB, {
+ pid,
+ hostname,
+ level: DEFAULT_LEVELS.error,
+ msg: 'hello error',
+ service: 'pino' // this property was added by the transform
+ })
+})