Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source): let shared SourceExecutor start from latest instead of specified offset #16626

Merged
merged 4 commits into from May 14, 2024

Conversation

xxchan
Copy link
Member

@xxchan xxchan commented May 8, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

#16576 (comment)

mentioned by Eric, this is also his original idea in his RFC

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

Signed-off-by: xxchan <xxchan22f@gmail.com>
Signed-off-by: xxchan <xxchan22f@gmail.com>
@xxchan xxchan requested a review from BugenZhao May 8, 2024 02:46
xxchan added 2 commits May 8, 2024 10:54
Signed-off-by: xxchan <xxchan22f@gmail.com>
Signed-off-by: xxchan <xxchan22f@gmail.com>
@xxchan xxchan requested a review from tabVersion May 8, 2024 04:14
Copy link
Member

@BugenZhao BugenZhao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Have you verified when the Offset::End is resolved? In other words, if you create a Reader and then immediately pause it (without polling once), will the reader eventually start from the latest offset resolved during creation or when the stream is resumed and first polled?

@@ -24,6 +24,12 @@ pub struct KafkaSplit {
pub(crate) partition: i32,
pub(crate) start_offset: Option<i64>,
pub(crate) stop_offset: Option<i64>,
#[serde(skip)]
/// Used by shared source to hackily seek to the latest offset without fetching start offset first.
/// XXX: But why do we fetch low watermark for latest start offset..?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean that theoretically we can also follow this approach for the Kafka sources specified scan.startup.mode='latest' by users?

Copy link
Member Author

@xxchan xxchan May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think so. @tabVersion @shanicky so why do we fetch low watermark instead of passing Offset::End?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that pulsar uses PulsarEnumeratorOffset in PulsarSplit, so I think it's perfectly ok.

Copy link
Contributor

@tabVersion tabVersion left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, how about writing a new post about the shared source arch.

/// XXX: But why do we fetch low watermark for latest start offset..?
///
/// When this is `true`, `start_offset` will be ignored.
pub(crate) hack_seek_to_latest: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest changing another name like force_start_latest. This name sounds like doing some magic 🪄

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think this is hacky and magical.. If possibile, I want to only use start_offset to do this. In other words, also use Offset::End for scan.startup.mode='latest'

@xxchan
Copy link
Member Author

xxchan commented May 10, 2024

Have you verified when the Offset::End is resolved?

It's when consumer.assign is called. So pausing doesn't change the starting position.

@xxchan xxchan added this pull request to the merge queue May 14, 2024
Merged via the queue into main with commit 7419439 May 14, 2024
27 of 28 checks passed
@xxchan xxchan deleted the xxchan/start-from-latest branch May 14, 2024 06:47
@xxchan xxchan mentioned this pull request May 19, 2024
13 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants