Get Monitor output as Object #375

Open
wants to merge 3 commits into
base: master
6 changes: 6 additions & 0 deletions README.md
@@ -129,6 +129,7 @@ $env:DEBUG='puppeteer-cluster:*';node examples/minimal
* [cluster.task(taskFunction)](#clustertasktaskfunction)
* [cluster.queue([data] [, taskFunction])](#clusterqueuedata--taskfunction)
* [cluster.execute([data] [, taskFunction])](#clusterexecutedata--taskfunction)
* [cluster.getMonitorObject()](#clustergetmonitorobject)
* [cluster.idle()](#clusteridle)
* [cluster.close()](#clusterclose)

@@ -172,6 +173,7 @@ Emitted when a task is queued via [Cluster.queue] or [Cluster.execute]. The firs
- `skipDuplicateUrls` <[boolean]> If set to `true`, will skip URLs which were already crawled by the cluster. Defaults to `false`. If you use this field, the queued `data` must be your URL or `data` must be an object containing a field called `url`.
- `timeout` <[number]> Specify a timeout for all tasks. Defaults to `30000` (30 seconds).
- `monitor` <[boolean]> If set to `true`, will provide a small command line output to provide information about the crawling process. Defaults to `false`.
- `monitorObject` <[boolean]> If set to `true`, [cluster.getMonitorObject()](#clustergetmonitorobject) returns an object with information about the crawling process. Defaults to `false`.
- `workerCreationDelay` <[number]> Time between creation of two workers. Set this to a value like `100` (0.1 seconds) in case you want some time to pass before another worker is created. You can use this to prevent a network peak right at the start. Defaults to `0` (no delay).
- `puppeteer` <[Object]> In case you want to use a different puppeteer library (like [puppeteer-core](https://github.com/GoogleChrome/puppeteer/blob/master/docs/api.md#puppeteer-vs-puppeteer-core) or [puppeteer-extra](https://github.com/berstend/puppeteer-extra)), pass the object here. If not set, will default to using puppeteer. When using `puppeteer-core`, make sure to also provide `puppeteerOptions.executablePath`.
- returns: <[Promise]<[Cluster]>>
@@ -212,6 +214,10 @@ Be aware that this function only returns a Promise for backward compatibility re

Works like [Cluster.queue], but this function returns a Promise which will be resolved after the task is executed. That means the job is still queued, but the script will wait for it to finish. If an error happens during execution, this function rejects the Promise with the thrown error, and no "taskerror" event is fired. In addition, tasks queued via execute ignore "retryLimit" and "retryDelay". For an example, see the [Execute example](examples/execute.js).

#### cluster.getMonitorObject()
- returns: <[Object]>

Returns an object containing the information shown by the `monitor` output, for use in API calls or other programmatic use cases. The statistics are only filled in when the cluster option `monitorObject` is set to `true`; otherwise only `now` and `workerCount` are populated.
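
A minimal sketch of how the returned object might be turned into a JSON-friendly status payload. The `MonitorSnapshot` type mirrors the fields added in this PR; `MonitorSource`, `StatusPayload`, `toNumber`, and `toStatusPayload` are hypothetical names used only for illustration and are not part of puppeteer-cluster. Percentages and rates arrive as pre-formatted strings, so the sketch parses them back into numbers.

```typescript
// Shape of the object returned by cluster.getMonitorObject() in this PR.
interface MonitorSnapshot {
    now: number;
    timeDiff: number;
    doneTargets: number;
    donePercentage: string;
    errorPerc: string;
    timeRunning: string;
    timeRemaining: string;
    cpuUsage: string;
    memoryUsage: string;
    pagesPerSecond: string;
    workerCount: number;
    worker: string[];
}

// Hypothetical: anything that exposes getMonitorObject().
interface MonitorSource {
    getMonitorObject(): MonitorSnapshot;
}

// Hypothetical payload shape for a status endpoint.
interface StatusPayload {
    timestamp: number;
    done: number;
    donePercent: number | null;
    errorPercent: number | null;
    pagesPerSecond: number | null;
    workers: number;
    busyWorkers: number;
}

// Empty strings (returned when monitorObject is disabled) become null.
function toNumber(value: string): number | null {
    if (value === '') return null;
    const parsed = Number(value);
    return Number.isNaN(parsed) ? null : parsed;
}

function toStatusPayload(source: MonitorSource): StatusPayload {
    const m = source.getMonitorObject();
    return {
        timestamp: m.now,
        done: m.doneTargets,
        donePercent: toNumber(m.donePercentage),
        errorPercent: toNumber(m.errorPerc),
        pagesPerSecond: toNumber(m.pagesPerSecond),
        workers: m.workerCount,
        // Worker lines look like "#0 WORK <url>" or "#1 IDLE ".
        busyWorkers: m.worker.filter(line => line.split(' ')[1] === 'WORK').length,
    };
}
```

Assuming the cluster was launched with `monitorObject: true`, something like `toStatusPayload(cluster)` could then be called from an HTTP handler or a periodic logger.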

#### cluster.idle()
- returns: <[Promise]>

81 changes: 80 additions & 1 deletion src/Cluster.ts
@@ -22,6 +22,7 @@ interface ClusterOptions {
    puppeteerOptions: LaunchOptions;
    perBrowserOptions: LaunchOptions[] | undefined;
    monitor: boolean;
    monitorObject: boolean;
    timeout: number;
    retryLimit: number;
    retryDelay: number;
@@ -30,6 +31,21 @@ interface ClusterOptions {
    puppeteer: any;
}

interface MonitorObject {
    now: number;
    timeDiff: number;
    doneTargets: number;
    donePercentage: string;
    errorPerc: string;
    timeRunning: string;
    timeRemaining: string;
    cpuUsage: string;
    memoryUsage: string;
    pagesPerSecond: string;
    workerCount: number;
    worker: string[];
}

type Partial<T> = {
    [P in keyof T]?: T[P];
};
@@ -45,6 +61,7 @@ const DEFAULT_OPTIONS: ClusterOptions = {
    },
    perBrowserOptions: undefined,
    monitor: false,
    monitorObject: false,
    timeout: 30 * 1000,
    retryLimit: 0,
    retryDelay: 0,
@@ -168,7 +185,7 @@ export default class Cluster<JobData = any, ReturnData = any> extends EventEmitt
            throw new Error(`Unable to launch browser, error message: ${err.message}`);
        }

-        if (this.options.monitor) {
+        if (this.options.monitor || this.options.monitorObject) {
            await this.systemMonitor.init();
        }

@@ -472,6 +489,68 @@ export default class Cluster<JobData = any, ReturnData = any> extends EventEmitt
        debug('Closed');
    }

    /**
     * Returns a snapshot of the statistics shown by the `monitor` output.
     * Unless the `monitorObject` option is enabled, only `now` and
     * `workerCount` are filled in.
     */
    public getMonitorObject(): MonitorObject {
        const monitorObject: MonitorObject = {
            now: Date.now(),
            timeDiff: 0,
            doneTargets: 0,
            donePercentage: '',
            errorPerc: '',
            timeRunning: '',
            timeRemaining: '',
            cpuUsage: '',
            memoryUsage: '',
            pagesPerSecond: '',
            workerCount: this.workers.length + this.workersStarting,
            worker: [] as string[],
        };

        if (!this.options.monitorObject) return monitorObject;

        monitorObject.timeDiff = monitorObject.now - this.startTime;
        // done = everything queued so far, minus waiting and in-flight jobs
        monitorObject.doneTargets = this.allTargetCount - this.jobQueue.size() -
            this.workersBusy.length;
        const donePercentage = this.allTargetCount === 0
            ? 1 : (monitorObject.doneTargets / this.allTargetCount);

        monitorObject.donePercentage = (100 * donePercentage).toFixed(2);
        monitorObject.errorPerc = monitorObject.doneTargets === 0 ?
            '0.00' : (100 * this.errorCount / monitorObject.doneTargets).toFixed(2);

        monitorObject.timeRunning = util.formatDuration(monitorObject.timeDiff);

        // -1 means "no estimate yet" (nothing finished so far)
        let timeRemainingMillis = -1;
        if (donePercentage !== 0) {
            timeRemainingMillis = ((monitorObject.timeDiff) / donePercentage)
                - monitorObject.timeDiff;
        }
        monitorObject.timeRemaining = util.formatDuration(timeRemainingMillis);

        monitorObject.cpuUsage = this.systemMonitor.getCpuUsage().toFixed(1);
        monitorObject.memoryUsage = this.systemMonitor.getMemoryUsage().toFixed(1);

        monitorObject.pagesPerSecond = monitorObject.doneTargets === 0 ?
            '0' : (monitorObject.doneTargets * 1000 / monitorObject.timeDiff).toFixed(2);

        // one status line per worker: "#<index> <WORK|IDLE> <url>"
        this.workers.forEach((worker, i) => {
            const isIdle = this.workersAvail.indexOf(worker) !== -1;
            let workOrIdle: string;
            let workerUrl = '';
            if (isIdle) {
                workOrIdle = 'IDLE';
            } else {
                workOrIdle = 'WORK';
                if (worker.activeTarget) {
                    workerUrl = worker.activeTarget.getUrl() || 'UNKNOWN TARGET';
                } else {
                    workerUrl = 'NO TARGET (should not be happening)';
                }
            }
            monitorObject.worker.push(`#${i} ${workOrIdle} ${workerUrl}`);
        });
        return monitorObject;
    }
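
The progress arithmetic in `getMonitorObject()` can be sketched as a standalone pure function, which makes the edge cases (no targets queued, nothing finished yet) easier to see and to test. `ProgressInput`, `Progress`, and `computeProgress` are hypothetical names for illustration only; the formulas follow the method above but return numbers instead of formatted strings.

```typescript
interface ProgressInput {
    allTargetCount: number; // total jobs queued so far
    queued: number;         // jobs still waiting in the queue
    busy: number;           // jobs currently being processed
    errorCount: number;     // jobs that ended with an error
    elapsedMs: number;      // time since the cluster started
}

interface Progress {
    doneTargets: number;
    doneFraction: number;   // 0..1
    errorPercent: number;   // 0..100
    remainingMs: number;    // -1 when no estimate is possible yet
    pagesPerSecond: number;
}

function computeProgress(input: ProgressInput): Progress {
    const doneTargets = input.allTargetCount - input.queued - input.busy;

    // With nothing queued at all, the run counts as fully done.
    const doneFraction = input.allTargetCount === 0
        ? 1
        : doneTargets / input.allTargetCount;

    const errorPercent = doneTargets === 0
        ? 0
        : (100 * input.errorCount) / doneTargets;

    // Linear extrapolation: total time = elapsed / fraction done.
    const remainingMs = doneFraction === 0
        ? -1
        : input.elapsedMs / doneFraction - input.elapsedMs;

    // elapsedMs is assumed to be > 0 once at least one job has finished.
    const pagesPerSecond = doneTargets === 0
        ? 0
        : (doneTargets * 1000) / input.elapsedMs;

    return { doneTargets, doneFraction, errorPercent, remainingMs, pagesPerSecond };
}
```

For example, with 10 targets queued, 4 waiting, 1 in flight, 1 error, and 10 seconds elapsed, 5 targets are done, half the work is finished, and the extrapolated remaining time equals the elapsed time.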

    private monitor(): void {
        if (!this.display) {
            this.display = new Display();
29 changes: 29 additions & 0 deletions test/Cluster.test.ts
@@ -794,6 +794,35 @@ describe('options', () => {
            await cluster.idle();
            await cluster.close();
        });

        test('monitorObject enabled', async () => {
            const cluster = await Cluster.launch({
                concurrency: Cluster.CONCURRENCY_CONTEXT,
                puppeteerOptions: { args: ['--no-sandbox'] },
                maxConcurrency: 1,
                monitor: false,
                monitorObject: true,
            });
            cluster.on('taskerror', (err) => {
                throw err;
            });

            cluster.task(async () => {
                await new Promise(resolve => setTimeout(resolve, 550));
            });

            cluster.queue(TEST_URL);

            // take a snapshot after 510ms, while the 550ms task is still running
            output = '';
            await new Promise(resolve => setTimeout(resolve, 510));
            const monitorObject = cluster.getMonitorObject();
            expect(monitorObject.workerCount).toBeGreaterThan(0);
            expect(monitorObject.cpuUsage.length).toBeGreaterThan(0);

            await cluster.idle();
            await cluster.close();
        });
    });

});