Multiple issues with apoc.cypher.mapParallel2 execution and results #1373

Closed
cbartens opened this issue Jan 8, 2020 · 5 comments


@cbartens

cbartens commented Jan 8, 2020

Following up on my conversation with @sarmbruster here: https://community.neo4j.com/t/how-best-to-do-parallel-processing/13342/12

After extensive trial and error, I made the following observations regarding apoc.cypher.mapParallel2. Some may still be user error, but others look like potential issues worth looking into.

Issue 1 - CPU goes to 100% and instance becomes unresponsive

This still happens, and I have no idea why. I will check the logs when I have some time. It would be good to find out, or even better, to be able to wrap the parallel function in some other function that prevents this from happening. Realistically, a normal user is unlikely to get these queries right on the first try, so it involves a lot of trial and error. Having to shut down the instance every so often because it becomes unresponsive does not make that any easier, not to mention having to apologize to colleagues 🙂

Issue 2 - Success, I now have a query that delivers what I need (with some buts)

After some playing around I settled on the following query which works fine.

MATCH (e:Employee)-[:LINKED_TO|:MATCHED_TO]->(u:EffortUser)
WHERE e.Name = 'cbartens'
WITH collect(u) AS items 
CALL apoc.cypher.mapParallel2("
  OPTIONAL MATCH (_)-[r2]-(o:EffortObject)
  WHERE r2.Effort = 'yes' and not r2.TimeEvent is null and r2.TimeEvent >= '2017-10-01' and r2.TimeEvent <= '2017-12-31'
  RETURN date(datetime(r2.TimeEvent)) as date, count(distinct r2.IdUnique) as events",
  {parallel:True, batchSize:2000, concurrency:20}, items, 4) YIELD value
RETURN value.date as date, sum(value.events) as events 
ORDER BY date
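Conceptually, the query above is a map/reduce: the fragment runs on partitions of `items`, and the outer query sums `value.events` per date. Below is a minimal plain-Python sketch of that shape. It is not APOC's implementation. The data layout (a set of event ids per date for each item) and all function names are hypothetical, and it assumes the fragment aggregates over one partition of items at a time. The sketch illustrates a property worth keeping in mind when comparing against a single, non-parallel query: a sum of per-partition `count(distinct ...)` values equals the global distinct count only if no event id is reachable from items in more than one partition.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

# Hypothetical per-item data: date -> set of event ids (stand-in for r2.IdUnique).
Events = dict[str, set[str]]


def partition(items: list, n: int) -> list[list]:
    """Split items into at most n contiguous chunks of roughly equal size."""
    size = max(1, -(-len(items) // n))  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]


def run_fragment(chunk: list[Events]) -> dict[str, int]:
    """Stand-in for the fragment: count(DISTINCT id) per date within one chunk."""
    per_date: dict[str, set[str]] = defaultdict(set)
    for item in chunk:
        for date, ids in item.items():
            per_date[date] |= ids
    return {date: len(ids) for date, ids in per_date.items()}


def map_parallel(items: list[Events], partitions: int) -> dict[str, int]:
    """Run the fragment on each chunk concurrently, then sum per date like the outer query."""
    totals: dict[str, int] = defaultdict(int)
    with ThreadPoolExecutor() as pool:
        for result in pool.map(run_fragment, partition(items, partitions)):
            for date, count in result.items():
                totals[date] += count
    return dict(totals)


users = [{"2017-10-02": {"e1", "e2"}}, {"2017-10-02": {"e2", "e3"}}]
print(run_fragment(users))     # {'2017-10-02': 3}  one query, global distinct count
print(map_parallel(users, 2))  # {'2017-10-02': 4}  e2 is counted once in each partition
```

Whether this contributes to the deltas reported in this thread is not established here; it is simply a property of summing distinct counts across partitions.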

Some key observations that might help others:

  • The parallel query is about 3x faster than the normal query (5s vs. 15s)
  • Using MATCH inside the parallel query does not work, but OPTIONAL MATCH does
  • The parallel query does not seem to cope with longer MATCH paths, e.g. (_)-[r2]-(o:EffortObject) works but (_)-[r1]-(u:EffortUser)-[r2]-(o:EffortObject) does not
  • The best results come from splitting the MATCH between the normal and the parallel query. In my case the complete pattern is (:Employee{Name:'cbartens'})-[:LINKED_TO|:MATCHED_TO]->(:EffortUser)-[r]-(:EffortObject). The normal part of the query handles (:Employee{Name:'cbartens'})-[:LINKED_TO|:MATCHED_TO]->(:EffortUser) and hands its results to the parallel query as (_); the parallel query then matches the rest, (:EffortUser)-[r]-(:EffortObject), written as (_)-[r]-(:EffortObject)
  • Using RETURN DISTINCT inside the parallel query can break the query
  • The variables you include in the RETURN clause of the parallel query matter a lot, to the point where adding or removing the wrong variable can break the entire query
  • The config part of the parallel query, {parallel:True, batchSize:2000, concurrency:20}, does affect the query: it shaves another 1s off, reducing it from 5s to 1s. batchSize:2000 and concurrency:20 seem to be the optimal settings; anything higher or lower increases the query time again
  • But the most important observation is that the parallel query returns slightly different results from the normal query (details in the table below). For us, I consider this delta low enough to be usable, but I observed a maximum difference of 28% on one day, which needs monitoring

[Table image: standard vs. parallel query results per date]
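To keep an eye on the kind of per-day delta described above, a small helper can compare the two result sets. This is a minimal sketch that assumes both queries' results have been collected into plain `date -> count` dictionaries. The function names, the threshold, and the sample numbers are illustrative only and do not come from the thread.

```python
def daily_deltas(standard: dict[str, int], parallel: dict[str, int]) -> dict[str, float]:
    """Relative difference per date: |parallel - standard| / standard."""
    deltas: dict[str, float] = {}
    for date in sorted(set(standard) | set(parallel)):
        s = standard.get(date, 0)
        p = parallel.get(date, 0)
        if s == 0:
            # No baseline for this date: any parallel count is an unbounded deviation.
            deltas[date] = 0.0 if p == 0 else float("inf")
        else:
            deltas[date] = abs(p - s) / s
    return deltas


def flag_days(deltas: dict[str, float], threshold: float = 0.05) -> list[str]:
    """Dates whose relative difference exceeds the threshold (0.05 is an arbitrary example)."""
    return [date for date, delta in deltas.items() if delta > threshold]


# Illustrative numbers only.
standard = {"2017-10-02": 100, "2017-10-03": 50}
parallel = {"2017-10-02": 98, "2017-10-03": 40}
deltas = daily_deltas(standard, parallel)
print({d: round(x, 2) for d, x in deltas.items()})  # {'2017-10-02': 0.02, '2017-10-03': 0.2}
print(flag_days(deltas))                            # ['2017-10-03']
```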

@DKroot

DKroot commented Jan 10, 2020

The latest apoc.cypher.mapParallel2() version is unstable in my experience.

I've been trying to use apoc.cypher.mapParallel2() with the latest Neo4j and APOC versions and have clearly observed it randomly returning partial or no results. I have a plain Cypher query that calculates several metrics on batches of input node pairs (node ids) in a very large graph: 33,000+ input records overall. I batch the input, e.g. 100 records per batch, which seems to be the optimal performance point. Each input record produces one output record. I then run the script while checking that the actual number of output rows matches the expected number. Here is one of the Cypher queries in question:

// E(x,y)/(|N(x)|*|N(y)|) Index
// 10 pairs: 14.7s
// 20 pairs: 6.0s-8.6s
// *25 pairs: 6.1s-10.7s*
// 30 pairs: 15.9s
// 40 pairs: 46.0s
// 50 pairs: 30.7s
WITH $JDBC_conn_string AS db, $sql_query AS sql
CALL apoc.load.jdbc(db, sql) YIELD row
MATCH (x:Publication {node_id: row.cited_1})<--(Nxy)-->(y:Publication {node_id: row.cited_2})
WITH min(Nxy.pub_year) AS first_co_citation_year, row.cited_1 AS x_scp, row.cited_2 AS y_scp
OPTIONAL MATCH (x:Publication {node_id: x_scp})<--(Nx:Publication)
  WHERE Nx.pub_year <= first_co_citation_year
WITH count(Nx) AS nx_size, first_co_citation_year, x_scp, y_scp
OPTIONAL MATCH (y:Publication {node_id: y_scp})<--(Ny:Publication)
  WHERE Ny.pub_year <= first_co_citation_year
WITH count(Ny) AS ny_size, nx_size, first_co_citation_year, x_scp, y_scp
OPTIONAL MATCH
  (x:Publication {node_id: x_scp})<--(Ex:Publication)-[E]-(Ey:Publication)-->(y:Publication {node_id: y_scp})
  WHERE startNode(E).pub_year <= first_co_citation_year
RETURN x_scp AS cited_1, y_scp AS cited_2, toFloat(count(E)) / (nx_size * ny_size) AS e_co_citation_conditional_index;
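Reading the query, the index it computes can be written as follows. The notation is mine, not from the thread: $n \to x$ means node $n$ has an outgoing relationship to $x$, $\operatorname{year}$ stands for `pub_year`, and $t_0$ is `first_co_citation_year`.

```latex
t_0(x,y) = \min_{n \,:\, n \to x,\ n \to y} \operatorname{year}(n)

N_{t_0}(x) = \{\, n : n \to x,\ \operatorname{year}(n) \le t_0 \,\}, \qquad
N_{t_0}(y) = \{\, n : n \to y,\ \operatorname{year}(n) \le t_0 \,\}

I_E(x,y) = \frac{E_{t_0}(x,y)}{|N_{t_0}(x)| \cdot |N_{t_0}(y)|}
```

Here $E_{t_0}(x,y)$ is the number of matches of `(x)<--(Ex)-[E]-(Ey)-->(y)` whose `startNode(E)` has a year of at most $t_0$. Because `[E]` is undirected, a single relationship can be matched in both directions when both of its endpoints cite both $x$ and $y$.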

Now, the apoc.cypher.mapParallel2() version of the same query provides clear performance gains:

// E(x,y)/(|N(x)|*|N(y)|) Index
// 10 pairs: 0.8s-1.8s-7.2s
// 20 pairs: 4.3s
// 25 pairs: 3.2-3.8s-9.2s
// *30 pairs: 2.7-3.4-7.4s*
// 40 pairs: FAILED
// 50 pairs: FAILED
WITH $JDBC_conn_string AS db, $sql_query AS sql
CALL apoc.load.jdbc(db, sql) YIELD row
WITH collect({x_scp: row.cited_1, y_scp: row.cited_2}) AS pairs
CALL apoc.cypher.mapParallel2('
  MATCH (x:Publication {node_id: _.x_scp})<--(Nxy)-->(y:Publication {node_id: _.y_scp})
  WITH min(Nxy.pub_year) AS first_co_citation_year, _.x_scp AS x_scp, _.y_scp AS y_scp
  OPTIONAL MATCH (x:Publication {node_id: x_scp})<--(Nx:Publication)
    WHERE Nx.pub_year <= first_co_citation_year
  WITH count(Nx) AS nx_size, first_co_citation_year, x_scp, y_scp
  OPTIONAL MATCH (y:Publication {node_id: y_scp})<--(Ny:Publication)
    WHERE Ny.pub_year <= first_co_citation_year
  WITH count(Ny) AS ny_size, nx_size, first_co_citation_year, x_scp, y_scp
  OPTIONAL MATCH
    (x:Publication {node_id: x_scp})<--(Ex:Publication)-[E]-(Ey:Publication)-->(y:Publication {node_id: y_scp})
    WHERE startNode(E).pub_year <= first_co_citation_year
  RETURN x_scp, y_scp, toFloat(count(E)) / (nx_size * ny_size) AS e_index',
{}, pairs, 8) YIELD value
RETURN value.x_scp AS cited_1, value.y_scp AS cited_2, value.e_index AS e_index;

However, when I run it, the query randomly returns fewer than the expected number of records. I could not find any pattern in when it happens. It seems to happen less frequently with smaller batch sizes. I tried reducing them, but I could not complete a run on the 33,000 input records even with a batch size of 1.
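The checking workflow described above (batching the input and verifying that each batch returns one output record per input pair) can be scripted so that short batches are retried and reported instead of being silently lost. This is a minimal sketch, assuming a hypothetical `run_batch` callable that sends one batch through the mapParallel2 query (for example via the Neo4j driver, not shown) and returns the result rows as dictionaries. Retrying only works around the symptom; it does not fix the underlying issue.

```python
from typing import Callable, Sequence

Pair = tuple[str, str]  # (cited_1, cited_2)
Row = dict


def run_checked(
    pairs: Sequence[Pair],
    batch_size: int,
    run_batch: Callable[[Sequence[Pair]], list[Row]],
    max_attempts: int = 3,
) -> tuple[list[Row], list[int]]:
    """Run pairs in batches, retrying a batch that returns fewer rows than it has pairs.

    Returns all accepted result rows and the start offsets of batches that never
    returned a complete result within max_attempts.
    """
    results: list[Row] = []
    failed: list[int] = []
    for start in range(0, len(pairs), batch_size):
        batch = pairs[start:start + batch_size]
        for _attempt in range(max_attempts):
            rows = run_batch(batch)
            # The query is expected to produce exactly one output record per input pair.
            if len(rows) == len(batch):
                results.extend(rows)
                break
        else:
            # All attempts came back short: record the batch instead of dropping it silently.
            failed.append(start)
    return results, failed
```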

@cbartens
Author

Update: after running and comparing many more standard vs. parallel queries, I'm pretty sure the partial-results issue is limited to querying/counting relationships. Whenever I count nodes, the counts returned by the parallel query always match the standard query 100%.

@DKroot

DKroot commented Jan 19, 2020

I've observed this problem when counting nodes as well:

// Jaccard Co-Citation* Conditional (<= first_co_citation_year) Index
WITH $JDBC_conn_string AS db, $sql_query AS sql
CALL apoc.load.jdbc(db, sql) YIELD row
WITH collect({x_scp: row.cited_1, y_scp: row.cited_2}) AS pairs
CALL apoc.cypher.mapParallel2('
  MATCH (x:Publication {node_id: _.x_scp})<--(Nxy)-->(y:Publication {node_id: _.y_scp})
  WITH count(Nxy) AS intersect_size, min(Nxy.pub_year) AS first_co_citation_year, _.x_scp AS x_scp, _.y_scp AS y_scp
  OPTIONAL MATCH (x:Publication {node_id: x_scp})<--(Nx:Publication)
    WHERE Nx.node_id <> y_scp AND Nx.pub_year <= first_co_citation_year
  WITH collect(Nx) AS nx_list, intersect_size, first_co_citation_year, x_scp, y_scp
  OPTIONAL MATCH (y:Publication {node_id: y_scp})<--(Ny:Publication)
    WHERE Ny.node_id <> x_scp AND Ny.pub_year <= first_co_citation_year
  WITH nx_list + collect(Ny) AS union_list, intersect_size, x_scp, y_scp
  UNWIND union_list AS union_node
  RETURN x_scp, y_scp, toFloat(intersect_size) / (count(DISTINCT union_node) + 2) AS jaccard_index',
{}, pairs, 8) YIELD value
RETURN
  value.x_scp AS cited_1, value.y_scp AS cited_2, value.jaccard_index AS jaccard_co_citation_conditional_star_index;
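To spot-check a few pairs against the parallel output, the same index can be recomputed outside Neo4j. This is a minimal sketch over a hypothetical in-memory citation map (`cites`: citing node id to the set of cited node ids; `year`: node id to `pub_year`). It assumes every node is a Publication with a non-null `pub_year` and that there is at most one relationship per node pair, so it is an approximation of the Cypher, not a drop-in replacement.

```python
from typing import Optional


def jaccard_conditional_star(
    x: str, y: str, cites: dict[str, set[str]], year: dict[str, int]
) -> Optional[float]:
    """Recompute the co-citation index for one (x, y) pair from an in-memory citation map."""
    # Nodes citing both x and y (the first MATCH); no match means no output row.
    co = [n for n, refs in cites.items() if x in refs and y in refs]
    if not co:
        return None
    t0 = min(year[n] for n in co)  # first_co_citation_year
    # Citers of x other than y, and citers of y other than x, up to t0.
    nx = {n for n, refs in cites.items() if x in refs and n != y and year[n] <= t0}
    ny = {n for n, refs in cites.items() if y in refs and n != x and year[n] <= t0}
    # intersect_size / (count(DISTINCT union_node) + 2)
    return len(co) / (len(nx | ny) + 2)


cites = {"p1": {"x", "y"}, "p2": {"x"}, "p3": {"y"}, "p4": {"x", "y"}}
year = {"p1": 2000, "p2": 1999, "p3": 2005, "p4": 2003}
print(jaccard_conditional_star("x", "y", cites, year))  # 0.5
```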

@schivmeister

I have spent considerable time troubleshooting this, thinking it was PEBKAC. I had successfully used mapParallel2 on a couple of occasions before, so I did not expect further trouble; much to my disappointment, I ran into it anyway.

One of the successful attempts was on a much smaller data scale. Everyone's querying case is different, but one particular use case generalizes to finding duplicates, with a representative pattern something like this:

MATCH (g:Genre)<-[:OF_TYPE]-(m) WHERE exists(m.rating)
WITH collect(DISTINCT g) as genres
CALL apoc.cypher.mapParallel2("
	WITH (_) as g
	MATCH (m)-[:OF_TYPE]->(g) WHERE exists(m.rating)
	MATCH (m)-[:PUBLISHED_IN]->(c:Category) WHERE exists(c.name)
	WITH g, c, count(m) as worksCount
	MATCH (m)-[:OF_TYPE]->(g) WHERE exists(m.rating)
	MATCH (m)-[:PUBLISHED_IN]->(w:Workshop)
	MATCH (m)-[:PUBLISHED_FOR]->(a:Audience)
	MATCH (m)-[:PUBLISHED_BY]->(p:Publisher)
	MATCH (m)-[:PUBLISHED_IN]->(c)
	WITH m.rating as rating, c.name as name, g, w, a, p, collect({cat: c, worksCount: worksCount}) as dupes
	WITH rating, name, g, w, a, p, dupes, size(dupes) as dupeCount WHERE dupeCount > 1
	WITH rating, name, apoc.coll.sortMaps(dupes, 'worksCount') as sortedDupes, dupeCount
	WITH rating, name, sortedDupes as dupes, dupeCount
	RETURN rating, dupes, dupeCount
", {}, genres, 80) YIELD value
RETURN count(*)

The complexity here is three-fold: (i) binding an aggregate or collection for later use (worksCount), (ii) the number of items matched or grouped to identify duplicates, and (iii) the item collected or deduplicated on for the grouping, which can decrease or increase the scan size. And, of course, there are millions of data points in scope.
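The grouping at the heart of this query (collecting candidates on a composite key and keeping only groups with more than one member) can be prototyped in plain Python on a small sample before scaling it up. This is a minimal sketch: the flat record layout is hypothetical, the field names mirror the query, and the ascending sort by `worksCount` is an assumption (check the `apoc.coll.sortMaps` documentation for its actual ordering).

```python
from collections import defaultdict
from typing import Any

Record = dict[str, Any]


def find_duplicates(records: list[Record], key_fields: tuple[str, ...]) -> list[Record]:
    """Group records on key_fields and keep groups with more than one member."""
    groups: dict[tuple, list[Record]] = defaultdict(list)
    for rec in records:
        key = tuple(rec[f] for f in key_fields)
        # Like collect({cat: c, worksCount: worksCount}) per grouping key.
        groups[key].append({"cat": rec["cat"], "worksCount": rec["worksCount"]})
    result: list[Record] = []
    for key, dupes in groups.items():
        if len(dupes) > 1:  # WHERE dupeCount > 1
            result.append({
                "key": key,
                # Ascending order is an assumption; apoc.coll.sortMaps may differ.
                "dupes": sorted(dupes, key=lambda d: d["worksCount"]),
                "dupeCount": len(dupes),
            })
    return result
```

The more distinct keys and duplicate groups the data produces, the larger the result each parallel fragment has to build and return.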

These are the typical cardinalities involved:

  • g 80
  • m 10,000,000
  • c 1,000,000
  • w 100,000
  • a 10,000
  • p 1,000,000

The difference between the successful and failed attempts is the potential result size. To test queries like this and validate the parallelization procedure, I scripted a serial loop (over g) that produces the same outcome (a final count).

In the successful case, I do not bind a carry-over collection, I group on different items, and I collect a different item for the grouping (m vs. c). Even though the cardinalities are higher in that case, the results are smaller:

  • For 1 g (highest) = 2,00
  • For all g (sum) = 1,200

vs. the failed case:

  • For 1 g (highest) = 1,200
  • For all g (sum) = 12,000

I was initially trying to find the right partition size, but I realized that was moot. My fallback partition size is the number of items in the list to parallelize on (g), hence the 80, which worked best for the successful case (better than any lower or higher value).

According to one discussion and the APOC source linked there [1], the default partition size is 100 * [number of cores]. However, for my failed case, varying this value, the batch size, or the concurrency makes no difference.

On a VM with 16 cores (8 physical) and no other significant OS processes, a 40GB dataset, 32GB of total RAM, and Neo4j 3.5, these are the observations for the failed case across different partition, batch, and concurrency settings:

  • 0 results
  • Partial results, totally off the mark (7, 10, etc.)
  • What looks like orphaned queries listed by dbms.listQueries, sometimes a couple, sometimes several
  • Zombified OS threads that keep consuming CPU (leaving the Neo4j server unresponsive until the JVM is killed)

I understand this is "community-supported", but it would be good to get it working reliably across the board. Not all use cases are write-oriented or suitable for periodic.iterate, and not all of us have moved to 4.x (in case the parallelization/distribution options there are any better).

Nevertheless, thanks to all APOC contributors for your continued work on providing and improving these extensions.

[1] https://stackoverflow.com/questions/61047587/difference-between-apoc-cypher-mapparallel-and-apoc-cypher-mapparallel2-in-neo4j

@vga91
Collaborator

vga91 commented Jan 15, 2025

Currently mapParallel2 is deprecated in favor of Cypher statements: see here

vga91 closed this as completed Jan 15, 2025