Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ListOffsetsRequest should only be sent to the leader replica #4616

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

kphelps
Copy link

@kphelps kphelps commented Feb 12, 2024

When using fetch-from-follower, it is currently possible for a consumer to get stuck in a loop sending ListOffsetRequest when we go through the rd_kafka_offset_reset path since the request is sent to the preferred replica. Instead, always send it to the leader.

Copy link

cla-assistant bot commented Feb 12, 2024

CLA assistant check
All committers have signed the CLA.

@emasab
Copy link
Collaborator

emasab commented Feb 20, 2024

It's correct to send the ListOffsets request to the preferred replica. The loop probably comes from this discovered bug:
#4620

When enabling debug logs, could you check if it's receiving FENCED_LEADER_EPOCH errors?

@kphelps
Copy link
Author

kphelps commented Feb 21, 2024

Nope, I'm seeing NOT_LEADER_OR_FOLLOWER errors.

@kphelps
Copy link
Author

kphelps commented Feb 21, 2024

Aha, from KIP-392:

The FetchRequest schema has field for the replica id. Consumers typically use the sentinel -1, which indicates that fetching is only allowed from the leader. A lesser known sentinel is -2, which was originally intended to be used for debugging and allows fetching from followers. We propose to let the consumer use this to indicate the intent to allow fetching from a follower. Similarly, when we need to send a ListOffsets request to a follower in order to find the log start offset, we will use the same sentinel for the replica id field.

Looks like we unconditionally set the replica id to -1 here

Looks like the Java client opts to just always send to the leader. WDYT?

@emasab
Copy link
Collaborator

emasab commented Mar 5, 2024

@kphelps
The replica id should be set to -1 in clients, and to the broker id in followers, see the RPC definition
https://github.com/apache/kafka/blob/2f401ff4c85f6797391b8a3dd57d651f4de3d6ad/clients/src/main/resources/common/message/ListOffsetsRequest.json#L42

The error NOT_LEADER_OR_FOLLOWER happens when the broker isn't a replica for that partition.
In that case librdkafka refreshes metadata to get the leader again, here.

rd_kafka_metadata_refresh_known_topics(rk, NULL,

Is it possible to reproduce the issue and send a log with "debug": "all" or "debug": "consumer,cgrp,topic,fetch,metadata,broker,topic" ?

@kphelps
Copy link
Author

kphelps commented Mar 5, 2024

I'm working to reproduce this now, but have been having trouble in a controlled environment. Will share that when I get it.

The broker only allows fetching from the leader unless the replica id is set to -2 here which propagates down to retrieving the local log and erroring here.

@kphelps kphelps force-pushed the kphelps/list-offsets-leader branch from dfb9e3e to 341c62d Compare March 21, 2024 18:49
@kphelps
Copy link
Author

kphelps commented Mar 21, 2024

Found a test that was silently failing due to this issue

@emasab
Copy link
Collaborator

emasab commented Mar 26, 2024

Thanks @kphelps I was checking this issue more in depth and understood the problem, it's different from what I linked and as you said could be solved in two ways, by sending the request to the follower with -2 or to the leader as Java is doing.

The con of sending it to leader is that is case the follower is lagging behind it could have other offset resets when fetching, until it has caught up, I've checked broker code and tried using -2 by changing mock cluster implementation and it works too.

Will ask for an opinion internally too before deciding for one of the two solutions.

@emasab emasab added the bug label Mar 26, 2024
@emasab
Copy link
Collaborator

emasab commented Apr 10, 2024

Cannot fix it by sending the request to the follower because there are some problems:
if replica id was different from CONSUMER_REPLICA_ID (-1), the isolation level parameter would be ignored, so I'm following @kphelps proposal and using the same behaviour as Java, to send the request to the leader only.

broker code:

            val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID
            val isClientRequest = offsetRequest.replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID
            val isolationLevelOpt = if (isClientRequest)
              Some(offsetRequest.isolationLevel)
            else
              None

removed test import
@emasab emasab force-pushed the kphelps/list-offsets-leader branch from 0c86070 to 90d269e Compare April 10, 2024 12:06
@emasab
Copy link
Collaborator

emasab commented Apr 10, 2024

/sem-approve

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants