Skip to content

Commit

Permalink
fix: onConnect retry
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Nov 12, 2024
1 parent 20b8026 commit 12f1b02
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 17 deletions.
8 changes: 8 additions & 0 deletions lib/api/api-connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,17 @@ class ConnectHandler extends AsyncResource {
this.responseHeaders = responseHeaders || null
this.callback = callback
this.abort = null
this.upgraded = false

addSignal(this, signal)
}

onConnect (abort, context) {
if (this.upgraded) {
// Cannot be retried...
throw new Error('request already sent')
}

if (this.reason) {
abort(this.reason)
return
Expand All @@ -49,6 +55,8 @@ class ConnectHandler extends AsyncResource {
}

onUpgrade (statusCode, rawHeaders, socket) {
this.upgraded = true

const { callback, opaque, context } = this

removeSignal(this)
Expand Down
12 changes: 12 additions & 0 deletions lib/api/api-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class PipelineHandler extends AsyncResource {
this.abort = null
this.context = null
this.onInfo = onInfo || null
this.headersSent = false

this.req = new PipelineRequest().on('error', noop)

Expand Down Expand Up @@ -147,6 +148,11 @@ class PipelineHandler extends AsyncResource {
}

onConnect (abort, context) {
if (this.headersSent) {
// Cannot be retried...
throw new Error('request already sent')
}

const { res } = this

if (this.reason) {
Expand All @@ -160,7 +166,13 @@ class PipelineHandler extends AsyncResource {
this.context = context
}

onUpgrade () {
throw new Error('bad upgrade')
}

onHeaders (statusCode, rawHeaders, resume) {
this.headersSent = true

const { opaque, handler, context } = this

if (statusCode < 200) {
Expand Down
12 changes: 12 additions & 0 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class RequestHandler extends AsyncResource {
this.highWaterMark = highWaterMark
this.reason = null
this.removeAbortListener = null
this.headersSent = false

if (signal?.aborted) {
this.reason = signal.reason ?? new RequestAbortedError()
Expand All @@ -74,6 +75,11 @@ class RequestHandler extends AsyncResource {
}

onConnect (abort, context) {
if (this.headersSent) {
// Cannot be retried...
throw new Error('request already sent')
}

if (this.reason) {
abort(this.reason)
return
Expand All @@ -85,7 +91,13 @@ class RequestHandler extends AsyncResource {
this.context = context
}

onUpgrade () {
throw new Error('bad upgrade')
}

onHeaders (statusCode, rawHeaders, resume, statusMessage) {
this.headersSent = true

const { callback, opaque, abort, context, responseHeaders, highWaterMark } = this

const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
Expand Down
12 changes: 12 additions & 0 deletions lib/api/api-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class StreamHandler extends AsyncResource {
this.trailers = null
this.body = body
this.onInfo = onInfo || null
this.headersSent = false

if (util.isStream(body)) {
body.on('error', (err) => {
Expand All @@ -67,6 +68,11 @@ class StreamHandler extends AsyncResource {
}

onConnect (abort, context) {
if (this.headersSent) {
// Cannot be retried...
throw new Error('request already sent')
}

if (this.reason) {
abort(this.reason)
return
Expand All @@ -78,7 +84,13 @@ class StreamHandler extends AsyncResource {
this.context = context
}

onUpgrade () {
throw new Error('bad upgrade')
}

onHeaders (statusCode, rawHeaders, resume, statusMessage) {
this.headersSent = true

const { factory, opaque, context, responseHeaders } = this

const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
Expand Down
8 changes: 8 additions & 0 deletions lib/api/api-upgrade.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,17 @@ class UpgradeHandler extends AsyncResource {
this.callback = callback
this.abort = null
this.context = null
this.upgraded = false

addSignal(this, signal)
}

onConnect (abort, context) {
if (this.upgraded) {
// Cannot be retried...
throw new Error('request already sent')
}

if (this.reason) {
abort(this.reason)
return
Expand All @@ -50,6 +56,8 @@ class UpgradeHandler extends AsyncResource {
}

onUpgrade (statusCode, rawHeaders, socket) {
this.upgraded = true

assert(statusCode === 101)

const { callback, opaque, context } = this
Expand Down
32 changes: 15 additions & 17 deletions lib/handler/retry-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ class RetryHandler {
this.dispatch = handlers.dispatch
this.handler = handlers.handler
this.opts = { ...dispatchOpts, body: wrapRequestBody(opts.body) }
this.abort = null
this.aborted = false
this.retryOpts = {
retry: retryFn ?? RetryHandler[kRetryHandlerDefaultRetry],
retryAfter: retryAfter ?? true,
Expand All @@ -62,22 +60,14 @@ class RetryHandler {
]
}

this.abort = null
this.aborted = false
this.retryCount = 0
this.retryCountCheckpoint = 0
this.start = 0
this.end = null
this.etag = null
this.resume = null

// Handle possible onConnect duplication
this.handler.onConnect(reason => {
this.aborted = true
if (this.abort) {
this.abort(reason)
} else {
this.reason = reason
}
})
}

onRequestSent () {
Expand All @@ -93,11 +83,19 @@ class RetryHandler {
}

onConnect (abort) {
if (this.aborted) {
abort(this.reason)
} else {
this.abort = abort
}
this.abort = abort
this.aborted = false
this.retryCount = 0
this.retryCountCheckpoint = 0
this.start = 0
this.end = null
this.etag = null
this.resume = null

this.handler.onConnect(reason => {
this.aborted = true
this.abort(reason)
})
}

onBodySent (chunk) {
Expand Down

0 comments on commit 12f1b02

Please sign in to comment.