From 9c4b5242a169cb4e90922c3fc7755be3bdac8be8 Mon Sep 17 00:00:00 2001 From: Michael Russo Date: Mon, 27 Jun 2016 21:13:45 -0700 Subject: [PATCH] Enhance some logic around deleting of edges. --- .../corepersistence/CpRelationManager.java | 40 ++++++++-------- .../asyncevents/EventBuilderImpl.java | 7 ++- .../index/impl/EsEntityIndexImpl.java | 47 ++++++------------- 3 files changed, 37 insertions(+), 57 deletions(-) diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 39b3161256..6e1bade7fb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -524,44 +524,42 @@ public void removeFromCollection( String collectionName, EntityRef itemRef ) thr } Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() ); - org.apache.usergrid.persistence.model.entity.Entity memberEntity = ( ( CpEntityManager ) em ).load( entityId ); - // remove edge from collection to item GraphManager gm = managerCache.getGraphManager( applicationScope ); - List removedEdges = new ArrayList<>(); - //run our delete - gm.loadEdgeVersions( - CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, memberEntity.getId() ) ) - .flatMap(edge -> gm.markEdge(edge)).flatMap(edge -> gm.deleteEdge(edge)) - .doOnNext(edge -> removedEdges.add(edge)).toBlocking() - .lastOrDefault(null); + // mark the edge versions and take the first for later delete edge queue event ( load is descending ) + final Edge markedSourceEdge = gm.loadEdgeVersions( + CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, entityId ) ) + .flatMap(edge -> gm.markEdge(edge)).toBlocking().firstOrDefault(null); + + + Edge markedReversedEdge = null; CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName ); if (collection != null && collection.getLinkedCollection() != null) { // delete reverse edges final String pluralType = InflectionUtils.pluralize( cpHeadEntity.getId().getType() ); - gm.loadEdgeVersions( - CpNamingUtils.createEdgeFromCollectionName( memberEntity.getId(), pluralType, cpHeadEntity.getId() ) ) - .flatMap(reverseEdge -> gm.markEdge(reverseEdge)) - .flatMap(reverseEdge -> gm.deleteEdge(reverseEdge)) - .doOnNext(reverseEdge -> removedEdges.add(reverseEdge)) - .toBlocking().lastOrDefault(null); + markedReversedEdge = gm.loadEdgeVersions( + CpNamingUtils.createEdgeFromCollectionName( entityId, pluralType, cpHeadEntity.getId() ) ) + .flatMap(reverseEdge -> gm.markEdge(reverseEdge)).toBlocking().firstOrDefault(null); } /** - * Remove from the index + * Remove from the index. This will call gm.deleteEdge which also deletes the reverse edge(s) and de-indexes + * older versions of the edge(s). * */ + if( markedSourceEdge != null ) { + indexService.queueDeleteEdge(applicationScope, markedSourceEdge); + } + if( markedReversedEdge != null ){ + indexService.queueDeleteEdge(applicationScope, markedReversedEdge); + } - // item not deindexed, only edges - removedEdges.forEach(edge -> { - indexService.queueDeleteEdge(applicationScope, edge); - }); // special handling for roles collection of a group if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) { @@ -572,7 +570,7 @@ public void removeFromCollection( String collectionName, EntityRef itemRef ) thr if ( path.startsWith( "/roles/" ) ) { Entity itemEntity = - em.get( new SimpleEntityRef( memberEntity.getId().getType(), memberEntity.getId().getUuid() ) ); + em.get( new SimpleEntityRef( entityId.getType(), entityId.getUuid() ) ); RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity ); em.deleteRole( roleRef.getApplicationRoleName(), Optional.fromNullable(itemEntity) ); diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index 02a7588056..bbdce5a090 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -96,10 +96,9 @@ public Observable buildDeleteEdge( final ApplicationScope logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge); } - return indexService.deleteIndexEdge( applicationScope, edge ).flatMap( batch -> { - final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); - return gm.deleteEdge( edge ).map( deletedEdge -> batch ); - } ); + final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); + return gm.deleteEdge( edge ) + .flatMap( deletedEdge -> indexService.deleteIndexEdge( applicationScope, deletedEdge )); } diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index 3b60b5767a..dc110f77d3 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -466,8 +466,8 @@ public CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id enti SearchResponse searchResponse; List candidates = new ArrayList<>(); - //never let the limit be less than 2 as there are potential indefinite paging issues - final int searchLimit = Math.max(2, indexFig.getVersionQueryLimit()); + // never let this fetch more than 100 to save memory + final int searchLimit = Math.min(100, indexFig.getVersionQueryLimit()); final QueryBuilder entityQuery = QueryBuilders .termQuery(IndexingUtils.EDGE_NODE_ID_FIELDNAME, IndexingUtils.nodeId(edge.getNodeId())); @@ -485,41 +485,24 @@ public CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id enti long queryTimestamp = 0L; - while(true){ - - QueryBuilder timestampQuery = QueryBuilders - .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME) - .gte(queryTimestamp); - - QueryBuilder finalQuery = QueryBuilders - .boolQuery() - .must(entityQuery) - .must(timestampQuery); - - searchResponse = srb - .setQuery(finalQuery) - .setSize(searchLimit) - .execute() - .actionGet(); - - int responseSize = searchResponse.getHits().getHits().length; - if(responseSize == 0){ - break; - } - // update queryTimestamp to be the timestamp of the last entity returned from the query - queryTimestamp = (long) searchResponse - .getHits().getAt(responseSize - 1) - .getSource().get(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME); + QueryBuilder timestampQuery = QueryBuilders + .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME) + .gte(queryTimestamp); - candidates = aggregateScrollResults(candidates, searchResponse, null); + QueryBuilder finalQuery = QueryBuilders + .boolQuery() + .must(entityQuery) + .must(timestampQuery); - if(responseSize < searchLimit){ + searchResponse = srb + .setQuery(finalQuery) + .setSize(searchLimit) + .execute() + .actionGet(); - break; - } + candidates = aggregateScrollResults(candidates, searchResponse, null); - } } catch ( Throwable t ) { logger.error( "Unable to communicate with Elasticsearch", t.getMessage() );