Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35030][runtime] Introduce Epoch Manager under async execution #24748

Merged
merged 1 commit into from May 17, 2024

Conversation

fredia
Copy link
Contributor

@fredia fredia commented Apr 30, 2024

What is the purpose of the change

This PR introduces Epoch Manager to handle watermark and watermark status processing under async execution.
Epoch manager segments inputs into distinct epochs, marked by the arrival of non-records(e.g.watermark, record attributes). Records are assigned to a unique epoch based on their arrival, records within an epoch are allowed to be parallelized, while the non-record of an epoch can only be executed when all records in this epoch have finished.

Brief change log

  • Add SerialEpochManager and ParallelEpochManager
  • Wire SerialEpochManager to AEC
  • Leverage EpochManager to process watermark/watermark status

Verifying this change

This change added tests and can be verified as follows:

  • EpochManagerTest
  • AsyncExecutionControllerTest#testSerialEpochManager

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): ( no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (yes)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

@flinkbot
Copy link
Collaborator

flinkbot commented Apr 30, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@fredia fredia changed the title [FLINK-35030][runtime] Introduce Epoch Manager for under async execution [FLINK-35030][runtime] Introduce Epoch Manager under async execution Apr 30, 2024
@fredia fredia marked this pull request as ready for review May 7, 2024 08:27
@Zakelly Zakelly self-requested a review May 7, 2024 09:56
Copy link
Contributor

@Zakelly Zakelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! I left some comments PTAL.

int ongoingRecordCount;

/** The action associated with non-record of this epoch(e.g. advance watermark). */
@Nonnull Runnable action;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about make this Nullable? And provide one action for the Epoch on close().

* @return the current open epoch.
*/
public Epoch onRecord() {
Epoch lastEpoch = outputQueue.get(outputQueue.size() - 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about hold a reference of Epoch activeEpoch instead of get the last one from queue?

outputQueue.remove(0);
}
}
Epoch epoch = new Epoch(0, action);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This action should not be passed into the new epoch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

* All inputs are segment into distinct epochs, marked by the arrival of non-record inputs.
* Records are assigned to a unique epoch based on their arrival.
*/
public static class Epoch {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest provide an auto-increment id for epoch, which is useful for debugging.

// is epoch have finished, the epoch will be removed from the output queue.
if (epoch.tryFinish() && outputQueue.size() > 0) {
if (epoch == outputQueue.getFirst()) {
outputQueue.remove(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My suggestion: use outputQueue.pop(); here. And there should be a

while (outputQueue.peek().isFinished()) {
    outputQueue.pop();
}

Right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right👍, I ignored the case that later epochs were completed first.

* internal and away from API module for now, until we could see the concrete need for {@link
* #PARALLEL_BETWEEN_EPOCH} from average users.
*/
public enum ParallelMode {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a property of Epoch? Or should the draining logic be in the close() of some specific epochs?

Copy link
Contributor

@yunfengzhou-hub yunfengzhou-hub left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Left some comments below.

public void completeOneRecord(Epoch epoch) {
epoch.ongoingRecordCount--;
// If this epoch has been closed before and all records in
// is epoch have finished, the epoch will be removed from the output queue.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is -> this

action.run();
return true;
}
return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this method is invoked on an already finished epoch, the result should be true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once an epoch is finished, it will be removed from the queue, one epoch won't finish twice.

*/
Closed,
/** The records of this epoch have finished execution after the epoch is closed. */
Finished
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In StreamOperators, finish() would be invoked prior to close(), while the design here is reversed. This might confuse other Flink developers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, I will enrich the description here to mitigate confusion. I think Fig.6 in FLIP-425 is helpful for developers to understand these three EpochStatus.

asyncExecutionController.drainInflightRecords(0);
if (lastEpoch.tryFinish() && outputQueue.size() > 0) {
outputQueue.remove(0);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block of code should have been invoked in completeOneRecord() during drainInFlightRecords, thus could be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block of code is for the case:
all records in one epoch are completed before non-records arriving, completeOneRecord() can't finish this epoch cause this epoch is not closed, so we need to check it again here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that lastEpoch.close(); is invoked right before this block of code, so this epoch must have been closed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I understood the logic here. No problem for me now.


EpochStatus status;

public Epoch(int recordCount, Runnable action) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that recordCount is always 0, thus this constructor parameter could be removed.


public ParallelEpochManager() {
super();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor seems unnecessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed ParallelEpochManager and SerialEpochManager as Zakelly's suggestion, the constructor was also removed.

@Override
public void onNonRecord(Runnable action) {
assert outputQueue.size() == 1;
Epoch lastEpoch = outputQueue.get(outputQueue.size() - 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: get(0) is enough given that outputQueue.size() == 1.

}

@Override
public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apart from Watermark and WatermarkStatus, there are other events that could come through the data stream(See all subclasses of StreamElement and RuntimeEvent). This PR might need to handle all these events, and it needs some design how to treat all events in a unified way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InternalWatermark also uses AbstractStreamOperator#processWatermark.
For LatencyMarker EndOfDataMarker and RecordAttributes, I plan to do it in FLINK-35031.

@@ -130,11 +152,17 @@ public AsyncExecutionController(
},
"AEC-buffer-timeout"));

if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether two epochs can be executed in parallel depends on the semantics of the non-record in between, which means it is not a unified configuration that can be set on the whole operator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thanks for the suggestion, I put ParallelMode as one parameter of onNonRecord(), then we can choose different modes for different epochs.

public void onNonRecord(Runnable action) {
Epoch lastEpoch = outputQueue.get(outputQueue.size() - 1);
lastEpoch.action = action;
lastEpoch.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The action should be invoked when ALL previous epochs have finished, instead of only the last one epoch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is, I checked outputQueue.size() == 1 here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the check. It just comes to me that the non-record might depend on not only all previous records, but also all previous non-record. Say we have an example data flow as follows

  • Epoch 0 records: record 0, record 1
  • Epoch 0 non-record: watermark 0
  • Epoch 1 records: record 2, record 3
  • Epoch 1 non-record: watermark 1

In the current implementation, it is guaranteed that record 0~3 must have been finished before watermark 1 is processed. But actually we may also need to guarantee that watermark 0 must have been finished as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Parallel mode, we don't guarantee that watermark 0 must have been finished before watermark 1.

For Serial mode, there is only one epoch in the queue at the same time, which provides the above guarantee as you mentioned.

@fredia fredia force-pushed the FLINK-35030 branch 2 times, most recently from 66e44bb to 9daa258 Compare May 9, 2024 10:32
Copy link
Contributor

@yunfengzhou-hub yunfengzhou-hub left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update! Left some comments as below.

if (activeEpoch.tryFinish()) {
outputQueue.pop();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we use

        while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) {
            outputQueue.pop();
        }

here as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the while loop is not necessary here, changing it to while loop won't have any bad effects either.

Because this method is only called on non-record arrivals, this epoch is the last epoch in the queue, its status changes will not affect previous epochs. And the previous epochs status would be updated on completeOneRecord().

this(id, null);
}

public Epoch(long id, Runnable action) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that this constructor can be removed.

public void onNonRecord(Runnable action) {
Epoch lastEpoch = outputQueue.get(outputQueue.size() - 1);
lastEpoch.action = action;
lastEpoch.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the check. It just comes to me that the non-record might depend on not only all previous records, but also all previous non-record. Say we have an example data flow as follows

  • Epoch 0 records: record 0, record 1
  • Epoch 0 non-record: watermark 0
  • Epoch 1 records: record 2, record 3
  • Epoch 1 non-record: watermark 1

In the current implementation, it is guaranteed that record 0~3 must have been finished before watermark 1 is processed. But actually we may also need to guarantee that watermark 0 must have been finished as well.

asyncExecutionController.drainInflightRecords(0);
if (lastEpoch.tryFinish() && outputQueue.size() > 0) {
outputQueue.remove(0);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I understood the logic here. No problem for me now.

* Subsequent epochs can begin execution even if the previous epoch has not yet completed.
* Usually performs better than {@link #SERIAL_BETWEEN_EPOCH}.
*/
PARALLEL_BETWEEN_EPOCH
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming here is slightly different from the nouns used in FLIP-425. How about naming them as "strict order" and "out of order"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think "out of order" is a bit too broad, so I changed it to the current name.
As the example you mentioned, "out of order" can't describe the scenario that watermark1 executes after record 0-3.

If there is a better way to describe "out of order", I am happy to change it.

outputQueue.pop();
}
activeEpoch = new Epoch(epochNum++);
outputQueue.add(activeEpoch);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the common parts between these two cases can be extracted.

* @param action the action associated with this non-record.
* @param parallelMode the parallel mode for this epoch.
*/
public void onNonRecord(Runnable action, ParallelMode parallelMode) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about changing Runnable to ThrowingRunnable? This can help reduce duplicated try-catch code blocks in invokers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because there is no exception in the signature of disposeContext() , if we change Runnable to ThrowingRunnable here, we need to catch the exception in aec.disposeContext() or epochManager.completeOneRecord().

I think that catching exceptions in StreamOperator is more convenient

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Given that we agreed to add AsyncExecutionController.processNonRecord in another comment and that AsyncExecutionController has exceptionHandler, how about keeping EpochManager using Runnable, and change AsyncExecutionController.processNonRecord to use ThrowingRunnable?

if (timeServiceManager != null) {
timeServiceManager.advanceWatermark(mark);
}
output.emitWatermark(mark);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be simpler to use super.processWatermark(mark).

@@ -311,6 +318,10 @@ public void drainInflightRecords(int targetNum) {
}
}

public EpochManager getEpochManager() {
return epochManager;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that all usage of this method will be followed by onNonRecord(). Thus it might be more straightforward to expose a method like onNonRecord in this class. This would also be good for isolating implementation details.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍Thanks for the suggestion, I added processNonRecord() for aec, EpochManager is not visible for operators now.

() -> {
output.incrementAndGet();
},
ParallelMode.PARALLEL_BETWEEN_EPOCH);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Despite that SERIAL_BETWEEN_EPOCH have been covered in AsyncExecutionControllerTest, it might still be better to make EpochManagerTest cover this situation as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constructing aec is troublesome and will bring a lot of duplicate code, so I put the test of SERIAL_BETWEEN_EPOCH in AsyncExecutionControllerTest . I added a test for mix epoch mode in AsyncExecutionControllerTest .

String.format("Failed to process watermark %s.", mark), e);
}
},
ParallelMode.SERIAL_BETWEEN_EPOCH);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My suggestion:

  1. ParallelMode could be configured within EpochManager or AEC, and could be overridden from outside.
  2. AEC should proxy the requests to EpochManager, the operator does not know the details.

Comment on lines 122 to 128
epoch.ongoingRecordCount--;
// If one epoch has been closed before and all records in
// this epoch have finished, the epoch will be removed from the output queue.
while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) {
outputQueue.pop();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
epoch.ongoingRecordCount--;
// If one epoch has been closed before and all records in
// this epoch have finished, the epoch will be removed from the output queue.
while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) {
outputQueue.pop();
}
if (--epoch.ongoingRecordCount == 0) {
// If one epoch has been closed before and all records in
// this epoch have finished, the epoch will be removed from the output queue.
while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) {
outputQueue.pop();
}
}

Comment on lines 108 to 110
if (activeEpoch.tryFinish() && outputQueue.size() > 0) {
outputQueue.pop();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need while() section as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the while loop is not necessary here, changing it to while loop won't have any bad effects either.

Because this method is only called on non-record arrivals, this epoch is the last epoch in the queue, its status changes will not affect previous epochs. And the previous epochs status would be updated on completeOneRecord().

Comment on lines 98 to 100
if (activeEpoch.tryFinish()) {
outputQueue.pop();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need while() section as well

Comment on lines 102 to 103
activeEpoch = new Epoch(epochNum++);
outputQueue.add(activeEpoch);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better extract this part into private method initActive()? And I'd suggest not adding the activeEpoch into the queue.

* The reference to the {@link AsyncExecutionController}, used for {@link
* ParallelMode#SERIAL_BETWEEN_EPOCH}. Can be null when testing.
*/
final AsyncExecutionController asyncExecutionController;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: AsyncExecutionController<?> might be better. Same for other raw usages.

* @param action the action associated with this non-record.
* @param parallelMode the parallel mode for this epoch.
*/
public void onNonRecord(Runnable action, ParallelMode parallelMode) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Given that we agreed to add AsyncExecutionController.processNonRecord in another comment and that AsyncExecutionController has exceptionHandler, how about keeping EpochManager using Runnable, and change AsyncExecutionController.processNonRecord to use ThrowingRunnable?

Copy link
Contributor

@yunfengzhou-hub yunfengzhou-hub left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. LGTM overall.

super.processWatermark(mark);
return;
}
asyncExecutionController.processNonRecord(() -> super.processWatermark(mark));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to override processWatermark1 and processWatermark2 as well, or we can override processWatermark(Watermark, int), like that for processWatermarkStatus. I also understand it that this PR is mainly responsible for introducing the epoch mechanism, and that we would have another jira ticket and PR to apply epoch to all events and all cases. So it is also OK for me if you would like to make the change in the next PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AbstractStreamOperatorV2 does not implement processWatermark1 , processWatermark2 and processWatermark(Watermark, int), if those methods will be added in the future, I would override them.

*/
public void onNonRecord(Runnable action, ParallelMode parallelMode) {
if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) {
asyncExecutionController.drainInflightRecords(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When doing drainInflightRecords, will the activeEpoch's ongoingRecordCount reached 0 and completeOneRecord called?

I'd suggest not do drain here. Instead we'd better mark blocking in AEC, while the callback is to unblock the AEC

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When doing drainInflightRecords, will the activeEpoch's ongoingRecordCount reached 0 and completeOneRecord called?

yes, completeOneRecord would be called.
I changed the logic to: close activeEpoch first, then drain in-flight records. The open active epoch would not be updated when doing drainInflightRecords, and updating closed epoch#ongoingRecordCount is allowed.

}

private void initNewActiveEpoch() {
this.activeEpoch = new Epoch(epochNum++);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest a tool method tryFinishInQueue which does:

while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) {
                LOG.trace(
                        "Finish epoch: {}, outputQueue size: {}",
                        outputQueue.peek(),
                        outputQueue.size());
                outputQueue.pop();
            }

and in initNewActiveEpoch, it does:

outputQueue.offer(activeEpoch);
activeEpoch = new Epoch(epochNum++);
tryFinishInQueue();

@@ -76,6 +77,9 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
*/
private final MailboxExecutor mailboxExecutor;

/** Exception handler to handle the exception thrown by asynchronous framework. */
private AsyncFrameworkExceptionHandler exceptionHandler;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. make it final?

Copy link
Contributor

@Zakelly Zakelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update ! LGTM

@fredia
Copy link
Contributor Author

fredia commented May 16, 2024

Thanks all for the detailed review! 🚀🚀

@fredia fredia merged commit f1ecb9e into apache:master May 17, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants