KAFKA-15045: (KIP-924 pt. 2) Implement ApplicationState and KafkaStreamsState #15920

Merged
merged 5 commits into from May 15, 2024

apourchet
Contributor

This PR implements read-only container classes for ApplicationState and KafkaStreamsState, and initializes those within
StreamsPartitionAssignor#assign.

New internal methods were also added to the ClientState to easily pass this data through to the KafkaStreamsState.

One test was added to check the lag sorting within the implementation of KafkaStreamsState, which is the counterpart to the test that existed for the ClientState class.
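As a rough illustration of the kind of ordering such a test would check, here is a minimal sketch. It assumes tasks are ordered by ascending lag with the task ID as a tie-breaker, which this PR description does not spell out. `LagSortingSketch` and `tasksByLag` are hypothetical names, and plain `String`s stand in for `TaskId`.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper, not the actual KafkaStreamsState implementation.
final class LagSortingSketch {

    // Returns the task IDs ordered by ascending lag (assumed ordering),
    // breaking ties by the natural order of the task ID.
    static SortedSet<String> tasksByLag(final Map<String, Long> taskLags) {
        final Comparator<String> byLagThenId = (a, b) -> {
            final int byLag = Long.compare(taskLags.get(a), taskLags.get(b));
            return byLag != 0 ? byLag : a.compareTo(b);
        };
        final SortedSet<String> sorted = new TreeSet<>(byLagThenId);
        sorted.addAll(taskLags.keySet());
        return sorted;
    }
}
```

The tie-breaker matters because a `TreeSet` treats elements that compare as equal as duplicates, so two tasks with the same lag would otherwise collapse into one entry.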

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ableegoldman ableegoldman changed the title KAFKA-15045: (KIP-924): Implement ApplicationState and KafkaStreamsState KAFKA-15045: (KIP-924 pt. 2) Implement ApplicationState and KafkaStreamsState May 11, 2024
@@ -459,6 +468,38 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
}
}

private ApplicationState getApplicationState(
Contributor

nit: we don't use "get" in Streams getter names. I guess this isn't exactly a pure getter, but still. On that note, perhaps a better name would be buildApplicationState? 🤔

Also: even though it's all internal, I've been on a crusade to get everyone to write javadocs for methods in the StreamsPartitionAssignor with at least a brief explanation of what it does.

It's just a super complicated class that does a lot and often mutates things in a way that isn't obvious, so every little bit of documentation helps

@@ -432,6 +437,10 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr

// compute the assignment of tasks to threads within each client and build the final group assignment

getApplicationState(
Contributor

I know we're just throwing away the return value for now but I'd still do this:

Suggested change
- getApplicationState(
+ final ApplicationState applicationState = getApplicationState(

Otherwise it kind of seems like this method is supposed to be mutating the input parameters (many of the StreamsPartitionAssignor methods work this way so it's good to distinguish when we're just building something vs operating on the passed in structures)

Contributor Author

SpotBugs was complaining about unused variables when I had that, so I will simply leave the invocation of that new method out entirely.

Contributor

ah, I see. Well there's no concern about forgetting to put this back in since we'll eventually need the result lol. All good 👍

Comment on lines 38 to 43
public ApplicationStateImpl(
final AssignmentConfigs assignmentConfigs,
final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates,
final Set<TaskId> statefulTasks,
final Set<TaskId> statelessTasks
) {
Contributor

KafkaStreams formatting for long signatures is (unfortunately) done like this:

Suggested change
-    public ApplicationStateImpl(
-        final AssignmentConfigs assignmentConfigs,
-        final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates,
-        final Set<TaskId> statefulTasks,
-        final Set<TaskId> statelessTasks
-    ) {
+    public ApplicationStateImpl(final AssignmentConfigs assignmentConfigs,
+                                final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates,
+                                final Set<TaskId> statefulTasks,
+                                final Set<TaskId> statelessTasks) {

It's annoying, I know. Can you make a pass over all the methods and make sure they follow the AK style?

Comment on lines 62 to 63
final Set<TaskId> union = new HashSet<>(statefulTasks);
union.addAll(statelessTasks);
Contributor

nit: this could in theory be called multiple times, so we probably want to cache the result instead of building up a new set each time. We could still do it lazily, but I'd say just build the set in the constructor so we can make everything final (and unmodifiable).
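A minimal sketch of what building the union once in the constructor could look like. `CachedTaskSets` is a hypothetical, simplified stand-in for `ApplicationStateImpl`, and plain `String`s stand in for `TaskId`; none of these names come from the PR.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for ApplicationStateImpl; task IDs are Strings here.
final class CachedTaskSets {
    private final Set<String> statefulTasks;
    private final Set<String> statelessTasks;
    private final Set<String> allTasks;

    CachedTaskSets(final Set<String> statefulTasks, final Set<String> statelessTasks) {
        // Defensive copies, wrapped so callers cannot mutate the internal state.
        this.statefulTasks = Collections.unmodifiableSet(new HashSet<>(statefulTasks));
        this.statelessTasks = Collections.unmodifiableSet(new HashSet<>(statelessTasks));

        // Build the union exactly once instead of on every allTasks() call.
        final Set<String> union = new HashSet<>(statefulTasks);
        union.addAll(statelessTasks);
        this.allTasks = Collections.unmodifiableSet(union);
    }

    // Returns the same cached, unmodifiable set on every call.
    Set<String> allTasks() {
        return allTasks;
    }
}
```

The trade-off is a small up-front cost in the constructor in exchange for fields that are all `final` and a getter that never allocates.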


@Override
public long lagFor(final TaskId task) {
final Long totalLag = taskLagTotals.get(task);
Contributor

We should also throw if the KafkaStreamsState was built without requesting the task lags be computed, right? (IIRC we decided on UnsupportedOperationException in the last PR)

Same for the other lag-related methods here. I guess the safest thing to do is wrap the taskLagTotals field in an Optional and make it empty when the user passed in computeTaskLags=false to the ApplicationState#kafkaStreamsStates API?
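One way the `Optional` idea could look, as a sketch only. `LagAwareState` is a hypothetical stand-in for `KafkaStreamsStateImpl`, `String`s stand in for `TaskId`, and the constructor shape is an assumption rather than what the PR ended up with.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for KafkaStreamsStateImpl; task IDs are Strings here.
final class LagAwareState {
    // Empty when the state was built with computeTaskLags=false.
    private final Optional<Map<String, Long>> taskLagTotals;

    LagAwareState(final Map<String, Long> taskLagTotals, final boolean computeTaskLags) {
        if (computeTaskLags) {
            final Map<String, Long> copy = Collections.unmodifiableMap(new HashMap<>(taskLagTotals));
            this.taskLagTotals = Optional.of(copy);
        } else {
            this.taskLagTotals = Optional.empty();
        }
    }

    long lagFor(final String task) {
        // Lags were never requested: fail loudly instead of returning a bogus value.
        final Map<String, Long> lags = taskLagTotals.orElseThrow(
            () -> new UnsupportedOperationException("Task lags were not computed for this state"));

        final Long totalLag = lags.get(task);
        if (totalLag == null) {
            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
        }
        return totalLag;
    }
}
```

Every other lag-related method would start with the same `orElseThrow` call, so the "lags not computed" case cannot be confused with the "unknown task" case.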

public long lagFor(final TaskId task) {
final Long totalLag = taskLagTotals.get(task);
if (totalLag == null) {
throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
Contributor

As a general rule that we clearly don't always follow in Streams, but should/are actively getting better about, we should make sure to always log an error before throwing a (new) exception. Sometimes the error handling can make it difficult to trace back an exception to the original source of error, and error logs help with that. Doesn't need to be too complicated -- although in this case, perhaps it would make sense to include the keySet of taskLagTotals in the error log (I'm thinking to help differentiate the case where this specific task isn't included vs the entire map being empty)
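A sketch of the log-then-throw pattern with the key set included in the message. It uses `java.util.logging` only to stay self-contained; Kafka Streams itself logs through SLF4J, so the logger here is a stand-in. `LoggedLagLookup` is a hypothetical name and `String`s stand in for `TaskId`.

```java
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical helper illustrating "log an error before throwing".
final class LoggedLagLookup {
    // Stand-in logger; the real code base would use an SLF4J logger instead.
    private static final Logger LOG = Logger.getLogger(LoggedLagLookup.class.getName());

    static long lagFor(final Map<String, Long> taskLagTotals, final String task) {
        final Long totalLag = taskLagTotals.get(task);
        if (totalLag == null) {
            // Including the key set distinguishes "this task is missing"
            // from "the whole map is empty".
            LOG.severe("Tried to look up lag for unknown task " + task
                + "; tasks with known lag: " + taskLagTotals.keySet());
            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
        }
        return totalLag;
    }
}
```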


@Override
public Map<ProcessId, KafkaStreamsState> kafkaStreamsStates(final boolean computeTaskLags) {
return kafkaStreamsStates;
Contributor

We should make sure to respect the computeTaskLags param here. There are two things we can do: wait until this method is called and then call some KafkaStreamsStateImpl#computeTaskLags method, or hold off on constructing the KafkaStreamsStateImpls at all until we know whether or not we should compute the task lags, and then pass the computeTaskLags flag into the KafkaStreamsStateImpl constructor.

I personally prefer the latter since that way there's no partially-initialized classes floating around and we don't have to keep track of when the task lags are computed/initialized.
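A sketch of the second option, where the per-client states are only constructed once the flag is known. All names here (`ClientStateSketch`, `ApplicationStateSketch`, `hasTaskLags`) are hypothetical, `String`s stand in for `ProcessId` and `TaskId`, and caching one result per flag value is an added assumption, not something this comment asks for.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for KafkaStreamsStateImpl: fully initialized in its constructor.
final class ClientStateSketch {
    private final Optional<Map<String, Long>> taskLagTotals;

    ClientStateSketch(final Map<String, Long> rawLags, final boolean computeTaskLags) {
        if (computeTaskLags) {
            final Map<String, Long> copy = Collections.unmodifiableMap(new HashMap<>(rawLags));
            this.taskLagTotals = Optional.of(copy);
        } else {
            this.taskLagTotals = Optional.empty();
        }
    }

    boolean hasTaskLags() {
        return taskLagTotals.isPresent();
    }
}

// Hypothetical stand-in for ApplicationStateImpl.
final class ApplicationStateSketch {
    private final Map<String, Map<String, Long>> rawLagsByProcess;
    // Assumption: cache one map of states per flag value so repeated calls are cheap.
    private final Map<Boolean, Map<String, ClientStateSketch>> cache = new HashMap<>();

    ApplicationStateSketch(final Map<String, Map<String, Long>> rawLagsByProcess) {
        this.rawLagsByProcess = rawLagsByProcess;
    }

    Map<String, ClientStateSketch> kafkaStreamsStates(final boolean computeTaskLags) {
        return cache.computeIfAbsent(computeTaskLags, flag -> {
            final Map<String, ClientStateSketch> states = new HashMap<>();
            for (final Map.Entry<String, Map<String, Long>> entry : rawLagsByProcess.entrySet()) {
                // The flag goes straight into the constructor, so no state
                // object ever exists in a half-initialized form.
                states.put(entry.getKey(), new ClientStateSketch(entry.getValue(), flag));
            }
            return Collections.unmodifiableMap(states);
        });
    }
}
```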

return new TreeSet<>(previousStandbyTasks.taskIds());
}

public SortedMap<String, Set<TaskId>> taskIdsByConsumer() {
Contributor

nit: call this taskIdsByPreviousConsumer or something to that effect (i.e. include the word "previous")

It's so hard to keep track of what pertains to the previous assignment vs the new assignment in this mess of a class lol. That's one of the biggest improvements in KIP-924

@ableegoldman ableegoldman left a comment

LGTM, ping me when the build finishes running and I'll merge

@ableegoldman
Contributor

The test env seems pretty unstable right now but we have at least one clean build for each java version if you look at the two latest runs. All test failures are unrelated as well. Seems safe to merge

@ableegoldman ableegoldman merged commit 0c5e8d3 into apache:trunk May 15, 2024
1 check failed
@ableegoldman
Contributor

Merged to trunk

2 participants