New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-35197][table] Support the execution of supsend&resume materialized table in continuous refresh mode #24765
Conversation
c76f94c
to
082ac74
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hackergin Thanks for your contribution, I left some comments.
...g/apache/flink/table/operations/materializedtable/AlterMaterializedTableResumeOperation.java
Outdated
Show resolved
Hide resolved
.../apache/flink/table/operations/materializedtable/AlterMaterializedTableSuspendOperation.java
Outdated
Show resolved
Hide resolved
...g/apache/flink/table/operations/materializedtable/AlterMaterializedTableResumeOperation.java
Outdated
Show resolved
Hide resolved
...g/apache/flink/table/operations/materializedtable/AlterMaterializedTableResumeOperation.java
Outdated
Show resolved
Hide resolved
...link-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java
Outdated
Show resolved
Hide resolved
|
||
// get checkpoint config from the materialized table | ||
CheckpointConfigInfo checkpointConfigInfo = | ||
getCheckpointConfigInfo(clusterClient, continuousRefreshHandler.getJobId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we get the JSON string directly? It looks a bit strange to convert the CheckpointConfigInfo to a json string, and then read the interval from json tree.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, I have not found a good way to directly obtain the response of type String.
A better way might be to add a get method to CheckpointConfigInfo class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
...y/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
Outdated
Show resolved
Hide resolved
...y/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
Outdated
Show resolved
Hide resolved
...y/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
Show resolved
Hide resolved
...y/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
Show resolved
Hide resolved
0b204cd
to
e436f70
Compare
… of materialized tables
…/resume nodes to operations
6cf510b
to
f3f6161
Compare
…lized table in continuous refresh mode
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
What is the purpose of the change
Support the execution of suspend & resume materialized table in continuous refresh mode
Brief change log
Verifying this change
-In order to test the repair of incomplete serialization and deserialization of materialized tables, some relevant logic in org.apache.flink.table.catalog.CatalogBaseTableResolutionTest#testPropertyDeSerialization has been added.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation