Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-33212][runtime] add job status changed listener for lineage #24754

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

HuangZhenQiu
Copy link
Contributor

@HuangZhenQiu HuangZhenQiu commented May 1, 2024

What is the purpose of the change

This PR is to implement the job status changed listener for lineage in FLIP-314. This PR use class loader to load listeners in pipeline executors, including LocalExecutor, EmbeddedExecutor for application mode and AbstractSessionClusterExecutor for session cluster. When job graph submission is successfully, the JobCreatedEvent with lineage info will be published to listeners. During the runtime, job status change info will also be published to listeners.

Brief change log

  • Add interfaces for job status change listener
  • Add Events and configs for job status listener.
  • Add logic pipeline executors for notifying the job created event for local, application mode, and session cluster.

Verifying this change

This change added tests and can be verified as follows:

  • The end to end test is covered by JobStatusListenerITCase

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (not documented)

@@ -80,7 +80,7 @@ public CompletableFuture<JobClient> execute(
clusterDescriptor.retrieve(clusterID);
ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
return clusterClient
.submitJob(jobGraph)
.submitJob(jobGraph, pipeline)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@PatrickRen How should we send lineagegraph into the cluster client without changing API?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that you are saying that the original submitJob with one parameter still works. As well as the new method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but there is a concern about the size of lineage info if put into job graph.

@flinkbot
Copy link
Collaborator

flinkbot commented May 1, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@HuangZhenQiu HuangZhenQiu force-pushed the FLINK-33212-lineage-listener branch from d904d63 to 555ba41 Compare May 7, 2024 02:25
@@ -74,7 +75,7 @@ public Configuration getFlinkConfiguration() {
}

@Override
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph, Pipeline pipeline) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed this with @HuangZhenQiu offline and we should remove this api change and the lineage logic will be applied on the future instead

@HuangZhenQiu HuangZhenQiu force-pushed the FLINK-33212-lineage-listener branch 2 times, most recently from bdaf645 to 5008234 Compare May 8, 2024 21:57
Comment on lines 68 to 70
private final ExecutorService executorService =
Executors.newFixedThreadPool(
4, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need 4 threads? Do we expect concurrent calls here?

@HuangZhenQiu
Copy link
Contributor Author

@gyfora @JingGe
Thanks for your review. I have partially resolved your comments. The lineage graph info is set into StreamGraph in this correlated PR https://github.com/apache/flink/pull/24618/files. I would also like to get more suggestion about how to make PRs more self-contained.

@HuangZhenQiu
Copy link
Contributor Author

@flinkbot run azure

1 similar comment
@JingGe
Copy link
Contributor

JingGe commented May 21, 2024

@flinkbot run azure

jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader))
.whenCompleteAsync(
(jobClient, throwable) -> {
if (throwable == null) {
Copy link

@davidradl davidradl May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do something if there is a throwable . Maybe notify to the job status changed listeners that there was an error and log the error. If there is a good reason to swallow the throwable here, then a comment explaining would be good.

.whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
.whenCompleteAsync(
(jobClient, throwable) -> {
if (throwable == null) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above - what happens if there is a throwable

.submitJob(jobGraph, userCodeClassloader)
.whenComplete(
(ignored, throwable) -> {
if (throwable == null) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above

@HuangZhenQiu
Copy link
Contributor Author

@davidradl
The throwable in executors are caught already in Execution environment. If there is a better idea to provide extra info for customers, I am glad to adopt.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants