Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-7718] Try to fetch the latestSourceProfile in HoodieIncrSource #11175

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

vinishjail97
Copy link
Contributor

Change Logs

Follow-up PR for #11159
Try to fetch the latestSourceProfile always, this ensures the profile is refreshed if it's no longer valid. The implementation of source profile takes care when a profile needs to be refreshed or re-computed again.

Impact

A new constructor added for HoodieIncrSource for emitting metrics related to source parallelism and source bytes ingested.

Risk level (write none, low medium or high below)

Medium

Documentation Update

None.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label May 8, 2024
@hudi-bot
Copy link

hudi-bot commented May 8, 2024

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Comment on lines +247 to +249
if (getLatestSourceProfile().isPresent()) {
src = coalesceOrRepartition(src, getLatestSourceProfile().get().getSourcePartitions());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could getLatestSourceProfile().map().orElse() be used instead of reassigning the variable?

@@ -344,7 +385,7 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe
snapshotCheckPointImplClassOpt.map(className ->
properties.setProperty(SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, className));
TypedProperties typedProperties = new TypedProperties(properties);
HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc(), spark(), new DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));
HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc(), spark(), metrics, new DefaultStreamContext(new DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA), sourceProfile));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you validate the source parallelism is changed after passing the source profile?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
size:M PR with lines of changes in (100, 300]
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants