[FLINK-35030][runtime] Introduce Epoch Manager under async execution #24748
Thanks for the PR! I left some comments PTAL.
int ongoingRecordCount; | ||
|
||
/** The action associated with non-record of this epoch(e.g. advance watermark). */ | ||
@Nonnull Runnable action; |
How about making this Nullable? And provide one action for the Epoch on close().
* @return the current open epoch. | ||
*/ | ||
public Epoch onRecord() { | ||
Epoch lastEpoch = outputQueue.get(outputQueue.size() - 1); |
How about holding a reference to Epoch activeEpoch instead of getting the last one from the queue?
outputQueue.remove(0); | ||
} | ||
} | ||
Epoch epoch = new Epoch(0, action); |
This action should not be passed into the new epoch.
fixed.
* All inputs are segment into distinct epochs, marked by the arrival of non-record inputs. | ||
* Records are assigned to a unique epoch based on their arrival. | ||
*/ | ||
public static class Epoch { |
I suggest providing an auto-increment id for each epoch, which is useful for debugging.
// is epoch have finished, the epoch will be removed from the output queue. | ||
if (epoch.tryFinish() && outputQueue.size() > 0) { | ||
if (epoch == outputQueue.getFirst()) { | ||
outputQueue.remove(0); |
My suggestion: use outputQueue.pop(); here. And there should be a
while (outputQueue.peek().isFinished()) {
outputQueue.pop();
}
Right?
Right 👍, I overlooked the case where later epochs complete first.
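The out-of-order case discussed here can be illustrated with a small, self-contained sketch. The `Epoch` class below is a simplified stand-in with hypothetical fields (`ongoing`, `closed`), not the class from this PR; it only shows why the head of the queue has to be drained in a loop rather than with a single `if`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class DrainHeadSketch {
    /** Simplified stand-in for the PR's Epoch; field names are assumptions. */
    static final class Epoch {
        final long id;
        int ongoing;      // records of this epoch still in flight
        boolean closed;   // true once the non-record of this epoch has arrived

        Epoch(long id, int ongoing, boolean closed) {
            this.id = id;
            this.ongoing = ongoing;
            this.closed = closed;
        }

        boolean isFinished() {
            return closed && ongoing == 0;
        }
    }

    /** Pops every finished epoch at the head of the queue and returns how many were removed. */
    static int drainFinishedHead(Deque<Epoch> queue) {
        int removed = 0;
        while (!queue.isEmpty() && queue.peek().isFinished()) {
            queue.pop();
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        Deque<Epoch> queue = new ArrayDeque<>();
        Epoch first = new Epoch(0, 1, true);   // closed, one record still running
        Epoch second = new Epoch(1, 0, true);  // closed and already complete
        queue.add(first);
        queue.add(second);

        // The later epoch finished first, but the head is still busy: nothing is removed.
        System.out.println(drainFinishedHead(queue)); // prints 0

        // Once the head completes, both epochs must leave the queue in one pass.
        first.ongoing--;
        System.out.println(drainFinishedHead(queue)); // prints 2
    }
}
```

With a single `if`, the second call would remove only epoch 0 and leave the already finished epoch 1 stuck at the head until some later event triggered another check.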
* internal and away from API module for now, until we could see the concrete need for {@link | ||
* #PARALLEL_BETWEEN_EPOCH} from average users. | ||
*/ | ||
public enum ParallelMode { |
Should this be a property of Epoch? Or should the draining logic be in the close() of some specific epochs?
Thanks for the PR. Left some comments below.
public void completeOneRecord(Epoch epoch) { | ||
epoch.ongoingRecordCount--; | ||
// If this epoch has been closed before and all records in | ||
// is epoch have finished, the epoch will be removed from the output queue. |
nit: is -> this
action.run(); | ||
return true; | ||
} | ||
return false; |
If this method is invoked on an already finished epoch, the result should be true.
Once an epoch is finished, it is removed from the queue, so an epoch won't finish twice.
*/ | ||
Closed, | ||
/** The records of this epoch have finished execution after the epoch is closed. */ | ||
Finished |
In StreamOperators, finish() would be invoked prior to close(), while the design here is reversed. This might confuse other Flink developers.
Thanks for the suggestion, I will enrich the description here to mitigate confusion. I think Fig. 6 in FLIP-425 is helpful for developers to understand these three EpochStatus values.
asyncExecutionController.drainInflightRecords(0); | ||
if (lastEpoch.tryFinish() && outputQueue.size() > 0) { | ||
outputQueue.remove(0); | ||
} |
This block of code should already have been invoked in completeOneRecord() during drainInflightRecords, and thus could be removed.
This block of code is for the case where all records in one epoch are completed before the non-record arrives: completeOneRecord() can't finish this epoch because the epoch is not closed yet, so we need to check it again here.
I noticed that lastEpoch.close(); is invoked right before this block of code, so this epoch must have been closed.
Oh I understood the logic here. No problem for me now.
|
||
EpochStatus status; | ||
|
||
public Epoch(int recordCount, Runnable action) { |
It seems that recordCount is always 0, thus this constructor parameter could be removed.
|
||
public ParallelEpochManager() { | ||
super(); | ||
} |
This constructor seems unnecessary.
I removed ParallelEpochManager and SerialEpochManager per Zakelly's suggestion; the constructor was also removed.
@Override | ||
public void onNonRecord(Runnable action) { | ||
assert outputQueue.size() == 1; | ||
Epoch lastEpoch = outputQueue.get(outputQueue.size() - 1); |
nit: get(0) is enough given that outputQueue.size() == 1.
} | ||
|
||
@Override | ||
public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception { |
Apart from Watermark and WatermarkStatus, there are other events that could come through the data stream (see all subclasses of StreamElement and RuntimeEvent). This PR might need to handle all these events, and it needs some design for how to treat all events in a unified way.
InternalWatermark also uses AbstractStreamOperator#processWatermark.
For LatencyMarker, EndOfDataMarker and RecordAttributes, I plan to do it in FLINK-35031.
@@ -130,11 +152,17 @@ public AsyncExecutionController( | |||
}, | |||
"AEC-buffer-timeout")); | |||
|
|||
if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) { |
Whether two epochs can be executed in parallel depends on the semantics of the non-record in between, which means it is not a unified configuration that can be set on the whole operator.
Yes, thanks for the suggestion. I made ParallelMode a parameter of onNonRecord(), so we can choose different modes for different epochs.
public void onNonRecord(Runnable action) { | ||
Epoch lastEpoch = outputQueue.get(outputQueue.size() - 1); | ||
lastEpoch.action = action; | ||
lastEpoch.close(); |
The action should be invoked when ALL previous epochs have finished, instead of only the last epoch.
Yes, it is. I check outputQueue.size() == 1 here.
Thanks for adding the check. It just occurred to me that the non-record might depend not only on all previous records, but also on all previous non-records. Say we have an example data flow as follows:
- Epoch 0 records: record 0, record 1
- Epoch 0 non-record: watermark 0
- Epoch 1 records: record 2, record 3
- Epoch 1 non-record: watermark 1
In the current implementation, it is guaranteed that record 0~3 must have been finished before watermark 1 is processed. But actually we may also need to guarantee that watermark 0 must have been finished as well.
For Parallel mode, we don't guarantee that watermark 0 must have been finished before watermark 1.
For Serial mode, there is only one epoch in the queue at a time, which provides the guarantee you mentioned.
66e44bb to 9daa258
Thanks for the update! Left some comments as below.
if (activeEpoch.tryFinish()) { | ||
outputQueue.pop(); | ||
} | ||
} |
Shall we use
while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) {
outputQueue.pop();
}
here as well?
I think the while loop is not necessary here, although changing it to a while loop won't have any bad effects either.
This method is only called on non-record arrivals, so this epoch is the last epoch in the queue and its status changes will not affect previous epochs. The status of previous epochs is updated in completeOneRecord().
this(id, null); | ||
} | ||
|
||
public Epoch(long id, Runnable action) { |
Seems that this constructor can be removed.
* Subsequent epochs can begin execution even if the previous epoch has not yet completed. | ||
* Usually performs better than {@link #SERIAL_BETWEEN_EPOCH}. | ||
*/ | ||
PARALLEL_BETWEEN_EPOCH |
The naming here is slightly different from the nouns used in FLIP-425. How about naming them as "strict order" and "out of order"?
I think "out of order" is a bit too broad, so I changed it to the current name.
As in the example you mentioned, "out of order" can't describe the scenario where watermark 1 executes after records 0-3.
If there is a better way to describe "out of order", I am happy to change it.
outputQueue.pop(); | ||
} | ||
activeEpoch = new Epoch(epochNum++); | ||
outputQueue.add(activeEpoch); |
nit: the common parts between these two cases can be extracted.
* @param action the action associated with this non-record. | ||
* @param parallelMode the parallel mode for this epoch. | ||
*/ | ||
public void onNonRecord(Runnable action, ParallelMode parallelMode) { |
How about changing Runnable to ThrowingRunnable? This can help reduce duplicated try-catch code blocks in invokers.
Because there is no exception in the signature of disposeContext(), if we change Runnable to ThrowingRunnable here, we need to catch the exception in aec.disposeContext() or epochManager.completeOneRecord().
I think that catching exceptions in StreamOperator is more convenient.
Got it. Given that we agreed to add AsyncExecutionController.processNonRecord in another comment and that AsyncExecutionController has an exceptionHandler, how about keeping EpochManager using Runnable, and changing AsyncExecutionController.processNonRecord to use ThrowingRunnable?
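One way to read this proposal: the controller accepts a throwing action, wraps it in a plain Runnable that forwards failures to its exception handler, and only then hands it to the epoch manager. The sketch below is a minimal illustration under that assumption. `ThrowingAction` and `ExceptionSink` are hypothetical local stand-ins for Flink's ThrowingRunnable and the controller's exceptionHandler, and `wrap` is an illustrative helper, not an API from this PR.

```java
import java.util.ArrayList;
import java.util.List;

public class NonRecordWrapSketch {
    /** Hypothetical stand-in for a runnable that may throw a checked exception. */
    @FunctionalInterface
    interface ThrowingAction {
        void run() throws Exception;
    }

    /** Hypothetical stand-in for the controller's exception handler. */
    @FunctionalInterface
    interface ExceptionSink {
        void handle(String message, Throwable cause);
    }

    /** Adapts a throwing action to a plain Runnable, routing failures to the sink. */
    static Runnable wrap(ThrowingAction action, ExceptionSink sink) {
        return () -> {
            try {
                action.run();
            } catch (Exception e) {
                sink.handle("Failed to process non-record.", e);
            }
        };
    }

    public static void main(String[] args) {
        List<String> reported = new ArrayList<>();
        ExceptionSink sink = (message, cause) -> reported.add(message + " " + cause.getMessage());

        Runnable ok = wrap(() -> {}, sink);
        Runnable failing = wrap(() -> { throw new Exception("boom"); }, sink);

        ok.run();       // nothing is reported
        failing.run();  // the checked exception ends up in the sink
        System.out.println(reported); // prints [Failed to process non-record. boom]
    }
}
```

With this shape, callers in the operator would not need their own try-catch blocks, and the epoch manager could keep working with Runnable only.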
if (timeServiceManager != null) { | ||
timeServiceManager.advanceWatermark(mark); | ||
} | ||
output.emitWatermark(mark); |
It might be simpler to use super.processWatermark(mark).
@@ -311,6 +318,10 @@ public void drainInflightRecords(int targetNum) { | |||
} | |||
} | |||
|
|||
public EpochManager getEpochManager() { | |||
return epochManager; |
It seems that every usage of this method is followed by onNonRecord(). Thus it might be more straightforward to expose a method like onNonRecord in this class. This would also be good for isolating implementation details.
👍 Thanks for the suggestion. I added processNonRecord() to the AEC; EpochManager is no longer visible to operators.
() -> { | ||
output.incrementAndGet(); | ||
}, | ||
ParallelMode.PARALLEL_BETWEEN_EPOCH); |
Although SERIAL_BETWEEN_EPOCH has been covered in AsyncExecutionControllerTest, it might still be better to make EpochManagerTest cover this situation as well.
Constructing the AEC is troublesome and would bring a lot of duplicate code, so I put the test of SERIAL_BETWEEN_EPOCH in AsyncExecutionControllerTest. I also added a test for mixed epoch modes in AsyncExecutionControllerTest.
String.format("Failed to process watermark %s.", mark), e); | ||
} | ||
}, | ||
ParallelMode.SERIAL_BETWEEN_EPOCH); |
My suggestion: ParallelMode could be configured within EpochManager or AEC, and could be overridden from outside. AEC should proxy the requests to EpochManager, so the operator does not know the details.
epoch.ongoingRecordCount--; | ||
// If one epoch has been closed before and all records in | ||
// this epoch have finished, the epoch will be removed from the output queue. | ||
while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) { | ||
outputQueue.pop(); | ||
} |
epoch.ongoingRecordCount--; | |
// If one epoch has been closed before and all records in | |
// this epoch have finished, the epoch will be removed from the output queue. | |
while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) { | |
outputQueue.pop(); | |
} | |
if (--epoch.ongoingRecordCount == 0) { | |
// If one epoch has been closed before and all records in | |
// this epoch have finished, the epoch will be removed from the output queue. | |
while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) { | |
outputQueue.pop(); | |
} | |
} |
if (activeEpoch.tryFinish() && outputQueue.size() > 0) { | ||
outputQueue.pop(); | ||
} |
need while() section as well
if (activeEpoch.tryFinish()) { | ||
outputQueue.pop(); | ||
} |
need while() section as well
activeEpoch = new Epoch(epochNum++); | ||
outputQueue.add(activeEpoch); |
Better to extract this part into a private method initActive()? And I'd suggest not adding the activeEpoch to the queue.
* The reference to the {@link AsyncExecutionController}, used for {@link | ||
* ParallelMode#SERIAL_BETWEEN_EPOCH}. Can be null when testing. | ||
*/ | ||
final AsyncExecutionController asyncExecutionController; |
nit: AsyncExecutionController<?> might be better. Same for other raw usages.
Thanks for the update. LGTM overall.
super.processWatermark(mark); | ||
return; | ||
} | ||
asyncExecutionController.processNonRecord(() -> super.processWatermark(mark)); |
We may need to override processWatermark1 and processWatermark2 as well, or we can override processWatermark(Watermark, int), like that for processWatermarkStatus. I also understand that this PR is mainly responsible for introducing the epoch mechanism, and that we would have another Jira ticket and PR to apply epochs to all events and all cases. So it is also OK for me if you would like to make the change in the next PR.
AbstractStreamOperatorV2 does not implement processWatermark1, processWatermark2 and processWatermark(Watermark, int). If those methods are added in the future, I will override them.
*/ | ||
public void onNonRecord(Runnable action, ParallelMode parallelMode) { | ||
if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) { | ||
asyncExecutionController.drainInflightRecords(0); |
When doing drainInflightRecords, will the activeEpoch's ongoingRecordCount reach 0 and completeOneRecord be called?
I'd suggest not draining here. Instead we'd better mark the AEC as blocking, with the callback unblocking the AEC.
> When doing drainInflightRecords, will the activeEpoch's ongoingRecordCount reached 0 and completeOneRecord called?
Yes, completeOneRecord would be called.
I changed the logic to: close activeEpoch first, then drain in-flight records. The open active epoch would not be updated when doing drainInflightRecords, and updating a closed epoch's ongoingRecordCount is allowed.
} | ||
|
||
private void initNewActiveEpoch() { | ||
this.activeEpoch = new Epoch(epochNum++); |
I'd suggest a tool method tryFinishInQueue which does:
while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) {
LOG.trace(
"Finish epoch: {}, outputQueue size: {}",
outputQueue.peek(),
outputQueue.size());
outputQueue.pop();
}
and in initNewActiveEpoch, it does:
outputQueue.offer(activeEpoch);
activeEpoch = new Epoch(epochNum++);
tryFinishInQueue();
@@ -76,6 +77,9 @@ public class AsyncExecutionController<K> implements StateRequestHandler { | |||
*/ | |||
private final MailboxExecutor mailboxExecutor; | |||
|
|||
/** Exception handler to handle the exception thrown by asynchronous framework. */ | |||
private AsyncFrameworkExceptionHandler exceptionHandler; |
nit: make it final?
Thanks for the update! LGTM
Thanks all for the detailed review! 🚀🚀
What is the purpose of the change
This PR introduces Epoch Manager to handle watermark and watermark status processing under async execution.
Epoch manager segments inputs into distinct epochs, marked by the arrival of non-records (e.g. watermark, record attributes). Records are assigned to a unique epoch based on their arrival. Records within an epoch are allowed to be parallelized, while the non-record of an epoch can only be executed when all records in this epoch have finished.
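The segmentation described above can be sketched as follows. This is a simplified, single-threaded illustration and not the code in this PR: the class name `EpochSketch`, the field names, and the choice to keep the open epoch outside the queue are assumptions made for the example, and parallel modes and the AsyncExecutionController are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class EpochSketch {
    /** Simplified epoch: a record counter plus the action of its closing non-record. */
    static final class Epoch {
        final long id;
        int ongoingRecordCount;
        boolean closed;
        Runnable action; // set when the non-record arrives

        Epoch(long id) {
            this.id = id;
        }

        /** Runs the action and reports true once the epoch is closed and has no records left. */
        boolean tryFinish() {
            if (closed && ongoingRecordCount == 0) {
                if (action != null) {
                    action.run();
                }
                return true;
            }
            return false;
        }
    }

    private final Deque<Epoch> closedEpochs = new ArrayDeque<>();
    private long nextId = 0;
    private Epoch active = new Epoch(nextId++);

    /** A record arrives: it joins the currently open epoch. */
    Epoch onRecord() {
        active.ongoingRecordCount++;
        return active;
    }

    /** A non-record arrives: close the open epoch and start a new one. */
    void onNonRecord(Runnable action) {
        active.action = action;
        active.closed = true;
        closedEpochs.add(active);
        active = new Epoch(nextId++);
        drain();
    }

    /** A record finished: finish as many epochs at the head of the queue as possible. */
    void completeOneRecord(Epoch epoch) {
        if (--epoch.ongoingRecordCount == 0) {
            drain();
        }
    }

    private void drain() {
        while (!closedEpochs.isEmpty() && closedEpochs.peek().tryFinish()) {
            closedEpochs.pop();
        }
    }

    public static void main(String[] args) {
        EpochSketch manager = new EpochSketch();
        StringBuilder log = new StringBuilder();

        Epoch e0 = manager.onRecord();                    // record 0
        manager.onRecord();                               // record 1, same epoch as record 0
        manager.onNonRecord(() -> log.append("wm0 "));    // closes epoch 0
        Epoch e1 = manager.onRecord();                    // record 2
        manager.onNonRecord(() -> log.append("wm1 "));    // closes epoch 1

        manager.completeOneRecord(e1); // epoch 1 is done, but epoch 0 still blocks the head
        manager.completeOneRecord(e0);
        manager.completeOneRecord(e0); // epoch 0 finishes, then epoch 1 right after it
        System.out.println(log.toString().trim()); // prints wm0 wm1
    }
}
```

In this sketch an action runs only when its epoch reaches the head of the queue, so the two watermark actions fire in arrival order even though the records of the later epoch completed first.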
Brief change log
- SerialEpochManager and ParallelEpochManager
- SerialEpochManager to AEC
Verifying this change
This change added tests and can be verified as follows:
EpochManagerTest
AsyncExecutionControllerTest#testSerialEpochManager
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation