New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-33212][runtime] add job status changed listener for lineage #24754
base: master
Are you sure you want to change the base?
[FLINK-33212][runtime] add job status changed listener for lineage #24754
Conversation
@@ -80,7 +80,7 @@ public CompletableFuture<JobClient> execute( | |||
clusterDescriptor.retrieve(clusterID); | |||
ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient(); | |||
return clusterClient | |||
.submitJob(jobGraph) | |||
.submitJob(jobGraph, pipeline) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@PatrickRen How should we send lineagegraph into the cluster client without changing API?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that you are saying that the original submitJob with one parameter still works. As well as the new method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but there is a concern about the size of lineage info if put into job graph.
6b5b888
to
18d5d7a
Compare
18d5d7a
to
d904d63
Compare
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
Outdated
Show resolved
Hide resolved
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
Outdated
Show resolved
Hide resolved
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
Outdated
Show resolved
Hide resolved
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
Outdated
Show resolved
Hide resolved
d904d63
to
555ba41
Compare
@@ -74,7 +75,7 @@ public Configuration getFlinkConfiguration() { | |||
} | |||
|
|||
@Override | |||
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) { | |||
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph, Pipeline pipeline) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We discussed this with @HuangZhenQiu offline and we should remove this api change and the lineage logic will be applied on the future instead
bdaf645
to
5008234
Compare
private final ExecutorService executorService = | ||
Executors.newFixedThreadPool( | ||
4, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need 4 threads? Do we expect concurrent calls here?
...c/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
Show resolved
Hide resolved
...c/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
Outdated
Show resolved
Hide resolved
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
Outdated
Show resolved
Hide resolved
5008234
to
ca78b3c
Compare
@gyfora @JingGe |
ca78b3c
to
0f2960d
Compare
@flinkbot run azure |
1 similar comment
@flinkbot run azure |
0f2960d
to
6cfbd92
Compare
6cfbd92
to
c771823
Compare
jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)) | ||
.whenCompleteAsync( | ||
(jobClient, throwable) -> { | ||
if (throwable == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we do something if there is a throwable . Maybe notify to the job status changed listeners that there was an error and log the error. If there is a good reason to swallow the throwable here, then a comment explaining would be good.
.whenCompleteAsync((ignored1, ignored2) -> clusterClient.close()); | ||
.whenCompleteAsync( | ||
(jobClient, throwable) -> { | ||
if (throwable == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as above - what happens if there is a throwable
.submitJob(jobGraph, userCodeClassloader) | ||
.whenComplete( | ||
(ignored, throwable) -> { | ||
if (throwable == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as above
@davidradl |
What is the purpose of the change
This PR is to implement the job status changed listener for lineage in FLIP-314. This PR use class loader to load listeners in pipeline executors, including LocalExecutor, EmbeddedExecutor for application mode and AbstractSessionClusterExecutor for session cluster. When job graph submission is successfully, the JobCreatedEvent with lineage info will be published to listeners. During the runtime, job status change info will also be published to listeners.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes)Documentation