Allow inner class being parameter of withTimestampAssigner in Table API stream table source #281

Open
crazyzhou opened this issue Oct 17, 2019 · 3 comments

Comments

@crazyzhou
Contributor

Problem description
Because of the serialization validation applied to connectorProperties, only a public outer class or a public static nested class that implements AssignerWithTimeWindows can currently be passed to withTimestampAssigner in the Table API stream table source. We should also support non-static inner classes to make this easier to use.
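
The restriction can be illustrated with plain reflection. The following is a minimal sketch, assuming the validation amounts to checking that the class can be instantiated on its own (public, and not a non-static inner class that needs an enclosing instance). The actual check in the connector may differ, and `TimestampAssigner`, `AssignerClassCheck`, and the nested example classes are hypothetical stand-ins rather than connector types.

```java
import java.io.Serializable;
import java.lang.reflect.Modifier;

// Hypothetical stand-in for the connector's assigner interface.
interface TimestampAssigner<T> extends Serializable {
    long extractTimestamp(T element);
}

final class AssignerClassCheck {

    // Public static nested class: can be created without an enclosing instance.
    public static class StaticAssigner implements TimestampAssigner<Long> {
        @Override
        public long extractTimestamp(Long element) {
            return element;
        }
    }

    // Non-static inner class: every instance holds a hidden reference
    // to an enclosing AssignerClassCheck object.
    public class InnerAssigner implements TimestampAssigner<Long> {
        @Override
        public long extractTimestamp(Long element) {
            return element;
        }
    }

    // Assumed rule: the class must be public and must not be a
    // non-static member class.
    static boolean isStandaloneInstantiable(Class<?> clazz) {
        int mods = clazz.getModifiers();
        boolean nonStaticInner = clazz.isMemberClass() && !Modifier.isStatic(mods);
        return Modifier.isPublic(mods) && !nonStaticInner;
    }

    public static void main(String[] args) {
        System.out.println(isStandaloneInstantiable(StaticAssigner.class)); // prints true
        System.out.println(isStandaloneInstantiable(InnerAssigner.class));  // prints false
    }
}
```

Under this assumed rule, supporting inner classes would mean either relaxing the check or capturing the enclosing instance during serialization, which in turn requires the enclosing class to be serializable as well.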

Problem location
https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/Pravega.java#L292

Suggestions for an improvement

@EronWright
Contributor

I'm a little concerned that we're missing an ingredient with how assigners would be configured in Flink SQL. One of the main use cases of Flink SQL is to be able to use Flink without writing Java code. It is therefore important that a table source provide a pure properties-based experience. See the pure-SQL experience depicted here:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector

Basically, we need a way to interoperate with the rowtime schema elements as seen here:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#rowtime-attributes

One approach would be to recommend that users extract the timestamp with from-field while using from-source for watermark generation. This may require some enhancement to the underlying source function so that it can emit watermarks without emitting timestamps.

rowtime:
  timestamps:
    type: from-field
  watermarks:
    type: from-source

@EronWright
Contributor

Also, notice in the Kafka table source that there's a property called sink-partitioner (with an enum for possible values) and a property called sink-partitioner-class. Consider using this naming convention (-class) wherever we take a class name.
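
To illustrate the suggested `-class` convention, here is a minimal sketch, assuming a hypothetical property key `timestamp-assigner-class` that carries a fully qualified class name which a table source factory would load reflectively. The key name, the `Assigner` interface, and `AssignerLoader` are illustrative only and are not part of the connector. Loading by name needs a public class with a public no-argument constructor, which is the same kind of constraint this issue describes for non-static inner classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the connector's assigner interface.
interface Assigner {
    long extractTimestamp(long element);
}

final class AssignerLoader {

    // Hypothetical property key following the "-class" naming convention.
    static final String ASSIGNER_CLASS_KEY = "timestamp-assigner-class";

    // Example user-supplied assigner: public, static, no-arg constructor.
    public static class IdentityAssigner implements Assigner {
        @Override
        public long extractTimestamp(long element) {
            return element;
        }
    }

    // Reads the class name from the properties and instantiates it.
    static Assigner load(Map<String, String> properties) throws ReflectiveOperationException {
        String className = properties.get(ASSIGNER_CLASS_KEY);
        if (className == null) {
            throw new IllegalArgumentException("Missing property: " + ASSIGNER_CLASS_KEY);
        }
        Class<? extends Assigner> clazz = Class.forName(className).asSubclass(Assigner.class);
        return clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> props = new HashMap<>();
        props.put(ASSIGNER_CLASS_KEY, IdentityAssigner.class.getName());
        Assigner assigner = load(props);
        System.out.println(assigner.extractTimestamp(42L)); // prints 42
    }
}
```

Passing a class name instead of a serialized object keeps the configuration purely string-based, which fits the properties-only experience described above.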

@crazyzhou
Contributor Author

crazyzhou commented Oct 18, 2019

Thanks for the comment, @EronWright. Here is what I'd like to explain.

rowtime:
  timestamps:
    type: from-field
  watermarks:
    type: from-source

This is exactly what I considered during the design. The problem is that when users want to use the source with Pravega watermarks, we force them to provide an implementation that assigns a timestamp to each event. This implementation of our AssignerWithTimeWindows is required to enable Pravega watermarks in the Flink source. We also pass it into connectorProperties, which is what causes this issue. Therefore, to avoid conflicting with the user's timestamp assignment in AssignerWithTimeWindows, we currently support only the style below for enabling Pravega watermarks in the Table API.

rowtime:
  timestamps:
    type: from-source
  watermarks:
    type: from-source
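
The constraint described in this comment can be sketched as a small validation step. This is a minimal illustration, assuming the flattened property keys `rowtime.timestamps.type` and `rowtime.watermarks.type` (the dotted form of the YAML above). `RowtimeCheck` and its method are hypothetical and do not reflect the connector's actual validation code.

```java
import java.util.HashMap;
import java.util.Map;

final class RowtimeCheck {

    // Assumed flattened keys for the nested rowtime configuration.
    static final String TIMESTAMPS_TYPE = "rowtime.timestamps.type";
    static final String WATERMARKS_TYPE = "rowtime.watermarks.type";
    static final String FROM_SOURCE = "from-source";

    // Returns true when watermarks come from the source. In that case the
    // timestamps must also come from the source, because the assigner that
    // enables Pravega watermarks already assigns a timestamp to each event.
    static boolean usesSourceWatermarks(Map<String, String> props) {
        if (!FROM_SOURCE.equals(props.get(WATERMARKS_TYPE))) {
            return false;
        }
        if (!FROM_SOURCE.equals(props.get(TIMESTAMPS_TYPE))) {
            throw new IllegalArgumentException(
                "watermarks from-source currently requires timestamps from-source");
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(TIMESTAMPS_TYPE, FROM_SOURCE);
        props.put(WATERMARKS_TYPE, FROM_SOURCE);
        System.out.println(usesSourceWatermarks(props)); // prints true
    }
}
```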
