Skip to content

Commit

Permalink
Use a queue to split graph builder work (instead of splitting the ite…
Browse files Browse the repository at this point in the history
…rator).
  • Loading branch information
lseelenbinder committed Mar 28, 2024
1 parent d25c8ee commit b0cd4ea
Showing 1 changed file with 29 additions and 21 deletions.
50 changes: 29 additions & 21 deletions src/mjolnir/graphbuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ void BuildTileSet(const std::string& ways_file,
const std::string& complex_restriction_to_file,
const std::string& tile_dir,
const OSMData& osmdata,
std::map<GraphId, size_t>::const_iterator tile_start,
std::map<GraphId, size_t>::const_iterator tile_end,
std::queue<std::pair<GraphId, size_t>>& tiles,
std::mutex& tiles_lock,
const uint32_t tile_creation_date,
const boost::property_tree::ptree& pt,
std::promise<DataQuality>& result) {
Expand Down Expand Up @@ -453,10 +453,21 @@ void BuildTileSet(const std::string& ways_file,
std::map<std::pair<uint8_t, uint8_t>, uint32_t> langMap;
////////////////////////////////////////////////////////////////////////////
// Iterate over tiles
for (; tile_start != tile_end; ++tile_start) {
while (true) {
tiles_lock.lock();
if (tiles.empty()) {
// all work done
tiles_lock.unlock();
break;
}

auto tile = tiles.front();
tiles.pop();
tiles_lock.unlock();

try {
// What actually writes the tile
GraphId tile_id = tile_start->first.Tile_Base();
GraphId tile_id = tile.first.Tile_Base();
GraphTileBuilder graphtile(tile_dir, tile_id, false);

// Information about tile creation
Expand Down Expand Up @@ -496,20 +507,20 @@ void BuildTileSet(const std::string& ways_file,

////////////////////////////////////////////////////////////////////////
// Iterate over nodes in the tile
auto node_itr = nodes[tile_start->second];
auto node_itr = nodes[tile.second];
// to avoid realloc we guess how many edges there might be in a given tile
geo_attribute_cache.clear();
geo_attribute_cache.reserve(5 * (std::next(tile_start) == tile_end
? nodes.end() - node_itr
: std::next(tile_start)->second - tile_start->second));
// geo_attribute_cache.reserve(5 * (std::next(tile_start) == tile_end
// ? nodes.end() - node_itr
// : std::next(tile_start)->second - tile_start->second));

while (node_itr != nodes.end() && (*node_itr).graph_id.Tile_Base() == tile_id) {
// amalgamate all the node duplicates into one and the edges that connect to it
// this moves the iterator for you
auto bundle = collect_node_edges(node_itr, nodes, edges);

// Make sure node has edges
if (bundle.node_edges.size() == 0) {
if (bundle.node_edges.empty()) {
LOG_ERROR("Node has no edges - skip");
continue;
}
Expand Down Expand Up @@ -1251,7 +1262,7 @@ void BuildTileSet(const std::string& ways_file,
catch (std::exception& e) {
// ..gets sent back to the main thread
result.set_exception(std::current_exception());
LOG_ERROR((boost::format("Failed tile %1%: %2%") % tile_start->first % e.what()).str());
LOG_ERROR((boost::format("Failed tile %1%: %2%") % tile.first % e.what()).str());
return;
}
}
Expand Down Expand Up @@ -1295,25 +1306,22 @@ void BuildLocalTiles(const unsigned int thread_count,
std::vector<std::promise<DataQuality>> results(threads.size());

// Divvy up the work
size_t floor = tiles.size() / threads.size();
size_t at_ceiling = tiles.size() - (threads.size() * floor);
std::map<GraphId, size_t>::const_iterator tile_start, tile_end = tiles.begin();
std::queue<std::pair<GraphId, size_t>> tile_queue;
std::mutex tile_lock;
for (auto id : tiles) {
tile_queue.push(id);
}

// Atomically pass around stats info
for (size_t i = 0; i < threads.size(); ++i) {
// Figure out how many this thread will work on (either ceiling or floor)
size_t tile_count = (i < at_ceiling ? floor + 1 : floor);
// Where the range begins
tile_start = tile_end;
// Where the range ends
std::advance(tile_end, tile_count);
// Make the thread
threads[i].reset(new std::thread(BuildTileSet, std::cref(ways_file), std::cref(way_nodes_file),
std::cref(nodes_file), std::cref(edges_file),
std::cref(complex_from_restriction_file),
std::cref(complex_to_restriction_file), std::cref(tile_dir),
std::cref(osmdata), tile_start, tile_end, tile_creation_date,
std::cref(pt.get_child("mjolnir")), std::ref(results[i])));
std::cref(osmdata), std::ref(tile_queue), std::ref(tile_lock),
tile_creation_date, std::cref(pt.get_child("mjolnir")),
std::ref(results[i])));
}

// Join all the threads to wait for them to finish up their work
Expand Down

0 comments on commit b0cd4ea

Please sign in to comment.