diff --git a/src/lookupStream.js b/src/lookupStream.js index 36c1756..fb1cf88 100644 --- a/src/lookupStream.js +++ b/src/lookupStream.js @@ -1,4 +1,5 @@ const _ = require('lodash'); +const async = require('async'); const parallelTransform = require('parallel-transform'); const logger = require( 'pelias-logger' ).get( 'wof-admin-lookup' ); const getAdminLayers = require( './getAdminLayers' ); @@ -10,72 +11,97 @@ function hasAnyMultiples(result) { }); } -function createPipResolverStream(pipResolver, config) { - return function (doc, callback) { - // don't do anything if there's no centroid - if (_.isEmpty(doc.getCentroid())) { - return callback(null, doc); - } +function updateDocument(doc, config, point, result) { + // log results w/o country OR any multiples + if (_.isEmpty(result.country)) { + logger.debug('no country', { + centroid: point, + result: result + }); + } + if (hasAnyMultiples(result)) { + logger.debug('multiple values', { + centroid: point, + result: result + }); + } - pipResolver.lookup(doc.getCentroid(), getAdminLayers(doc.getLayer()), (err, result) => { - if (err) { - // if there's an error, just log it and move on - logger.error(`PIP server failed: ${(err.message || JSON.stringify(err))}`, { - id: doc.getGid(), - lat: doc.getCentroid().lat, - lon: doc.getCentroid().lon - }); - // don't pass the unmodified doc along - return callback(); - } + doc.getParentFields() + // filter out placetypes for which there are no values + .filter((placetype) => { return !_.isEmpty(result[placetype]); }) + // assign parents into the doc + .forEach((placetype) => { + const values = result[placetype]; + + try { + // addParent can throw an error if, for example, name is an empty string + doc.addParent(placetype, values[0].name, values[0].id.toString(), values[0].abbr); - // log results w/o country OR any multiples - if (_.isEmpty(result.country)) { - logger.debug('no country', { - centroid: doc.getCentroid(), - result: result - }); } - if (hasAnyMultiples(result)) { - logger.debug('multiple values', { - centroid: doc.getCentroid(), - result: result + catch (err) { + logger.warn('invalid value', { + centroid: point, + result: { + type: placetype, + values: values + } }); } + }); - doc.getParentFields() - // filter out placetypes for which there are no values - .filter((placetype) => { return !_.isEmpty(result[placetype]); } ) - // assign parents into the doc - .forEach((placetype) => { - const values = result[placetype]; + // prefer a 'postal city' locality when a valid postal code is available + // optionally enable/disable this functionality using config variable. + if (config && true === config.usePostalCities) { + usePostalCity(result, doc); + } +} - try { - // addParent can throw an error if, for example, name is an empty string - doc.addParent(placetype, values[0].name, values[0].id.toString(), values[0].abbr); +function createPipResolverStream(pipResolver, config) { + return function (doc, callback) { + // don't do anything if there's no centroid + if (_.isEmpty(doc.getCentroid())) { + return callback(null, doc); + } - } - catch (err) { - logger.warn('invalid value', { - centroid: doc.getCentroid(), - result: { - type: placetype, - values: values - } + let points = [doc.getCentroid()]; + const layers = getAdminLayers(doc.getLayer()); + + // optionally, perform multiple lookups per document if additional + // points are specified in the 'pip' $meta property. + const pip = doc.getMeta('pip'); + if( _.isArray(pip) && !_.isEmpty(pip) ){ + points = points.concat( + pip.filter(point => { + return _.isPlainObject(point) && _.has(point, 'lon') && _.has(point, 'lat'); + }) + ); + } + + var lookups = points.map(point => { + return (cb) => { + pipResolver.lookup(point, layers, (err, result) => { + if (err) { + // if there's an error, just log it and move on + logger.error(`PIP server failed: ${(err.message || JSON.stringify(err))}`, { + id: doc.getGid(), + lat: point.lat, + lon: point.lon }); + // don't pass the unmodified doc along + return cb(err); } - } - ); + updateDocument(doc, config, point, result); + cb(); + }); + }; + }); - // prefer a 'postal city' locality when a valid postal code is available - // optionally enable/disable this functionality using config variable. - if( config && true === config.usePostalCities ){ - usePostalCity( result, doc ); + async.parallel(lookups, (err) => { + if (err) { + return callback(); } - callback(null, doc); - }); }; }