Race condition when indexTask with ES #51

Open
qingdaojunzuo opened this issue Dec 20, 2023 · 0 comments

Describe the bug
A race condition was found when indexing tasks (indexTask) with ES: the number of index requests sent by the Conductor server does not match the number received on the ES side.

Steps To Reproduce
Steps to reproduce the behavior:

  1. Run multiple tasks in parallel
  2. Set the ES index logging to debug level
  3. Check the logged requests on the ES side (using ES7; the same likely applies to ES6)
  4. After the workflow is COMPLETED, some tasks end up in IN_PROGRESS rather than COMPLETED in ES
  5. The persistence store, on the other hand, shows the correct status (using Postgres for persistence)
  6. indexBatchSize is at its default of 1 and asyncIndexingEnabled at its default of false (see the config sketch below)
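For reference, a minimal configuration sketch of the two settings mentioned in step 6. The property names are an assumption based on the Conductor 3.x Spring configuration and should be verified against your deployment; both values shown are the defaults under which the issue was observed.

```properties
# Assumed property names (please verify against your Conductor 3.x config); both are defaults.
conductor.elasticsearch.indexBatchSize=1
conductor.app.asyncIndexingEnabled=false
```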

Expected behavior
All tasks should be in a terminal status in ES, such as COMPLETED or FAILED, rather than IN_PROGRESS

Device/browser

  • OS: Ubuntu
  • Browser: N/A
  • Version: 3.14

Additional context

  1. With debug logging enabled, the following log line is printed correctly in our environment. We see 3 index requests per task, logged in ElasticSearchRestDAOV7.java -> indexTask, and the average time taken is less than 30 ms:

Time taken {} for indexing task:{} in workflow: {}

  2. On the ES side, the number of received records is randomly less than 3.

  3. It seems there is a race condition between the indexObject and indexBulkRequest methods:

```java
private void indexObject(
        final String index, final String docType, final String docId, final Object doc) {

    byte[] docBytes;
    try {
        docBytes = objectMapper.writeValueAsBytes(doc);
    } catch (JsonProcessingException e) {
        logger.error("Failed to convert {} '{}' to byte string", docType, docId);
        return;
    }
    IndexRequest request = new IndexRequest(index);
    request.id(docId).source(docBytes, XContentType.JSON);

    if (bulkRequests.get(docType) == null) {
        bulkRequests.put(
                docType, new BulkRequests(System.currentTimeMillis(), new BulkRequest()));
    }

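    // no lock is held here: this request can be appended to a batch that
    // indexBulkRequest is concurrently flushing and about to replace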
    bulkRequests.get(docType).getBulkRequest().add(request);
    if (bulkRequests.get(docType).getBulkRequest().numberOfActions() >= this.indexBatchSize) {
        indexBulkRequest(docType);
    }
}

private synchronized void indexBulkRequest(String docType) {
    if (bulkRequests.get(docType).getBulkRequest() != null
            && bulkRequests.get(docType).getBulkRequest().numberOfActions() > 0) {
        synchronized (bulkRequests.get(docType).getBulkRequest()) {
            indexWithRetry(
                    bulkRequests.get(docType).getBulkRequest().get(),
                    "Bulk Indexing " + docType,
                    docType);
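            // the pending batch is replaced below; a request appended concurrently
            // by indexObject after the flush started can be discarded without being sent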
            bulkRequests.put(
                    docType, new BulkRequests(System.currentTimeMillis(), new BulkRequest()));
        }
    }
}
```
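Before the point-by-point analysis below, here is a small standalone sketch (plain Java, not Conductor code; all names are illustrative) that mimics the same pattern of an unsynchronized append racing against a synchronized flush-and-swap, and shows how items can be dropped:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BulkSwapRaceDemo {

    // shared "pending batch" per doc type, standing in for the bulkRequests map
    static final Map<String, List<Integer>> batches = new ConcurrentHashMap<>();
    static final AtomicInteger flushed = new AtomicInteger();

    // stands in for indexObject: the append happens outside the flush lock
    static void add(String type, int doc) {
        batches.computeIfAbsent(type, k -> Collections.synchronizedList(new ArrayList<>()))
                .add(doc);
        flush(type);
    }

    // stands in for indexBulkRequest: "send" (count) the batch, then swap in a fresh one
    static synchronized void flush(String type) {
        List<Integer> batch = batches.get(type);
        if (batch != null && !batch.isEmpty()) {
            flushed.addAndGet(batch.size());
            // any doc appended to the old batch after size() was read above is dropped here
            batches.put(type, Collections.synchronizedList(new ArrayList<>()));
        }
    }

    public static void main(String[] args) throws Exception {
        int sent = 100_000;
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < sent; i++) {
            int doc = i;
            pool.submit(() -> add("task", doc));
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        flush("task"); // final flush, like the periodic flush in the DAO
        System.out.println("sent=" + sent + ", flushed=" + flushed.get());
    }
}
```

Run a few times with enough parallelism, the printed flushed count often comes out lower than sent, which matches the missing documents observed on the ES side.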
  • No lock is held in indexObject when a request is added to the pending bulkRequest
  • A lock is held in indexBulkRequest when the bulkRequest is sent and the local bulkRequest is replaced
  • With two threads interleaved as follows, requests can be lost: T1 sends the bulkRequest -> T2 adds a request to the (old) bulkRequest -> T2 blocks on the synchronized indexBulkRequest -> T1 replaces the local bulkRequest -> T2 enters indexBulkRequest and the check fails, so nothing is sent (or, if a T3 has already added a new request to the fresh bulkRequest, the check passes but T2's original request is still lost). A possible mitigation is sketched below.
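One possible mitigation, sketched against the snippet quoted above (this is only an assumption of how the window could be closed, not the project's actual fix): hold the same lock around both the append and the flush-and-swap, so a request can never be appended to a batch that is about to be discarded. The `bulkLock` field is a hypothetical new field; everything else reuses the names from the quoted code.

```java
// Sketch only: guard both the append and the flush-and-swap with one lock.
private final Object bulkLock = new Object(); // hypothetical new field

private void indexObject(
        final String index, final String docType, final String docId, final Object doc) {

    byte[] docBytes;
    try {
        docBytes = objectMapper.writeValueAsBytes(doc);
    } catch (JsonProcessingException e) {
        logger.error("Failed to convert {} '{}' to byte string", docType, docId);
        return;
    }
    IndexRequest request = new IndexRequest(index);
    request.id(docId).source(docBytes, XContentType.JSON);

    synchronized (bulkLock) {
        // create-or-get and append now happen under the same lock as the flush,
        // so the batch cannot be swapped out between these two steps
        if (bulkRequests.get(docType) == null) {
            bulkRequests.put(
                    docType, new BulkRequests(System.currentTimeMillis(), new BulkRequest()));
        }
        bulkRequests.get(docType).getBulkRequest().add(request);
        if (bulkRequests.get(docType).getBulkRequest().numberOfActions() >= this.indexBatchSize) {
            indexBulkRequest(docType);
        }
    }
}

private void indexBulkRequest(String docType) {
    synchronized (bulkLock) { // re-entrant when called from indexObject above
        BulkRequests pending = bulkRequests.get(docType);
        if (pending != null && pending.getBulkRequest().numberOfActions() > 0) {
            indexWithRetry(
                    pending.getBulkRequest().get(),
                    "Bulk Indexing " + docType,
                    docType);
            bulkRequests.put(
                    docType, new BulkRequests(System.currentTimeMillis(), new BulkRequest()));
        }
    }
}
```

A per-docType lock (or doing the append and flush inside a single compute on a ConcurrentHashMap) would reduce contention; a single lock is just the simplest way to illustrate the idea.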

Thanks

@qingdaojunzuo qingdaojunzuo closed this as not planned Dec 20, 2023
@qingdaojunzuo qingdaojunzuo reopened this Dec 20, 2023