Skip to content

Commit

Permalink
Enhance some logic around deleting of edges.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Russo committed Jun 28, 2016
1 parent 09280b1 commit 9c4b524
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -524,44 +524,42 @@ public void removeFromCollection( String collectionName, EntityRef itemRef ) thr
}

Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
org.apache.usergrid.persistence.model.entity.Entity memberEntity = ( ( CpEntityManager ) em ).load( entityId );


// remove edge from collection to item
GraphManager gm = managerCache.getGraphManager( applicationScope );


List<Edge> removedEdges = new ArrayList<>();
//run our delete
gm.loadEdgeVersions(
CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, memberEntity.getId() ) )
.flatMap(edge -> gm.markEdge(edge)).flatMap(edge -> gm.deleteEdge(edge))
.doOnNext(edge -> removedEdges.add(edge)).toBlocking()
.lastOrDefault(null);

// mark the edge versions and take the first for later delete edge queue event ( load is descending )
final Edge markedSourceEdge = gm.loadEdgeVersions(
CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, entityId ) )
.flatMap(edge -> gm.markEdge(edge)).toBlocking().firstOrDefault(null);


Edge markedReversedEdge = null;
CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
if (collection != null && collection.getLinkedCollection() != null) {
// delete reverse edges
final String pluralType = InflectionUtils.pluralize( cpHeadEntity.getId().getType() );
gm.loadEdgeVersions(
CpNamingUtils.createEdgeFromCollectionName( memberEntity.getId(), pluralType, cpHeadEntity.getId() ) )
.flatMap(reverseEdge -> gm.markEdge(reverseEdge))
.flatMap(reverseEdge -> gm.deleteEdge(reverseEdge))
.doOnNext(reverseEdge -> removedEdges.add(reverseEdge))
.toBlocking().lastOrDefault(null);
markedReversedEdge = gm.loadEdgeVersions(
CpNamingUtils.createEdgeFromCollectionName( entityId, pluralType, cpHeadEntity.getId() ) )
.flatMap(reverseEdge -> gm.markEdge(reverseEdge)).toBlocking().firstOrDefault(null);
}


/**
* Remove from the index
* Remove from the index. This will call gm.deleteEdge which also deletes the reverse edge(s) and de-indexes
* older versions of the edge(s).
*
*/
if( markedSourceEdge != null ) {
indexService.queueDeleteEdge(applicationScope, markedSourceEdge);
}
if( markedReversedEdge != null ){
indexService.queueDeleteEdge(applicationScope, markedReversedEdge);

}

// item not deindexed, only edges
removedEdges.forEach(edge -> {
indexService.queueDeleteEdge(applicationScope, edge);
});

// special handling for roles collection of a group
if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
Expand All @@ -572,7 +570,7 @@ public void removeFromCollection( String collectionName, EntityRef itemRef ) thr
if ( path.startsWith( "/roles/" ) ) {

Entity itemEntity =
em.get( new SimpleEntityRef( memberEntity.getId().getType(), memberEntity.getId().getUuid() ) );
em.get( new SimpleEntityRef( entityId.getType(), entityId.getUuid() ) );

RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
em.deleteRole( roleRef.getApplicationRoleName(), Optional.fromNullable(itemEntity) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,9 @@ public Observable<IndexOperationMessage> buildDeleteEdge( final ApplicationScope
logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
}

return indexService.deleteIndexEdge( applicationScope, edge ).flatMap( batch -> {
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
return gm.deleteEdge( edge ).map( deletedEdge -> batch );
} );
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
return gm.deleteEdge( edge )
.flatMap( deletedEdge -> indexService.deleteIndexEdge( applicationScope, deletedEdge ));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,8 @@ public CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id enti
SearchResponse searchResponse;
List<CandidateResult> candidates = new ArrayList<>();

//never let the limit be less than 2 as there are potential indefinite paging issues
final int searchLimit = Math.max(2, indexFig.getVersionQueryLimit());
// never let this fetch more than 100 to save memory
final int searchLimit = Math.min(100, indexFig.getVersionQueryLimit());

final QueryBuilder entityQuery = QueryBuilders
.termQuery(IndexingUtils.EDGE_NODE_ID_FIELDNAME, IndexingUtils.nodeId(edge.getNodeId()));
Expand All @@ -485,41 +485,24 @@ public CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id enti

long queryTimestamp = 0L;

while(true){

QueryBuilder timestampQuery = QueryBuilders
.rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
.gte(queryTimestamp);

QueryBuilder finalQuery = QueryBuilders
.boolQuery()
.must(entityQuery)
.must(timestampQuery);

searchResponse = srb
.setQuery(finalQuery)
.setSize(searchLimit)
.execute()
.actionGet();

int responseSize = searchResponse.getHits().getHits().length;
if(responseSize == 0){
break;
}

// update queryTimestamp to be the timestamp of the last entity returned from the query
queryTimestamp = (long) searchResponse
.getHits().getAt(responseSize - 1)
.getSource().get(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME);
QueryBuilder timestampQuery = QueryBuilders
.rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
.gte(queryTimestamp);

candidates = aggregateScrollResults(candidates, searchResponse, null);
QueryBuilder finalQuery = QueryBuilders
.boolQuery()
.must(entityQuery)
.must(timestampQuery);

if(responseSize < searchLimit){
searchResponse = srb
.setQuery(finalQuery)
.setSize(searchLimit)
.execute()
.actionGet();

break;
}
candidates = aggregateScrollResults(candidates, searchResponse, null);

}
}
catch ( Throwable t ) {
logger.error( "Unable to communicate with Elasticsearch", t.getMessage() );
Expand Down

0 comments on commit 9c4b524

Please sign in to comment.