Chained futures keep executing although future was cancelled #2552

Open
BenediktKersjes opened this issue Jan 10, 2020 · 12 comments · May be fixed by #2608

Comments

@BenediktKersjes

Further investigation of #2551 shows that CompletableFutures behave differently in a Spring application.

The following code using CompletableFutures stops before the thenRun method is executed:

CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
    for (int i = 0; i < 10; i++) {
        try {
            System.out.println(i);
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
    }
}).thenRun(() -> {
    for (int i = 0; i < 10; i++) {
        try {
            System.out.println(i);
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
    }
});
Thread.sleep(500);

f.cancel(true);
try{
    f.get();
}
catch(CancellationException | ExecutionException e){
    System.out.println("future cancelled");
}
System.out.println("end");

Output:

1
2
future cancelled
end
3
4
5
6
7
8
9
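
The output above is consistent with how `CompletableFuture` handles cancellation: `f` is the stage returned by `thenRun`, so `f.cancel(true)` completes only that dependent stage, while the first stage keeps running and the `thenRun` action is skipped. The following is a minimal sketch of that behavior, not the reporter's code: the class name `DependentCancelDemo` and the latches that replace the timing-based `Thread.sleep` calls are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class DependentCancelDemo {

    /** Returns {firstStageFinished, dependentActionRan}. */
    static boolean[] run() {
        CountDownLatch firstStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean firstFinished = new AtomicBoolean(false);
        AtomicBoolean dependentRan = new AtomicBoolean(false);

        CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {
            firstStarted.countDown();
            try {
                // Stand-in for the printing loop: block until the caller has cancelled.
                release.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            firstFinished.set(true);
        });
        CompletableFuture<Void> dependent = first.thenRun(() -> dependentRan.set(true));

        try {
            firstStarted.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Completes only the dependent stage; CompletableFuture does not interrupt any thread.
        dependent.cancel(true);
        release.countDown();
        // The first stage still runs to completion.
        first.join();
        return new boolean[] {firstFinished.get(), dependentRan.get()};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("first stage finished: " + result[0]);
        System.out.println("dependent action ran: " + result[1]);
    }
}
```

In this sketch the first stage finishes and the dependent action never runs, which mirrors the numbers 3 to 9 still being printed after "end" in the output above.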

However, when using the vavr future implementation, the second part is executed as well:

Future<Void> f = Future.run(() -> {
    int i = 1;
    while(i < 10){
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
        System.out.println(i++);
    }
}).andThen(v -> {
    int i = 1;
    while(i < 10){
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
        System.out.println(i++);
    }
});
Thread.sleep(500);
f.cancel();
try{
    f.get();
}
catch(CancellationException e){
    System.out.println("future cancelled");
}

System.out.println("end");

Output:

1
2
future cancelled
end
3
4
5
6
7
8
9
1
2
3
4
5
6
7
8
9
@pivovarit
Member

pivovarit commented Jan 14, 2020

I ran your example and got the correct result:

1
2
future cancelled
end

We might be dealing with a subtle concurrency issue. I will investigate in the coming days.
Which version are you using?

@BenediktKersjes
Author

We are using version 0.10.2.

Did you run the example in a Spring application? Using a plain Java application, I also get the correct result.

@danieldietrich
Member

Thanks for reporting!
It seems odd that a Spring application behaves differently. Does it change the default Java ExecutorService at runtime? I will check that...

@charvakcpatel007

charvakcpatel007 commented May 2, 2020

Spring doesn't seem to be the issue here.
You can reproduce the issue by adding a Thread.sleep at the end so that the main thread doesn't terminate early; you then get the same result, as shown below.

		Future<Void> f = Future.run(() -> {
			int i = 1;
			while(i < 10){
				try {
					Thread.sleep(200);
				} catch (InterruptedException e) {
				}
				System.out.println(i++);
			}
		}).andThen(v -> {
			int i = 1;
			while(i < 10){
				try {
					Thread.sleep(200);
				} catch (InterruptedException e) {
				}
				System.out.println(i++);
			}
		});
		Thread.sleep(500);
		f.cancel();
		try{
			f.get();
		}
		catch(CancellationException e){
			System.out.println("future cancelled");
		}

		System.out.println("end");
		Thread.sleep(10000);
}

@charvakcpatel007

I did some further research. Please refer to the updated code fragment.

Future<Void> f1 = Future.run(() -> {
            int i = 1;
            while (i < 10) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    System.out.println("f1 : InterruptedException");
                    throw new RuntimeException(e);
                }
                System.out.println("f1 : " + i++);
            }
        });
        Future<Void> f2 = f1.andThen(
                v -> v.onSuccess(aVoid -> {
					System.out.println("Value recieved from f1 :  , continuing with task" + v.toString());
                    int i = 1;
                    while (i < 10) {
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                        }
                        System.out.println("f2 : " + i++);
                    }
                }).onFailure(throwable -> {
                    System.out.println("Failure recieved from f1 : " + throwable.toString());
                }));
        Thread.sleep(4000);
        boolean ans = f2.cancel();
        try {
            f2.get();
        } catch (CancellationException e) {
            System.out.println("future cancelled");
        }
        System.out.println("end");
        Thread.sleep(1000000);

Here we have two future instances: f1 is the original and f2 is the one returned by andThen.
If we call cancel on f2, this is the same as the OP's example. The output is:

f1 : 1
f1 : 2
future cancelled
end
f1 : 3
f1 : 4
f1 : 5
f1 : 6
f1 : 7
f1 : 8
f1 : 9
Value recieved from f1 :  , continuing with taskSuccess(null)
f2 : 1
f2 : 2
f2 : 3
f2 : 4
f2 : 5
f2 : 6
f2 : 7
f2 : 8
f2 : 9

But if we change f2.cancel() to f1.cancel(), it works as intended. The output in that case is:

f1 : 1
f1 : 2
f1 : InterruptedException
Failure received from f1 : java.util.concurrent.CancellationException
future cancelled
end

Thus, calling cancel() on the future returned by andThen (f2) doesn't affect f1. Since f1 then runs to completion, it passes its success value to f2, which executes its task as usual.

@charvakcpatel007

default Future<T> andThen(Consumer<? super Try<T>> action) {
    Objects.requireNonNull(action, "action is null");
    return run(executor(), complete ->
            onComplete(t -> {
                Try.run(() -> action.accept(t));
                complete.with(t);
            })
    );
}

To refer back to my previous comment: f1 is the Future of the original async task.
The code above returns f2, which is the Future of an async task that registers the action to be performed after f1 has finished.
The andThen call should actually return f1.
That would fix the issue.

If we do want to return f2, we can change the code like this:

default Future<T> andThen(Consumer<? super Try<T>> action) {
        Objects.requireNonNull(action, "action is null");
        return Future.<T>run(executor(), complete ->
                onComplete(t -> {
                    Try.run(() -> action.accept(t));
                    complete.with(t);
                })
        ).onFailure(throwable -> {
            if (throwable instanceof CancellationException)
                cancel();
        });
    }

This code adds a callback to f2 which makes sure that if f2 is cancelled, the cancellation is passed on to f1.
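
The same idea can be tried outside Vavr. The sketch below uses `java.util.concurrent.CompletableFuture` as a stand-in, so it is an analogy rather than the proposed patch: `andThenPropagating` and `CancelPropagationDemo` are hypothetical names, and unlike the Vavr code it does not swallow exceptions thrown by the action. Note also that `CompletableFuture.cancel` never interrupts a running task, so only the bookkeeping is shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CancelPropagationDemo {

    /**
     * Hypothetical helper: runs the action after the source completes and
     * forwards a cancellation of the derived future back to the source.
     */
    static <T> CompletableFuture<T> andThenPropagating(
            CompletableFuture<T> source, Consumer<? super T> action) {
        CompletableFuture<T> derived = source.thenApply(value -> {
            action.accept(value);
            return value;
        });
        derived.whenComplete((value, error) -> {
            // Only a direct cancel() of the derived future is forwarded upstream.
            if (derived.isCancelled()) {
                source.cancel(true);
            }
        });
        return derived;
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> derived =
                andThenPropagating(source, v -> System.out.println("action: " + v));
        derived.cancel(true);
        System.out.println("source cancelled: " + source.isCancelled());
    }
}
```

The design choice is the same as in the snippet above: the derived future stays a separate object, and a completion callback on it is what links its cancellation to the source.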

@danieldietrich
Member

@charvakcpatel007 That looks great! Are you volunteering for a PR? 😊

@charvakcpatel007

charvakcpatel007 commented May 17, 2020

@danieldietrich I wanted to ask which approach makes more sense.

  1. Returning Original Future or
  2. Adding a callback and keeping the return of child future as it is.

@danieldietrich
Member

@charvakcpatel007 I think there might be a better implementation. Could you please try the following:

default Future<T> andThen(Consumer<? super Try<T>> action) {
    Objects.requireNonNull(action, "action is null");
    return run(executor(), complete ->
            onComplete(t -> {
                if (!isCancelled()) {
                    Try.run(() -> action.accept(t));
                }
                complete.with(t);
            })
    );
}

Thx!

@charvakcpatel007

Hi,
Right now, the action passed to andThen is executed even if the future is cancelled; in that case the value inside the Try is a java.util.concurrent.CancellationException. This behavior seems correct, because if the original future is cancelled, the actions in andThen should receive a Try holding the failure exception.
Your suggestion would change that.
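
For comparison, the JDK's `CompletableFuture` behaves in a similar way: a callback registered directly on a future that is later cancelled is still invoked and observes the `CancellationException`. The sketch below only illustrates that analogy and does not use Vavr; `CancelledCallbackDemo` and `observedOnCancel` are hypothetical names.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class CancelledCallbackDemo {

    /** Returns the error seen by a callback registered before the future was cancelled. */
    static Throwable observedOnCancel() {
        CompletableFuture<String> source = new CompletableFuture<>();
        AtomicReference<Throwable> seen = new AtomicReference<>();
        // The callback is attached directly to the future that gets cancelled.
        source.whenComplete((value, error) -> seen.set(error));
        // cancel() completes the future exceptionally and runs the callback.
        source.cancel(false);
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable error = observedOnCancel();
        System.out.println(error instanceof CancellationException);
    }
}
```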

The actual bug is a different one, though.

Look at the code snippet above. There are two futures:
f1 -> represents the return value of the original task
f2 -> represents the return value of the task that adds the action to f1. It doesn't represent the action that was passed as the argument of andThen. (The difference is that one is the Future of adding the action, and the other is the action itself.)

There are several ways to solve this:

  1. Make sure that when f2 is canceled, f1 is also canceled.
default Future<T> andThen(Consumer<? super Try<T>> action) {
        Objects.requireNonNull(action, "action is null");
        return Future.<T>run(executor(), complete ->
                onComplete(t -> {
                    Try.run(() -> action.accept(t));
                    complete.with(t);
                })
        ).onFailure(throwable -> {
            if (throwable instanceof CancellationException)
                cancel();
        });
    }
  2. Return the original future rather than the future that represents adding the action. There is then no need to propagate anything, since the returned future is f1 itself.
default Future<T> andThen(Consumer<? super Try<T>> action) {
        Objects.requireNonNull(action, "action is null");
        run(executor(), complete ->
                onComplete(t -> {
                    Try.run(() -> action.accept(t));
                    complete.with(t);
                })
        );
        return this;
    }
  3. Make adding the action a synchronous operation, so there is no f2 in this case.
default Future<T> andThen(Consumer<? super Try<T>> action) {
        onComplete(t -> {
            Try.run(() -> action.accept(t));
        });
        return this;
    }

Please let me know your suggestions @danieldietrich

@danieldietrich
Member

Thanks for your analysis, @charvakcpatel007!

We align to Scala, so I double-checked Scala's Future.andThen implementation:

[Screenshot: Scala's Future.andThen implementation]

Our current andThen implementation is close to that of Scala's Future.

andThen executes the side-effecting action, 'ignores' a possible exception of that action and returns a new Future instance with the original result.

The problem is that our Future implementation completes with a Failure(CancellationException) if the Future is cancelled.

Cancellation means that further processing should be stopped. I think the correct solution is to fix FutureImpl so that no callback is called on cancellation. Namely, onComplete, onSuccess and onFailure must not be called. That way, subsequent Futures aren't executed.
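
To make the proposal concrete, here is a toy model of a future whose callbacks are dropped on cancellation. It is a sketch under stated assumptions and not Vavr's `FutureImpl`: the class `MiniFuture` and all of its methods are hypothetical, it holds a plain value instead of a `Try`, and it runs callbacks on the completing thread instead of an executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class MiniFuture<T> {
    private final List<Consumer<? super T>> callbacks = new ArrayList<>();
    private boolean done;
    private boolean cancelled;
    private T value;

    /** Registers a callback; it is never invoked if the future is or gets cancelled. */
    void onComplete(Consumer<? super T> callback) {
        synchronized (this) {
            if (cancelled) {
                return;
            }
            if (!done) {
                callbacks.add(callback);
                return;
            }
        }
        callback.accept(value); // already completed: run immediately
    }

    /** Completes the future and runs pending callbacks, unless it is already done or cancelled. */
    boolean complete(T result) {
        List<Consumer<? super T>> toRun;
        synchronized (this) {
            if (done || cancelled) {
                return false;
            }
            done = true;
            value = result;
            toRun = new ArrayList<>(callbacks);
            callbacks.clear();
        }
        toRun.forEach(callback -> callback.accept(result));
        return true;
    }

    /** Cancels the future and discards every pending callback. */
    synchronized boolean cancel() {
        if (done || cancelled) {
            return false;
        }
        cancelled = true;
        callbacks.clear();
        return true;
    }

    /** Runs the side effect after completion and hands the original value on. */
    MiniFuture<T> andThen(Consumer<? super T> action) {
        MiniFuture<T> next = new MiniFuture<>();
        onComplete(result -> {
            try {
                action.accept(result);
            } catch (RuntimeException ignored) {
                // like andThen, a failing side effect does not change the result
            }
            next.complete(result);
        });
        return next;
    }

    public static void main(String[] args) {
        MiniFuture<String> source = new MiniFuture<>();
        source.andThen(v -> System.out.println("action: " + v));
        source.cancel();
        System.out.println("completed after cancel: " + source.complete("value"));
    }
}
```

One consequence visible in this toy model is that a future derived via `andThen` from a cancelled source never completes on its own, so a real implementation would still need to decide how waiting callers of the derived future are released.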

@charvakcpatel007

charvakcpatel007 commented Jul 27, 2020

@danieldietrich Added a PR. Seems like this much should suffice. If it seems good then I can add tests covering this scenario.
