Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Commit

Permalink
v11.4.0; support for pre-registering partial transactions for AWS Lam…
Browse files Browse the repository at this point in the history
  • Loading branch information
trentm authored Apr 28, 2023
1 parent 4f03ca7 commit b30b2f3
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 22 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# elastic-apm-http-client changelog

## v11.4.0

- Add support for pre-registering of partial transactions for AWS Lambda.
This adds `client.lambdaShouldRegisterTransactions()` and
`client.lambdaRegisterTransaction(transaction, awsRequestId)` so the
APM agent can register a partial transaction with the Elastic Lambda
extension before executing the user's handler. In some error cases
(`uncaughtException`, `unhandledRejection`, Lambda timeout), the extension
can report that transaction when the APM agent is unable.
(https://github.com/elastic/apm-agent-nodejs/issues/3136)

## v11.3.1

- Tweak logic to only exclude `metadata.service.agent.activation_method` when
Expand Down
69 changes: 53 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,29 +363,29 @@ Client creation.
### `client.lambdaStart()`

Tells the client that a Lambda function invocation has started.
See [Notes on Lambda Usage](#notes-on-lambda-usage) below.

#### Notes on Lambda usage
### `client.lambdaShouldRegisterTransactions()`

To properly handle [data flushing for instrumented Lambda functions](https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing)
this Client should be used as follows in a Lambda environment.
This returns a boolean indicating if the APM agent -- when running in a Lambda
environment -- should bother calling `client.lambdaRegisterTransaction(...)`.
This can help the APM agent avoid some processing gathering transaction data.

- When a Lambda invocation starts, `client.lambdaStart()` must be called.
Typically the reason this would return `false` is when the Lambda extension is
too old to support registering transactions.

The Client prevents intake requests to APM Server when in a Lambda environment
when a function invocation is *not* active. This is to ensure that an intake
request does not accidentally span a period when a Lambda VM is frozen,
which can lead to timeouts and lost APM data.

- When a Lambda invocation finishes, `client.flush({lambdaEnd: true}, cb)` must
be called.
### `client.lambdaRegisterTransaction(transaction, awsRequestId)`

The `lambdaEnd: true` tells the Client to (a) mark the lambda as inactive so
a subsequent intake request is not started until the next invocation, and
(b) signal the Elastic AWS Lambda Extension that this invocation is done.
The user's Lambda handler should not finish until `cb` is called. This
ensures that the extension receives tracing data and the end signal before
the Lambda Runtime freezes the VM.
Tells the Lambda Extension about the ongoing transaction, so that data can be
used to report the transaction in certain error cases -- e.g. a Lambda handler
timeout. See [Notes on Lambda Usage](#notes-on-lambda-usage) below.

Arguments:

- `transaction` - A transaction object that can be serialized to JSON.
- `awsRequestId` - The AWS request ID for this invocation. This is a UUID
available on the Lambda context object.

### `client.sendSpan(span[, callback])`

Expand Down Expand Up @@ -462,6 +462,43 @@ Destroy the `client`. After this call, the client has ended and
subsequent calls to `sendSpan()`, `sendTransaction()`, `sendError()`,
`flush()`, or `end()` will result in an error.

## Notes on Lambda usage

To properly handle [data flushing for instrumented Lambda functions](https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing)
this Client should be used as follows in a Lambda environment.

1. Ensure that metadata is set before any of the following calls. Typically
in Lambda this is done by (a) configuring the client with
`expectExtraMetadata` and (b) calling `setExtraMetadata()` at the start of
the first invocation.

2. When a Lambda invocation starts, `client.lambdaStart()` must be called.
The Client prevents intake requests to APM Server when in a Lambda
environment when a function invocation is *not* active. This is to ensure
that an intake request does not accidentally span a period when a Lambda VM
is frozen, which can lead to timeouts and lost APM data.

3. When the transaction for this Lambda invocation has been created,
`await client.lambdaRegisterTransaction(<transaction>, <awsRequestId>)` should be
called. This is used to pass transaction details to the Lambda Extension so
a transaction can be reported in certain failure modes (e.g. a Lambda
handler timeout).

`client.lambdaShouldRegisterTransactions()` can be used to avoid gathering
data for this call.

4. When a Lambda invocation finishes, `client.flush({lambdaEnd: true}, cb)`
must be called.

The `lambdaEnd: true` tells the Client to (a) mark the lambda as inactive so
a subsequent intake request is not started until the next invocation, and
(b) signal the Elastic AWS Lambda Extension that this invocation is done.
The user's Lambda handler should not finish until `cb` is called. This
ensures that the extension receives tracing data and the end signal before
the Lambda Runtime freezes the VM.



## License

[MIT](LICENSE)
110 changes: 107 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ function Client (opts) {
// _lambdaActive indicates if a Lambda function invocation is active. It is
// only meaningful if `isLambdaExecutionEnvironment`.
this._lambdaActive = false
// Whether to forward `.lambdaRegisterTransaction()` calls to the Lambda
// extension. This will be set false if a previous attempt failed.
this._lambdaShouldRegisterTransactions = true

// Internal runtime stats for developer debugging/tuning.
this._numEvents = 0 // number of events given to the client
Expand Down Expand Up @@ -312,6 +315,7 @@ Client.prototype.config = function (opts) {
this._conf.requestIntake = getIntakeRequestOptions(this._conf, this._agent)
this._conf.requestConfig = getConfigRequestOptions(this._conf, this._agent)
this._conf.requestSignalLambdaEnd = getSignalLambdaEndRequestOptions(this._conf, this._agent)
this._conf.requestRegisterTransaction = getRegisterTransactionRequestOptions(this._conf, this._agent)

// fixes bug where cached/memoized _encodedMetadata wouldn't be
// updated when client was reconfigured
Expand Down Expand Up @@ -649,6 +653,101 @@ Client.prototype.lambdaStart = function () {
this._lambdaActive = true
}

/**
* Indicate whether the APM agent -- when in a Lambda environment -- should
* bother calling `.lambdaRegisterTransaction(...)`.
*
* @returns {boolean}
*/
Client.prototype.lambdaShouldRegisterTransactions = function () {
return this._lambdaShouldRegisterTransactions
}

/**
* Tell the local Lambda extension about the just-started transaction. This
* allows the extension to report the transaction in certain error cases
* where the APM agent isn't able to *end* the transaction and report it,
* e.g. if the function is about to timeout, or if the process crashes.
*
* The expected request is as follows, and a 200 status code is expected in
* response:
*
* POST /register/transaction
* Content-Type: application/vnd.elastic.apm.transaction+ndjson
* x-elastic-aws-request-id: ${awsRequestId}
*
* {"metadata":{...}}
* {"transaction":{...partial transaction data...}}
*
* @param {object} trans - a mostly complete APM Transaction object. It should
* have a default `outcome` value. `duration` and `result` (and possibly
* `outcome`) fields will be set by the Elastic Lambda extension if this
* transaction is used.
* @param {import('crypto').UUID} awsRequestId
* @returns {Promise || undefined} So this can, and should, be `await`ed.
* If returning a promise, it will only resolve, never reject.
*/
Client.prototype.lambdaRegisterTransaction = function (trans, awsRequestId) {
if (!isLambdaExecutionEnvironment) {
return
}
if (!this._lambdaShouldRegisterTransactions) {
return
}
assert(this._encodedMetadata, '_encodedMetadata is set')

// We expect to be talking to the localhost Elastic Lambda extension, so we
// want a shorter timeout than `_conf.serverTimeout`.
const TIMEOUT_MS = 5000
const startTime = performance.now()

return new Promise((resolve, reject) => {
this._log.trace({ awsRequestId, traceId: trans.trace_id, transId: trans.id }, 'lambdaRegisterTransaction start')
var out = this._encode({ transaction: trans }, Client.encoding.TRANSACTION)

const finish = errOrErrMsg => {
const durationMs = performance.now() - startTime
if (errOrErrMsg) {
this._log.debug({ awsRequestId, err: errOrErrMsg, durationMs }, 'lambdaRegisterTransaction unsuccessful')
this._lambdaShouldRegisterTransactions = false
} else {
this._log.trace({ awsRequestId, durationMs }, 'lambdaRegisterTransaction success')
}
resolve() // always resolve, never reject
}

// Every `POST /register/transaction` request must set the
// `x-elastic-aws-request-id` header. Instead of creating a new options obj
// each time, we just modify in-place.
this._conf.requestRegisterTransaction.headers['x-elastic-aws-request-id'] = awsRequestId

const req = this._transportRequest(this._conf.requestRegisterTransaction, res => {
res.on('error', err => {
// Not sure this event can ever be emitted, but just in case.
res.destroy(err)
})
res.resume()
if (res.statusCode !== 200) {
finish(`unexpected response status code: ${res.statusCode}`)
return
}
res.on('end', function () {
finish()
})
})
req.setTimeout(TIMEOUT_MS)
req.on('timeout', () => {
req.destroy(new Error(`timeout (${TIMEOUT_MS}ms) registering lambda transaction`))
})
req.on('error', err => {
finish(err)
})
req.write(this._encodedMetadata)
req.write(out)
req.end()
})
}

// With the cork/uncork handling on this stream, `this.write`ing on this
// stream when already destroyed will lead to:
// Error: Cannot call write after a stream was destroyed
Expand Down Expand Up @@ -1160,9 +1259,9 @@ Client.prototype._signalLambdaEnd = function (cb) {
const finish = errOrErrMsg => {
const durationMs = performance.now() - startTime
if (errOrErrMsg) {
this._log.error({ err: errOrErrMsg, durationMs }, 'error signaling lambda invocation done')
this._log.error({ err: errOrErrMsg, durationMs }, '_signalLambdaEnd error')
} else {
this._log.trace({ durationMs }, 'signaled lambda invocation done')
this._log.trace({ durationMs }, '_signalLambdaEnd success')
}
cb()
}
Expand All @@ -1187,7 +1286,6 @@ Client.prototype._signalLambdaEnd = function (cb) {
})
req.setTimeout(TIMEOUT_MS)
req.on('timeout', () => {
this._log.trace('_signalLambdaEnd timeout')
req.destroy(new Error(`timeout (${TIMEOUT_MS}ms) signaling Lambda invocation done`))
})
req.on('error', err => {
Expand Down Expand Up @@ -1321,6 +1419,12 @@ function getSignalLambdaEndRequestOptions (opts, agent) {
return getBasicRequestOptions('POST', '/intake/v2/events?flushed=true', headers, opts, agent)
}

function getRegisterTransactionRequestOptions (opts, agent) {
const headers = getHeaders(opts)
headers['Content-Type'] = 'application/vnd.elastic.apm.transaction+ndjson'
return getBasicRequestOptions('POST', '/register/transaction', headers, opts, agent)
}

function getConfigRequestOptions (opts, agent) {
const path = '/config/v1/agents?' + querystring.stringify({
'service.name': opts.serviceName,
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "elastic-apm-http-client",
"version": "11.3.1",
"version": "11.4.0",
"description": "A low-level HTTP client for communicating with the Elastic APM intake API",
"main": "index.js",
"directories": {
Expand Down
28 changes: 26 additions & 2 deletions test/lambda-usage.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@ test('lambda usage', suite => {

test('setup mock APM server', t => {
server = APMServer(function (req, res) {
if (req.method === 'POST' && req.url === '/register/transaction') {
req.resume()
req.on('end', () => {
res.writeHead(200)
res.end()
})
return
} else if (!(req.method === 'POST' && req.url.startsWith('/intake/v2/events'))) {
req.resume()
req.on('end', () => {
res.writeHead(404)
res.end()
})
return
}

// Capture intake req data to this mock APM server to `reqsToServer`.
const reqInfo = {
method: req.method,
Expand Down Expand Up @@ -78,13 +94,21 @@ test('lambda usage', suite => {
}, client._conf.bufferWindowTime + 10)
})

test('lambda invocation', t => {
test('lambda invocation', async (t) => {
client.lambdaStart() // 1. start of invocation

// 2. Registering transaction
t.equal(client.lambdaShouldRegisterTransactions(), true, '.lambdaShouldRegisterTransactions() is true')
await client.lambdaRegisterTransaction(
{ name: 'GET /aStage/myFn', type: 'lambda', outcome: 'unknown' /* ... */ },
'063de0d2-1705-4eeb-9dfd-045d76b8cdec')
t.equal(client.lambdaShouldRegisterTransactions(), true, '.lambdaShouldRegisterTransactions() is true after register')

setTimeout(() => {
client.sendTransaction({ name: 'GET /aStage/myFn', type: 'lambda', result: 'success' /* ... */ })
client.sendSpan({ name: 'mySpan', type: 'custom', result: 'success' /* ... */ })

// 2. end of invocation
// 3. Flush at end of invocation
client.flush({ lambdaEnd: true }, function () {
t.ok(reqsToServer.length > 1, 'at least 2 intake requests to APM Server')
t.equal(reqsToServer[reqsToServer.length - 1].url.searchParams.get('flushed'), 'true',
Expand Down

0 comments on commit b30b2f3

Please sign in to comment.