
[Transaction] Fix initTransaction might wait until request timeout #1991

Conversation

BewareMyPower
Collaborator

Motivation

Most of the tests in TransactionTest take at least 10 seconds because the request.timeout.ms config is 10000 (10 seconds). See the test configuration at https://github.com/streamnative/kop/blob/1fd3bdb9158fd944c2fb7a241c0d8dca923367ab/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java#L1150:

producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000 * 10);

This is because KoP loads the metadata topic lazily, unlike Kafka, which loads the metadata topic at startup. KoP takes this approach because it starts before the Pulsar broker has finished starting, and topic lookups at that point could run into problems.

Therefore, when the Kafka producer sends the first INIT_PRODUCER_ID request, it receives a COORDINATOR_LOAD_IN_PROGRESS error, the request is re-enqueued [1], and the request then expires after request.timeout.ms milliseconds.

} else if (error != Errors.NOT_COORDINATOR && error != Errors.COORDINATOR_NOT_AVAILABLE) {
    if (error != Errors.COORDINATOR_LOAD_IN_PROGRESS && error != Errors.CONCURRENT_TRANSACTIONS) {
       /* ... */
    } else {
        this.reenqueue(); // [1]
    }
} else { // [2]
    TransactionManager.this.lookupCoordinator(CoordinatorType.TRANSACTION, TransactionManager.this.transactionalId);
    this.reenqueue();
}
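
For context, this is roughly how the wait surfaces on the client side (a minimal sketch using the standard Kafka producer API; the bootstrap address and transactional id are illustrative, borrowed from the logs later in this thread):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class InitTxnTimeoutDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:15003"); // illustrative
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-id");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000 * 10);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // If the transaction metadata topic is still loading, the first INIT_PRODUCER_ID
            // receives COORDINATOR_LOAD_IN_PROGRESS, is re-enqueued, and this call can block
            // until request.timeout.ms expires.
            producer.initTransactions();
        }
    }
}
```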

Modifications

Return COORDINATOR_NOT_AVAILABLE instead of
COORDINATOR_LOAD_IN_PROGRESS when the transaction metadata topic is loading. This differs from Kafka's behavior, but it avoids waiting the default 30 seconds for initTransaction in KoP.

In addition, configure a 3-second request timeout for the cases where initTransaction might still expire, so that the test time is reduced.
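
In a test, the shorter timeout could be configured roughly like this (a minimal sketch reusing ProducerConfig from the Kafka client as in the snippet above; the exact property names and values used in the KoP tests may differ):

```java
// Hypothetical test setup: a 3-second request timeout so that a request which would
// otherwise expire fails fast instead of waiting the default 10 seconds.
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
```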

Documentation

Check the box below.

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

@BewareMyPower
Collaborator Author

The test takes 2min 57s before this PR:

[screenshot of the test run before this PR]

The test takes 1min 56s after this PR:

[screenshot of the test run after this PR, 2023-08-02 at 23:22:49]

@@ -584,7 +584,7 @@ private Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>> getAndMaybeAddT
         if (loadingPartitions.contains(partitionId)) {
             log.info("TX Coordinator {} partition {} for transactionalId {} is loading",
                     transactionConfig.getTransactionMetadataTopicName(), partitionId, transactionalId);
-            return Either.left(Errors.COORDINATOR_LOAD_IN_PROGRESS);
+            return Either.left(Errors.COORDINATOR_NOT_AVAILABLE);
Member


Should we add a comment to let others know why we changed to COORDINATOR_NOT_AVAILABLE?
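
For example, such a comment could read roughly like this (hypothetical wording, based on the explanation later in this thread; the actual comment, if added, may differ):

```java
// Return COORDINATOR_NOT_AVAILABLE rather than COORDINATOR_LOAD_IN_PROGRESS so that the Kafka
// client sends FIND_COORDINATOR and retries right away, instead of re-enqueuing the request
// and waiting out request.timeout.ms.
return Either.left(Errors.COORDINATOR_NOT_AVAILABLE);
```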

@gaoran10
Contributor

gaoran10 commented Aug 3, 2023

Maybe it's also related to this change. I think the init-producerID request should be resent after 1 second when it receives a COORDINATOR_LOAD_IN_PROGRESS error, but it isn't.

@Demogorgon314
Member

> Maybe it's also related to this change. I think the init-producerID request should be resent after 1 second when it receives a COORDINATOR_LOAD_IN_PROGRESS error, but it isn't.

Yes, it will wait for the request timeout and then send the next request. I tried changing the ThrottleTimeMs to -1, and then it doesn't wait for the request timeout. Maybe we should investigate this.

@BewareMyPower
Collaborator Author

@gaoran10 Good find! I will investigate it further.

@BewareMyPower BewareMyPower marked this pull request as draft August 3, 2023 07:20
@BewareMyPower
Collaborator Author

BewareMyPower commented Aug 3, 2023

Yes, it's because of the throttle. When any response arrives, NetworkClient#maybeThrottle is called before the response is passed to the callback.

    private void maybeThrottle(AbstractResponse response, short apiVersion, String nodeId, long now) {
        int throttleTimeMs = response.throttleTimeMs();
        if (throttleTimeMs > 0 && response.shouldClientThrottle(apiVersion)) {
            connectionStates.throttle(nodeId, now + throttleTimeMs);
            log.trace("Connection to node {} is throttled for {} ms until timestamp {}", nodeId, throttleTimeMs,
                      now + throttleTimeMs);
        }
    }

However, the throttle is only 1 second. After debugging the Kafka client, I found the root cause is Sender#awaitNodeReady. I added some logs:

                log.info("XYZ before awaitNodeReady");
                if (!awaitNodeReady(targetNode, coordinatorType)) {
                    /* ... */
                } else {
                    log.trace("XYZ awaitNodeReady done: {} {}", targetNode, coordinatorType);
                }

and found:

[2023-08-03 16:31:34,781] INFO [Producer clientId=producer-txn-id, transactionalId=txn-id] XYZ before awaitNodeReady (org.apache.kafka.clients.producer.internals.Sender:475)
[2023-08-03 16:31:44,783] TRACE [Producer clientId=producer-txn-id, transactionalId=txn-id] XYZ awaitNodeReady done: localhost:15003 (id: 1825631472 rack: null) TRANSACTION (org.apache.kafka.clients.producer.internals.Sender:481)

NetworkClientUtils#awaitReady, which is called by Sender#awaitNodeReady, is the root cause. See my code comments below.

public static boolean awaitReady(KafkaClient client, Node node, Time time, long timeoutMs) throws IOException {
    // timeoutMs is the request timeout, which is 10 seconds in our config.
    if (timeoutMs < 0) {
        throw new IllegalArgumentException("Timeout needs to be greater than 0");
    }
    long startTime = time.milliseconds();

    // isReady will check if `throttleUntilTimeMs` of the connection is before the current timestamp.
    if (isReady(client, node, startTime) ||  client.ready(node, startTime))
        return true;

    long attemptStartTime = time.milliseconds();
    while (!client.isReady(node, attemptStartTime) && attemptStartTime - startTime < timeoutMs) {
        // Since the connection is throttled (for 1 second), isReady() returns false, so we always enter this loop
        if (client.connectionFailed(node)) {
            throw new IOException("Connection to " + node + " failed.");
        }
        // Since the time spent in the `isReady()` and `client.ready()` calls is negligible,
        // pollTimeout is very close to `timeoutMs` (10 seconds)
        long pollTimeout = timeoutMs - (attemptStartTime - startTime);
        // Then, `poll` blocks for nearly 10 seconds because there is no in-flight request
        client.poll(pollTimeout, attemptStartTime);
        if (client.authenticationException(node) != null)
            throw client.authenticationException(node);
        attemptStartTime = time.milliseconds();
    }
    return client.isReady(node, attemptStartTime);
}

The reason why returning COORDINATOR_NOT_AVAILABLE helps is that a FIND_COORDINATOR request is sent before the re-enqueue. In this case, maybeSendAndPollTransactionalRequest calls maybeFindCoordinatorAndRetry directly:

            } else if (coordinatorType != null) {
                log.trace("Coordinator not known for {}, will retry {} after finding coordinator.", coordinatorType, requestBuilder.apiKey());
                maybeFindCoordinatorAndRetry(nextRequestHandler);
                return true;

@BewareMyPower BewareMyPower force-pushed the bewaremypower/fix-init-txn-timeout branch from 10c76b8 to 227876c Compare August 4, 2023 05:35
@codecov

codecov bot commented Aug 4, 2023

Codecov Report

Merging #1991 (227876c) into master (0241e43) will increase coverage by 0.00%.
The diff coverage is n/a.

Impacted file tree graph

@@            Coverage Diff            @@
##             master    #1991   +/-   ##
=========================================
  Coverage     17.29%   17.29%           
  Complexity      728      728           
=========================================
  Files           190      190           
  Lines         14041    14038    -3     
  Branches       1320     1318    -2     
=========================================
  Hits           2428     2428           
+ Misses        11437    11434    -3     
  Partials        176      176           
Files Changed | Coverage Δ
...ative/pulsar/handlers/kop/KafkaRequestHandler.java | 1.06% <ø> (+<0.01%) ⬆️

@BewareMyPower BewareMyPower marked this pull request as ready for review August 4, 2023 05:44
@BewareMyPower
Collaborator Author

Hi @Demogorgon314 @gaoran10, I just updated my PR according to the explanation above. IMO, it does not make sense to set a 1-second throttle time in the InitProducerId response, so I removed that config.
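
For illustration, building the INIT_PRODUCER_ID response without a throttle hint could look roughly like this (a sketch against the Apache Kafka protocol classes, not the actual KoP code; the helper name and its arguments are made up):

```java
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.InitProducerIdResponse;

public class InitProducerIdResponses {
    // Hypothetical helper: build a successful INIT_PRODUCER_ID response with
    // throttleTimeMs = 0, so the client connection is not throttled for 1 second.
    static InitProducerIdResponse success(long producerId, short producerEpoch) {
        InitProducerIdResponseData data = new InitProducerIdResponseData()
                .setErrorCode(Errors.NONE.code())
                .setProducerId(producerId)
                .setProducerEpoch(producerEpoch)
                .setThrottleTimeMs(0); // previously a 1-second throttle hint was set here
        return new InitProducerIdResponse(data);
    }
}
```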

@BewareMyPower BewareMyPower merged commit b645003 into streamnative:master Aug 4, 2023
19 checks passed
Demogorgon314 pushed a commit to Demogorgon314/kop that referenced this pull request Aug 14, 2023
…treamnative#1991)


(cherry picked from commit b645003)