Skip to content

Commit

Permalink
DOCSP-5410: Automatically restart workers
Browse files Browse the repository at this point in the history
  • Loading branch information
i80and committed May 8, 2019
1 parent 2b13e50 commit 4cca570
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 76 deletions.
145 changes: 145 additions & 0 deletions src/TaskWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
'use strict'

const Worker = require('tiny-worker')
const log = require('./log').log

const MAXIMUM_BACKLOG = 20

/** A web worker with a promise-oriented message-call interface. */
class TaskWorker {
/**
* Create a new TaskWorker.
* @param {string} scriptPath - A path to a JS file to execute.
*/
constructor(scriptPath) {
this.scriptPath = scriptPath
this.backlog = 0
this.pending = new Map()
this.messageId = 0
this.lastStarted = null
this.dead = false
this.worker = null
this.start()
}

/**
* Send a message to this TaskWorker.
* @param {map} message - An object to send to the worker.
* @return {Promise}
*/
send(message) {
if (this.backlog > MAXIMUM_BACKLOG) {
throw new Error('backlog-exceeded')
}

if (!this.worker) {
throw new Error('Worker not running')
}

return new Promise((resolve, reject) => {
const messageId = this.messageId
this.messageId += 1
this.backlog += 1

this.worker.postMessage({message: message, messageId: messageId})
this.pending.set(messageId, [resolve, reject])
})
}

/**
* Handler for messages received from the worker.
* @private
* @param {MessageEvent} event
* @return {Promise<?, Error>}
*/
onmessage(event) {
const pair = this.pending.get(event.data.messageId)
if (!pair) {
log.error(`Got unknown message ID ${event.data.messageId}`)
return
}

this.backlog -= 1
this.pending.delete(event.data.messageId)
const [resolve, reject] = pair
if (event.data.error) {
reject(new Error(event.data.error))
return
}

resolve(event.data)
}

/**
* Start the worker process.
* @return {number}
*/
start() {
// Do nothing if the child is still running
if (this.worker && this.worker.child.connected) {
return this.worker.child.pid
}

// If we died within the past hour, don't restart. Something is wrong
if (this.lastStarted && ((new Date()) - this.lastStarted) < TaskWorker.MIN_RESTART_INTERVAL) {
this.dead = true
}

if (this.dead) {
return -1
}

const worker = new Worker(this.scriptPath)
worker.onmessage = this.onmessage.bind(this)
worker.child.addListener('exit', (code, signal) => {
log.warning(`Worker exited: code=${code} signal=${signal}`)
this.stop()

// Don't restart if graceful or due to SIGINT
if (code === 0 || signal === 'SIGINT') {
return
}

// Wait a random interval up to a minute before restarting
// This might help prevent a thundering herd problem
const randomFactor = (
TaskWorker.MAX_RESTART_TIMEOUT - TaskWorker.MIN_RESTART_TIMEOUT) +
TaskWorker.MIN_RESTART_TIMEOUT
setTimeout(() => this.start(), (Math.random() * randomFactor))
})


this.stop()
this.worker = worker

this.lastStarted = new Date()
return this.worker.child.pid
}

stop() {
for (const pair of this.pending.values()) {
pair[1](new Error('Worker terminated'))
}

this.backlog = 0
this.pending.clear()
this.messageId = 0

if (this.worker && this.worker.child.connected) {
this.worker.terminate()
}

this.worker = null
}
}

// Configurable knobs
// If a restart happens less than this number of ms from the last restart, flag the worker as dead
// Default: 1 hour
TaskWorker.MIN_RESTART_INTERVAL = 1000 * 60 * 60

// We wait a random amount of time before restarting a stopped worker. Default: 1-10 seconds
TaskWorker.MIN_RESTART_TIMEOUT = 1000
TaskWorker.MAX_RESTART_TIMEOUT = 1000 * 9

exports.TaskWorker = TaskWorker
90 changes: 15 additions & 75 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,24 @@ const util = require('util')
const zlib = require('zlib')

const Pool = require('./pool.js').Pool
const TaskWorker = require('./TaskWorker.js').TaskWorker
const log = require('./log.js').log
const dive = require('dive')
const iltorb = require('iltorb')
const Logger = require('basic-logger')
const S3 = require('aws-sdk/clients/s3')
const Worker = require('tiny-worker')

process.title = 'marian'

const MAXIMUM_QUERY_LENGTH = 100

// If a worker's backlog rises above this threshold, reject the request.
// This prevents the server from getting bogged down for unbounded periods of time.
const MAXIMUM_BACKLOG = 20
const WARNING_BACKLOG = 15

const STANDARD_HEADERS = {
'X-Content-Type-Options': 'nosniff'
}

const log = new Logger({
showTimestamp: true,
})

/**
* Find an acceptable compression format for the client, and return a compressed
* version of the content if possible. Otherwise return the original input text.
Expand Down Expand Up @@ -80,66 +75,6 @@ function checkMethod(req, res, method) {
return true
}

/** A web worker with a promise-oriented message-call interface. */
class TaskWorker {
/**
* Create a new TaskWorker.
* @param {string} scriptPath - A path to a JS file to execute.
*/
constructor(scriptPath) {
this.worker = new Worker(scriptPath)
this.worker.onmessage = this.onmessage.bind(this)

this.backlog = 0
this.pending = new Map()
this.messageId = 0
}

/**
* Send a message to this TaskWorker.
* @param {map} message - An object to send to the worker.
* @return {Promise}
*/
send(message) {
if (this.backlog > MAXIMUM_BACKLOG) {
throw new Error('backlog-exceeded')
}

return new Promise((resolve, reject) => {
const messageId = this.messageId
this.messageId += 1
this.backlog += 1

this.worker.postMessage({message: message, messageId: messageId})
this.pending.set(messageId, [resolve, reject])
})
}

/**
* Handler for messages received from the worker.
* @private
* @param {MessageEvent} event
* @return {Promise<?, Error>}
*/
onmessage(event) {
const pair = this.pending.get(event.data.messageId)
if (!pair) {
log.error(`Got unknown message ID ${event.data.messageId}`)
return
}

this.backlog -= 1
this.pending.delete(event.data.messageId)
const [resolve, reject] = pair
if (event.data.error) {
reject(new Error(event.data.error))
return
}

resolve(event.data)
}
}

class Index {
constructor(manifestSource) {
this.manifestSource = manifestSource
Expand All @@ -150,10 +85,11 @@ class Index {
this.currentlyIndexing = false

const MAX_WORKERS = parseInt(process.env.MAX_WORKERS) || 2
this.workers = new Pool(Math.min(os.cpus().length, MAX_WORKERS), () => new TaskWorker(pathModule.join(__dirname, 'worker-searcher.js')))
const nWorkers = Math.min(os.cpus().length, MAX_WORKERS)
this.workers = new Pool(nWorkers, () => new TaskWorker(pathModule.join(__dirname, 'worker-searcher.js')))

// Suspend all of our workers until we have an index
for (const worker of this.workers.pool) {
for (const worker of this.workers) {
this.workers.suspend(worker)
}
}
Expand Down Expand Up @@ -282,7 +218,7 @@ class Index {

this.errors = []
setTimeout(async () => {
for (const worker of this.workers.pool) {
for (const worker of this.workers) {
this.workers.suspend(worker)
try {
await worker.send({sync: manifests})
Expand Down Expand Up @@ -396,9 +332,15 @@ class Marian {
body = await compress(req, headers, body)

// If all workers are overloaded, return 503
let statusCode = 200
if (status.workers.filter((n) => n <= WARNING_BACKLOG).length === 0) {
statusCode = 503
// If a worker is dead, return 500
let statusCode = 503
for (const workerState of status.workers) {
if (workerState === 'd') {
statusCode = 500
break
} else if (workerState <= WARNING_BACKLOG) {
statusCode = 200
}
}

res.writeHead(statusCode, headers)
Expand Down Expand Up @@ -468,8 +410,6 @@ class Marian {
}

async function main() {
Logger.setLevel('info', true)

const server = new Marian(process.argv[2])
server.start(8080)
}
Expand Down
8 changes: 8 additions & 0 deletions src/log.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
'use strict'

const Logger = require('basic-logger')
Logger.setLevel('info', true)

exports.log = new Logger({
showTimestamp: true,
})
8 changes: 8 additions & 0 deletions src/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,21 @@ class Pool {

getStatus() {
return this.pool.map((worker) => {
if (worker.dead) {
return 'd'
}

if (!this.suspended.has(worker)) {
return worker.backlog
}

return 's'
})
}

[Symbol.iterator]() {
return this.pool.values()
}
}

exports.Pool = Pool
2 changes: 1 addition & 1 deletion test/test_pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe('Pool', () => {
})

it('Should throw if no elements are available', () => {
for (const worker of pool.pool) {
for (const worker of pool) {
pool.suspend(worker)
}

Expand Down
43 changes: 43 additions & 0 deletions test/test_taskworker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/* eslint-env node, mocha */
'use strict'

const assert = require('assert')
const TaskWorker = require('../src/TaskWorker.js').TaskWorker
TaskWorker.MIN_RESTART_INTERVAL = 200
TaskWorker.MIN_RESTART_TIMEOUT = 10
TaskWorker.MAX_RESTART_TIMEOUT = 10

function promiseTimeout(time) {
return new Promise(resolve => setTimeout(resolve, time))
}

describe('TaskWorker', function() {
this.slow(1000)

const workerPath = 'test/worker.js'
const worker = new TaskWorker(workerPath)

it('Should work', async () => {
assert.equal((await worker.send('ping')).message, 'pong')
assert.equal((await worker.send('ping')).message, 'pong')
})

it('Should restart and reject stale requests', async () => {
await promiseTimeout(200)
await assert.rejects(async () => await worker.send('die'), new Error('Worker terminated'))
await promiseTimeout(50)
assert.equal((await worker.send('ping')).message, 'pong')
})

it('Should avoid restarting too much', async () => {
assert.strictEqual(worker.dead, false)
await assert.rejects(async () => await worker.send('die'), new Error('Worker terminated'))
await promiseTimeout(10)
await assert.rejects(async () => await worker.send('ping'), new Error('Worker not running'))
assert.strictEqual(worker.dead, true)
})

after(() => {
worker.stop()
})
})
12 changes: 12 additions & 0 deletions test/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict'

self.onmessage = (ev) => {
const message = ev.data.message
const messageId = ev.data.messageId

if (message === 'ping') {
self.postMessage({message: 'pong', messageId: messageId})
} else if (message === 'die') {
process.exit(1)
}
}

0 comments on commit 4cca570

Please sign in to comment.