
Async lookups return incorrect results because JsonRowDataSerializationSchema is not thread-safe #121

Open
grzegorz8 opened this issue Aug 30, 2024 · 3 comments
Labels
bug Something isn't working

Comments

@grzegorz8
Member

I noticed that lookups in async mode may not work as expected. Namely, when multiple HTTP requests are sent at once in async mode, some results appear to be duplicated across rows.

I believe the root cause is that org.apache.flink.formats.json.JsonRowDataSerializationSchema is not thread-safe.

    /** Reusable object node. */
    private transient ObjectNode node;

    @Override
    public byte[] serialize(RowData row) {
        if (node == null) {
            node = mapper.createObjectNode();
        }

        try {
            runtimeConverter.convert(mapper, node, row);
            return mapper.writeValueAsBytes(node);
        } catch (Throwable t) {
            throw new RuntimeException(String.format("Could not serialize row '%s'.", row), t);
        }
    }

The serialization schema is used in com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonQueryCreator#createLookupQuery.
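To illustrate the hazard outside Flink, here is a minimal sketch in plain Jackson (class and method names are mine, not the connector's): a single reusable ObjectNode is mutated by concurrent serialize calls, so one row's payload can end up in another row's output, which is exactly the duplication described above.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    public class SharedNodeRace {
        private static final ObjectMapper MAPPER = new ObjectMapper();
        // Shared mutable state, analogous to the schema's reused 'node' field.
        private static final ObjectNode NODE = MAPPER.createObjectNode();

        static byte[] serialize(String id) throws Exception {
            NODE.put("id", id);                    // mutates the shared node
            return MAPPER.writeValueAsBytes(NODE); // may pick up another thread's write
        }

        public static void main(String[] args) throws Exception {
            Thread t1 = new Thread(() -> { try { System.out.println(new String(serialize("1"))); } catch (Exception ignored) { } });
            Thread t2 = new Thread(() -> { try { System.out.println(new String(serialize("2"))); } catch (Exception ignored) { } });
            t1.start(); t2.start();
            t1.join(); t2.join();
            // Under contention both threads can print {"id":"2"} (or both {"id":"1"}):
            // one row's serialized form is duplicated for the other row.
        }
    }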

@kristoffSC kristoffSC added the bug Something isn't working label Aug 31, 2024
@kristoffSC
Collaborator

kristoffSC commented Aug 31, 2024

Hi @grzegorz8
Many thanks for debugging and reporting this; it's a tricky one... I also think we have the same issue with responseBodyDecoder...

I've looked at the KinesisFirehoseSink connector - https://issues.apache.org/jira/browse/FLINK-24228 - where it seems JsonRowDataSerializationSchema is used in the same way we use it here... the only difference is that we use it in a lookup, not a sink...

It seems that AsyncSinkWriter has built-in support for this via the ElementConverter interface -> KinesisFirehoseSinkElementConverter, and this part does not seem to exist for lookup sources, or maybe I'm just not seeing it.

My suggestion for now is to... wrap
LookupQueryInfo lookupQueryInfo = lookupQueryCreator.createLookupQuery(lookupRow);
and
return Optional.ofNullable(responseBodyDecoder.deserialize(responseBody.getBytes()));

with synchronized blocks...
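One way to package that suggestion is a small wrapper that serializes under a lock, so a single schema instance can be shared safely. A rough sketch (the wrapper class is hypothetical; only the synchronization idea comes from above):

    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.table.data.RowData;

    // Hypothetical wrapper guarding a non-thread-safe schema with a lock.
    public class SynchronizedSerializationSchema implements SerializationSchema<RowData> {

        private final SerializationSchema<RowData> delegate;

        public SynchronizedSerializationSchema(SerializationSchema<RowData> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void open(InitializationContext context) throws Exception {
            delegate.open(context);
        }

        @Override
        public synchronized byte[] serialize(RowData element) {
            // Only one thread at a time touches the delegate's reusable ObjectNode.
            return delegate.serialize(element);
        }
    }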

Also, maybe it's worth reporting this missing functionality to Flink?

@kristoffSC
Collaborator

Ok @grzegorz8
I think we have an alternative solution.

To solve this problem we have to decouple query creation and response processing from request sending.
Currently, all three of those things are hidden in JavaNetHttpPollingClient, under the queryAndProcess and processHttpResponse methods.

I think we can extract HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData); from queryAndProcess and then add two new methods to the PollingClient interface:

  1. prepareQuery(RowData lookupData)
  2. processHttpResponse
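Roughly like this (a sketch only; the signatures, return types, and the changed pull parameter are my assumptions, not a final API):

    import java.net.http.HttpResponse;
    import java.util.Optional;

    import org.apache.flink.table.data.RowData;

    // Sketch of the extended PollingClient.
    public interface PollingClient<T> {

        // 1. Build the request (this is where the serialization schema runs);
        //    called on the single task thread before handing off to the pool.
        HttpLookupSourceRequestEntry prepareQuery(RowData lookupData);

        // Existing entry point, now taking the pre-built query.
        Optional<T> pull(HttpLookupSourceRequestEntry query);

        // 2. Decode the HTTP response, likewise confined to one thread.
        Optional<T> processHttpResponse(HttpResponse<String> response);
    }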

After that, we need to change AsyncHttpTableLookupFunction::asyncLookup (line 77) a little.

Instead of what we have now:
future.completeAsync(() -> decorate.lookup(keyRow), pullingThreadPool);

We would need something like this:

SomeQueryObject query = decorate.prepareQuery(lookupRow);
future.completeAsync(() -> decorate.lookup(query), pullingThreadPool);

And later modify future.whenCompleteAsync so that it calls decorate.processHttpResponse on a single thread and then follows up with the current code.

This is a rough sketch, but I think it will solve the problem by ensuring that the schema is used by only one thread.
WDYT?
Do you want to implement this change @grzegorz8 ?

@grzegorz8
Member Author

I spent some time refactoring the way you suggested, but I came to the conclusion that this approach requires quite a large refactoring without much benefit over calling the SerializationSchema in a synchronized way: in both approaches serialization is the bottleneck, effectively running with parallelism=1. So I propose to implement the simple approach as a temporary solution and, in the future, to look for a way to run serialization concurrently in a safe way.
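For reference, the temporary solution would look roughly like this inside GenericJsonQueryCreator#createLookupQuery (a sketch; the jsonSerialization field name and the LookupQueryInfo construction are assumptions):

    @Override
    public LookupQueryInfo createLookupQuery(RowData lookupDataRow) {
        byte[] serialized;
        // Serialize under a lock so concurrent async lookups never
        // share the schema's reusable ObjectNode.
        synchronized (jsonSerialization) {
            serialized = jsonSerialization.serialize(lookupDataRow);
        }
        return new LookupQueryInfo(new String(serialized, StandardCharsets.UTF_8));
    }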
