Skip to content

Commit

Permalink
Merge branch 'parallel-creative' into retriever-backport
Browse files Browse the repository at this point in the history
  • Loading branch information
rjawesome committed Aug 25, 2024
2 parents 8f9c3b5 + 0a9c6a1 commit b39c69c
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 74 deletions.
25 changes: 20 additions & 5 deletions src/batch_edge_query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export default class BatchEdgeQueryHandler {
/**
* @private
*/
_queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise<Record[]> {
_queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}, abortSignal?: AbortSignal): Promise<Record[]> {
// Skip queueing queries to unavailable APIs
const queries = constructQueries(APIEdges, this.options).filter((query) => {
if (unavailableAPIs[query.APIEdge.query_operation.server]?.skip === true) {
Expand All @@ -74,6 +74,7 @@ export default class BatchEdgeQueryHandler {
debug(message);
this.logs.push(new LogEntry('INFO', null, message).getLog());
let finishedCount = 0;
let processedHashes = new Set<string>();
const completedLogs = this.logs;
const completedRecords: Record[] = [];
return new Promise<Record[]>((resolve) => {
Expand All @@ -85,6 +86,13 @@ export default class BatchEdgeQueryHandler {
logs: SerializableLog[];
apiUnavailable: boolean;
};

// check if this query is applicable
if (!queriesByHash[hash] || processedHashes.has(hash)) {
return;
}
processedHashes.add(hash);

completedLogs.push(...LogEntry.deserialize(logs));
completedRecords.push(...Record.unpackRecords(records, qEdge));

Expand All @@ -101,10 +109,17 @@ export default class BatchEdgeQueryHandler {
if (finishedCount >= queries.length) {
debug(`Total number of records returned for qEdge ${qEdge.id} is ${completedRecords.length}`);
resolve(completedRecords);
global.workerSide.off('message', listener); // Clean up
global.workerSide.off('message', listener);
abortSignal?.removeEventListener('abort', abort); // Clean up
}
}
function abort() {
global.workerSide.off('message', listener);
abortSignal?.removeEventListener('abort', abort); // Clean up
resolve([]);
}
global.workerSide.on('message', listener);
abortSignal?.addEventListener('abort', abort);
global.workerSide.postMessage({
threadId,
type: 'subqueryRequest',
Expand Down Expand Up @@ -171,13 +186,13 @@ export default class BatchEdgeQueryHandler {
});
}

async query(qEdges: QEdge | QEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise<Record[]> {
async query(qEdges: QEdge | QEdge[], unavailableAPIs: UnavailableAPITracker = {}, abortSignal?: AbortSignal): Promise<Record[]> {
debug('Node Update Start');
// it's now a single edge but convert to arr to simplify refactoring
qEdges = Array.isArray(qEdges) ? qEdges : [qEdges];
const nodeUpdate = new NodesUpdateHandler(qEdges);
// difference is there is no previous edge info anymore
await nodeUpdate.setEquivalentIDs(qEdges);
await nodeUpdate.setEquivalentIDs(qEdges, abortSignal);
await this._rmEquivalentDuplicates(qEdges);
debug('Node Update Success');

Expand All @@ -194,7 +209,7 @@ export default class BatchEdgeQueryHandler {

const expanded_APIEdges = this._expandAPIEdges(APIEdges);
debug('Start to query APIEdges....');
queryRecords = await this._queryAPIEdges(expanded_APIEdges, unavailableAPIs);
queryRecords = await this._queryAPIEdges(expanded_APIEdges, unavailableAPIs, abortSignal);
if (queryRecords === undefined) return;
debug('APIEdges are successfully queried....');
queryRecords = await this._postQueryFilter(queryRecords);
Expand Down
6 changes: 4 additions & 2 deletions src/edge_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,11 @@ export default class QueryEdgeManager {
debug(logMessage);
}

async executeEdges(): Promise<boolean> {
async executeEdges(abortSignal?: AbortSignal): Promise<boolean> {
const unavailableAPIs: UnavailableAPITracker = {};
while (this.getEdgesNotExecuted()) {
if (abortSignal?.aborted) return false;

const span = Telemetry.startSpan({ description: 'edgeExecution' });
//next available/most efficient edge
const currentQEdge = this.getNext();
Expand All @@ -359,7 +361,7 @@ export default class QueryEdgeManager {
);
debug(`(5) Executing current edge >> "${currentQEdge.getID()}"`);
//execute current edge query
let queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs);
let queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs, abortSignal);
this.logs = [...this.logs, ...queryBatchHandler.logs];
if (queryRecords === undefined) return;
// create an edge execution summary
Expand Down
6 changes: 4 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ export default class TRAPIQueryHandler {
];
};

async query(): Promise<void> {
async query(abortSignal?: AbortSignal): Promise<void> {
this._initializeResponse();
await this.addQueryNodes();

Expand Down Expand Up @@ -686,12 +686,14 @@ export default class TRAPIQueryHandler {
}
const manager = new EdgeManager(queryEdges, metaKG, this.options);

const executionSuccess = await manager.executeEdges();
const executionSuccess = await manager.executeEdges(abortSignal);
this.logs = [...this.logs, ...manager.logs];
if (!executionSuccess) {
return;
}

if (abortSignal?.aborted) return;

const span3 = Telemetry.startSpan({ description: 'resultsAssembly' });

// update query graph
Expand Down
126 changes: 65 additions & 61 deletions src/inferred_mode/inferred_mode.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Debug from 'debug';
import { LogEntry, StampedLog, Telemetry } from '@biothings-explorer/utils';
import { LogEntry, StampedLog, Telemetry, timeoutPromise } from '@biothings-explorer/utils';
import * as utils from '../utils';
import async from 'async';
import { biolink } from '@biothings-explorer/utils';
Expand Down Expand Up @@ -39,7 +39,6 @@ export interface CombinedResponseReport {
querySuccess: number;
queryHadResults: boolean;
mergedResults: { [resultID: string]: number };
creativeLimitHit: boolean | number;
}

// MatchedTemplate, but with IDs, etc. filled in
Expand All @@ -54,6 +53,7 @@ export default class InferredQueryHandler {
predicatePath: string;
includeReasoner: boolean;
CREATIVE_LIMIT: number;
CREATIVE_TIMEOUT: number;
constructor(
parent: TRAPIQueryHandler,
queryGraph: TrapiQueryGraph,
Expand All @@ -71,6 +71,7 @@ export default class InferredQueryHandler {
this.predicatePath = predicatePath;
this.includeReasoner = includeReasoner;
this.CREATIVE_LIMIT = process.env.CREATIVE_LIMIT ? parseInt(process.env.CREATIVE_LIMIT) : 500;
this.CREATIVE_TIMEOUT = process.env.CREATIVE_TIMEOUT_S ? parseInt(process.env.CREATIVE_TIMEOUT) * 1000 : 4.75 * 60 * 1000;
}

get queryIsValid(): boolean {
Expand Down Expand Up @@ -263,7 +264,6 @@ export default class InferredQueryHandler {
querySuccess: 0,
queryHadResults: false,
mergedResults: {},
creativeLimitHit: false,
};
let mergedThisTemplate = 0;
const resultIDsFromPrevious = new Set(Object.keys(combinedResponse.message.results));
Expand Down Expand Up @@ -477,9 +477,6 @@ export default class InferredQueryHandler {
}
report.querySuccess = 1;

if (Object.keys(combinedResponse.message.results).length >= this.CREATIVE_LIMIT && !report.creativeLimitHit) {
report.creativeLimitHit = Object.keys(newResponse.message.results).length;
}
span.finish();
return report;
}
Expand Down Expand Up @@ -572,65 +569,58 @@ export default class InferredQueryHandler {
[resultID: string]: number;
} = {};

await async.eachOfSeries(subQueries, async ({ template, queryGraph, qualifiers }, i) => {
const span = Telemetry.startSpan({ description: 'creativeTemplate' });
span.setData('template', (i as number) + 1);
i = i as number;
if (stop) {
span.finish();
return;
}
if (global.queryInformation?.queryGraph) {
global.queryInformation.isCreativeMode = true;
global.queryInformation.creativeTemplate = template;
}
const handler = new TRAPIQueryHandler(this.options, this.path, this.predicatePath, this.includeReasoner);
try {
// make query and combine results/kg/logs/etc
// perf debugging
const startUsage = process.cpuUsage();
const startTime = new Date().getTime();
const ncpu = require('os').cpus().length;

const completedHandlers = await Promise.all(
subQueries.map(async ({ template, queryGraph }, i) => {
const span = Telemetry.startSpan({ description: 'creativeTemplate' });
span.setData('template', i + 1);
const handler = new TRAPIQueryHandler(this.options, this.path, this.predicatePath, this.includeReasoner);
handler.setQueryGraph(queryGraph);
await handler.query();
const { querySuccess, queryHadResults, mergedResults, creativeLimitHit } = this.combineResponse(
i,
handler,
qEdgeID,
qEdge,
combinedResponse,
qualifiers,
);
// update values used in logging
successfulQueries += querySuccess;
if (queryHadResults) resultQueries.push(i);
Object.entries(mergedResults).forEach(([result, countMerged]) => {
mergedResultsCount[result] =
result in mergedResultsCount ? mergedResultsCount[result] + countMerged : countMerged;
});
// log to user if we should stop
if (creativeLimitHit) {
stop = true;
const message = [
`Addition of ${creativeLimitHit} results from Template ${i + 1}`,
Object.keys(combinedResponse.message.results).length === this.CREATIVE_LIMIT ? ' meets ' : ' exceeds ',
`creative result maximum of ${this.CREATIVE_LIMIT} (reaching ${Object.keys(combinedResponse.message.results).length
} merged). `,
`Response will be truncated to top-scoring ${this.CREATIVE_LIMIT} results. Skipping remaining ${subQueries.length - (i + 1)
} `,
subQueries.length - (i + 1) === 1 ? `template.` : `templates.`,
].join('');
try {
await timeoutPromise(handler.query(AbortSignal.timeout(this.CREATIVE_TIMEOUT)), this.CREATIVE_TIMEOUT);
} catch (error) {
handler.logs.forEach((log) => {
combinedResponse.logs.push(log);
});
const message = `ERROR: Template-${i + 1} failed due to error ${error}`;
debug(message);
combinedResponse.logs.push(new LogEntry(`INFO`, null, message).getLog());
combinedResponse.logs.push(new LogEntry(`ERROR`, null, message).getLog());
span.finish();
return undefined;
}
span.finish();
} catch (error) {
handler.logs.forEach((log) => {
combinedResponse.logs.push(log);
});
const message = `ERROR: Template-${i + 1} failed due to error ${error}`;
debug(message);
combinedResponse.logs.push(new LogEntry(`ERROR`, null, message).getLog());
span.finish();
return;
}
});
return { i, handler };
})
);

// perf debugging
const endTime = new Date().getTime();
const timeDelta = (endTime - startTime) * 10 * ncpu;
const { user, system } = process.cpuUsage(startUsage);
debug(`Average CPU Usage: ${(system + user) / timeDelta}%`);

for (const handlerInfo of completedHandlers) {
if (handlerInfo === undefined) continue;
const { i, handler } = handlerInfo;
const { querySuccess, queryHadResults, mergedResults } = this.combineResponse(
i,
handler,
qEdgeID,
qEdge,
combinedResponse,
);
successfulQueries += querySuccess;
if (queryHadResults) resultQueries.push(i);
Object.entries(mergedResults).forEach(([result, countMerged]) => {
mergedResultsCount[result] =
result in mergedResultsCount ? mergedResultsCount[result] + countMerged : countMerged;
});
}

// log about merged Results
if (Object.keys(mergedResultsCount).length) {
// Add 1 for first instance of result (not counted during merging)
Expand Down Expand Up @@ -660,6 +650,20 @@ export default class InferredQueryHandler {
response.message.results = Object.values(combinedResponse.message.results).sort((a, b) => {
return b.analyses[0].score - a.analyses[0].score ? b.analyses[0].score - a.analyses[0].score : 0;
});

// log about trimming results
if (response.message.results.length > this.CREATIVE_LIMIT) {
const message = [
`Number of results exceeds`,
`creative result maximum of ${this.CREATIVE_LIMIT} (reaching ${
Object.keys(response.message.results).length
} merged). `,
`Response will be truncated to top-scoring ${this.CREATIVE_LIMIT} results.`
].join('');
debug(message);
combinedResponse.logs.push(new LogEntry(`INFO`, null, message).getLog());
}

// trim extra results and prune kg
response.message.results = response.message.results.slice(0, this.CREATIVE_LIMIT);
response.description = `Query processed successfully, retrieved ${response.message.results.length} results.`;
Expand Down
8 changes: 4 additions & 4 deletions src/update_nodes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ export default class NodesUpdateHandler {
* Resolve input ids
* @param {object} curies - each key represents the category, e.g. gene, value is an array of curies.
*/
async _getEquivalentIDs(curies: ResolverInput): Promise<SRIResolverOutput> {
async _getEquivalentIDs(curies: ResolverInput, abortSignal?: AbortSignal): Promise<SRIResolverOutput> {
// const resolver = new id_resolver.Resolver('biolink');
// const equivalentIDs = await resolver.resolve(curies);
return await resolveSRI(curies);
return await resolveSRI(curies, abortSignal);
}

async setEquivalentIDs(qEdges: QEdge[]): Promise<void> {
async setEquivalentIDs(qEdges: QEdge[], abortSignal?: AbortSignal): Promise<void> {
debug(`Getting equivalent IDs...`);
const curies = this._getCuries(this.qEdges);
debug(`curies: ${JSON.stringify(curies)}`);
const equivalentIDs = await this._getEquivalentIDs(curies);
const equivalentIDs = await this._getEquivalentIDs(curies, abortSignal);
qEdges.map((qEdge) => {
const edgeEquivalentIDs = Object.keys(equivalentIDs)
.filter((key) => qEdge.getInputCurie().includes(key))
Expand Down

0 comments on commit b39c69c

Please sign in to comment.