
[SUPPORT]FileID <filegroup> of partition path xxx=xx does not exist. #11202

Open
CaesarWangX opened this issue May 13, 2024 · 14 comments
Labels
flink, flink-sql, stability


CaesarWangX commented May 13, 2024

Describe the problem you faced

Spark 3.4.1 + Hudi 0.14.0
After upgrading from Hudi 0.11 to Hudi 0.14, we occasionally encounter this error. Once it occurs, the task keeps failing even after restarts, always with the same error message and the same FileID.

I have reviewed the following two issues, but unfortunately neither provides an effective solution:
#5886
#8890

Fortunately, by examining the source code, I was able to reproduce this issue, and I have tried modifying the source code to prevent the task failures.

Any information or suggestions are helpful, thank you.

To Reproduce

Steps to reproduce the behavior:

  1. Filegroup has only one data file.
  2. Delete the completed deltacommit corresponding to this filegroup (delete only the deltacommit file; keep the .requested and .inflight files).
  3. New data will continue to be written to this filegroup (determined by getSmallFiles).

BTW, under normal circumstances the task only fails for this reason when all three conditions happen to be met at the same time, which is rare, so reproducing this error naturally is quite difficult. Essentially, it occurs when the job fails after it has finished writing data to the filegroup but before it commits the deltacommit. 😅
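Step 2 above can be scripted against a disposable test table. A minimal sketch, assuming a table on the local filesystem and the 0.x timeline layout, where instant files sit directly under <basePath>/.hoodie as <instant>.deltacommit.requested, <instant>.deltacommit.inflight and <instant>.deltacommit. The class and method names are hypothetical; for HDFS or S3 the same idea would go through the Hadoop FileSystem API instead of java.nio.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical reproduction helper for a local test table only (not part of Hudi).
// Assumes the 0.x timeline layout: instant files live directly under <basePath>/.hoodie.
class DropCompletedDeltaCommit {

  // Deletes only the completed deltacommit file for the given instant and keeps the
  // .requested and .inflight files, mimicking a writer that crashed before committing.
  // Returns true if a completed file existed and was deleted.
  static boolean dropCompleted(Path basePath, String instantTime) throws IOException {
    Path timelineDir = basePath.resolve(".hoodie");
    Path requested = timelineDir.resolve(instantTime + ".deltacommit.requested");
    Path inflight = timelineDir.resolve(instantTime + ".deltacommit.inflight");
    Path completed = timelineDir.resolve(instantTime + ".deltacommit");
    // Refuse to proceed unless the instant would be left looking like an in-progress write.
    if (!Files.exists(requested) || !Files.exists(inflight)) {
      throw new IllegalStateException("requested/inflight files missing for instant " + instantTime);
    }
    return Files.deleteIfExists(completed);
  }

  public static void main(String[] args) throws IOException {
    // Usage: java DropCompletedDeltaCommit.java <localBasePath> <instantTime>
    boolean deleted = dropCompleted(Paths.get(args[0]), args[1]);
    System.out.println(deleted ? "completed deltacommit removed" : "no completed deltacommit found");
  }
}
```

After running it, the next micro-batch that picks this filegroup through getSmallFiles should hit step 3.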

Expected behavior

A clear and concise description of what you expected to happen.

Environment Description

  • Hudi version: 0.14.0 (MOR table, INSERT operation)

  • Spark version: 3.4.1

  • Storage (HDFS/S3/GCS..) :

  • Running on Docker? (yes/no) :

Additional context

Not finished yet; I will provide more information later.
In Hudi 0.11, the same reproduction steps do not produce this error, but in 0.14 they do. After examining the code, the key difference between Hudi 0.11 and 0.14 is in AbstractTableFileSystemView:

Hudi 0.11:
[4 screenshots of the relevant code; not available in this copy]

Hudi 0.14:
[3 screenshots of the relevant code; not available in this copy]

After reproducing the issue with the steps above, it appears to work like this:
In version 0.11, getLatestFileSlicesBeforeOrOn obtains a Stream called fileSliceStream via fetchLatestFileSlicesBeforeOrOn.
In fetchLatestFileSlicesBeforeOrOn, after calling fetchAllStoredFileGroups, it filters again (line 1017, via getAllFileSlices in getLatestFileSliceBeforeOrOn), which excludes the problematic FileID. As a result, the third condition needed to reproduce the issue is never met.

In version 0.14, getLatestFileSlicesBeforeOrOn uses fetchAllStoredFileGroups directly.
fetchAllStoredFileGroups returns all file groups, which are then filtered with getAllFileSlicesBeforeOn in a map operation. However, this filter does not exclude file groups whose deltacommit was lost. These file groups are therefore still treated as writable small files, which satisfies the third reproduction step and triggers the issue.
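To make the difference concrete, here is a simplified, hypothetical model (not Hudi's actual classes or API). A file slice is identified by its file group and the instant that wrote its base file, and an instant counts as committed only if its completed deltacommit exists. The 0.11-style path keeps only committed slices; the 0.14-style path applies only the time bound, so a file group whose only slice lost its deltacommit is still offered as a small file. The merge-handle-style lookup is modeled as seeing only committed base files, which is our reading of the stack trace in this issue, so it fails with the same kind of NoSuchElementException. Instant times are compared as strings, which works because they are fixed-width timestamps.

```java
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified, hypothetical model of the behavior described above; not Hudi's real API.
class SmallFileSelectionModel {

  // A file slice: the file group it belongs to plus the instant that wrote its base file.
  record FileSlice(String fileId, String baseInstant) {}

  // 0.11-like: drop slices whose base instant never completed, then apply the time bound.
  static List<String> candidatesLike011(List<FileSlice> slices, Set<String> committed, String maxInstant) {
    return slices.stream()
        .filter(s -> committed.contains(s.baseInstant()))
        .filter(s -> s.baseInstant().compareTo(maxInstant) <= 0)
        .map(FileSlice::fileId)
        .collect(Collectors.toList());
  }

  // 0.14-like: only the time bound is applied, so slices with a lost deltacommit survive.
  static List<String> candidatesLike014(List<FileSlice> slices, String maxInstant) {
    return slices.stream()
        .filter(s -> s.baseInstant().compareTo(maxInstant) <= 0)
        .map(FileSlice::fileId)
        .collect(Collectors.toList());
  }

  // Merge-handle-like lookup (assumed): the latest base file must come from a committed instant.
  static FileSlice latestBaseFile(List<FileSlice> slices, Set<String> committed,
                                  String fileId, String partitionPath) {
    return slices.stream()
        .filter(s -> s.fileId().equals(fileId) && committed.contains(s.baseInstant()))
        .max(Comparator.comparing(FileSlice::baseInstant))
        .orElseThrow(() -> new NoSuchElementException(
            "FileID " + fileId + " of partition path " + partitionPath + " does not exist."));
  }
}
```

With two file groups where only fg-1's deltacommit completed, the 0.11-like path returns only fg-1, the 0.14-like path returns both, and looking up fg-2's base file throws.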

After comparing the two versions, we introduced a new method, getAllFileSlicesBeforeOrOn, based on getAllFileSlicesBeforeOn. Before applying the usual filter, it first calls getAllFileSlices, which filters out file groups whose deltacommit was lost.
Then, in getLatestFileSlicesBeforeOrOn, we changed line 846 from .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime)) to .map(fg -> fg.getAllFileSlicesBeforeOrOn(maxCommitTime)).
In our tests, this prevents file groups with a lost deltacommit from being treated as small files.
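The proposed method can be sketched on a simplified, hypothetical file group (not Hudi's HoodieFileGroup; slices are reduced to their base instant times, and "completed" means the instant's completed deltacommit exists). It only illustrates the ordering of the two filters described above: committed slices first, then the time bound.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

// Hypothetical, simplified file group used only to illustrate the proposed change.
class FileGroupSketch {
  private final List<String> sliceBaseInstants;  // base instant time of each file slice
  private final Set<String> completedInstants;   // instants whose deltacommit completed

  FileGroupSketch(List<String> sliceBaseInstants, Set<String> completedInstants) {
    this.sliceBaseInstants = sliceBaseInstants;
    this.completedInstants = completedInstants;
  }

  // Mirrors the idea of getAllFileSlices(): keep only slices whose instant completed.
  Stream<String> getAllFileSlices() {
    return sliceBaseInstants.stream().filter(completedInstants::contains);
  }

  // Mirrors the idea of getAllFileSlicesBeforeOn(): time bound only.
  Stream<String> getAllFileSlicesBeforeOn(String maxInstant) {
    return sliceBaseInstants.stream().filter(t -> t.compareTo(maxInstant) <= 0);
  }

  // Proposed getAllFileSlicesBeforeOrOn(): committed slices first, then the time bound.
  Stream<String> getAllFileSlicesBeforeOrOn(String maxInstant) {
    return getAllFileSlices().filter(t -> t.compareTo(maxInstant) <= 0);
  }
}
```

For a file group whose only slice lost its deltacommit, getAllFileSlicesBeforeOn still returns that slice, while getAllFileSlicesBeforeOrOn returns an empty stream, so such a group should no longer be offered as a small file.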

[2 screenshots of the modified code; not available in this copy]

Tested, and it works. Any information or suggestions would be helpful, thank you.

Stacktrace

Caused by: java.util.NoSuchElementException: FileID 1b88b26f-94b1-4bd1-94ad-e919e17183ee-0 of partition path datatype=xxxx/year=2024/month=05/day=07 does not exist.
at org.apache.hudi.io.HoodieMergeHandle.getLatestBaseFile(HoodieMergeHandle.java:161)
at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:126)
at org.apache.hudi.io.HoodieConcatHandle.<init>(HoodieConcatHandle.java:79)
at org.apache.hudi.io.HoodieMergeHandleFactory.create(HoodieMergeHandleFactory.java:63)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpdateHandle(BaseSparkCommitActionExecutor.java:400)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:368)
at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:79)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335)
... 30 more

@danny0405
Contributor

danny0405 commented May 14, 2024

Did you try the Hudi 0.14.1 release? Did you enable the metadata table?

danny0405 added the flink, flink-sql, and stability labels on May 14, 2024
@CaesarWangX
Author

CaesarWangX commented May 14, 2024

Hi @danny0405, we don't need the metadata table, so as I mentioned, we set hoodie.metadata.enable=false. We run Hudi on AWS EMR, so we don't have the option to use Hudi 0.14.1. We also checked the code in 0.14.1, and it looks the same as in 0.14.0.

@CaesarWangX
Author

Hi @danny0405 @xushiyan, we are using Spark 3.4.1 and Hudi 0.14.0. I have updated the context; please help look into this. Thank you.

@CaesarWangX
Author

The reason we do not use the metadata table is that, in Spark Structured Streaming, enabling it reduces micro-batch efficiency because of the additional list operations.

@danny0405
Contributor

Caused by: java.util.NoSuchElementException: FileID xxxxx of partition path dt=2019-02-20 does not exist.
at org.apache.hudi.io.HoodieMergeHandle.getLatestBaseFile(HoodieMergeHandle.java:159)
at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:121)
at org.apache.hudi.io.FlinkMergeHandle.<init>(FlinkMergeHandle.java:70)
at org.apache.hudi.io.FlinkConcatHandle.<init>(FlinkConcatHandle.java:53)
at org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:557)
at org.apache.hudi.client.HoodieFlinkWriteClient.insert(HoodieFlinkWriteClient.java:175)
at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$0(StreamWriteFunction.java:181)
at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:461)

The error message indicates that you enabled inline clustering for Flink. Can you disable it and try again using async clustering instead?

@CaesarWangX
Author

Hi @danny0405, I don't understand your point. This job writes data using Spark Structured Streaming + Hudi with a MOR table and the INSERT operation. It seems unrelated to Flink and async clustering, and async clustering is false by default.

@ad1happy2go
Contributor

@CaesarWangX That's interesting. The confusion is that your stack trace points to FlinkMergeHandle code, which Spark streaming should not use.

@CaesarWangX
Author

@ad1happy2go @danny0405 I'm sorry, I have re-uploaded the correct stack trace 😅

@ad1happy2go
Contributor

@CaesarWangX Can you also post the writer configuration, please?

@CaesarWangX
Author

@ad1happy2go Sure, here it is.
"hoodie.datasource.write.table.type" = "MERGE_ON_READ"
"hoodie.table.name" = "smart_event"
"hoodie.datasource.write.recordkey.field" = "rowkey"
"hoodie.datasource.write.operation" = "insert"
"hoodie.datasource.write.hive_style_partitioning" = "true"
"hoodie.datasource.hive_sync.partition_fields" = "dt"
"hoodie.datasource.hive_sync.partition_extractor_class" = "org.apache.hudi.hive.MultiPartKeysValueExtractor"
"hoodie.datasource.write.precombine.field" = "log_time"
"hoodie.upsert.shuffle.parallelism" = "200"
"hoodie.insert.shuffle.parallelism" = "200"
"hoodie.parquet.small.file.limit" = "441600"
"hoodie.parquet.max.file.size" = "829120"
"hoodie.index.type" = "BLOOM"
"hoodie.bloom.index.prune.by.ranges" = "true"
"hoodie.bloom.index.use.caching" = "true"
"hoodie.bloom.index.use.treebased.filter" = "true"
"hoodie.bloom.index.bucketized.checking" = "true"
"hoodie.bloom.index.filter.type" = "DYNAMIC_V0"
"hoodie.cleaner.policy" = "KEEP_LATEST_COMMITS"
"hoodie.clean.automatic" = "true"
"hoodie.clean.async" = "true"
"hoodie.keep.min.commits" = "5"
"hoodie.keep.max.commits" = "7"
"hoodie.cleaner.parallelism" = "200"
"hoodie.embed.timeline.server" = "false"
"hoodie.datasource.compaction.async.enable" = "true"
"hoodie.cleaner.commits.retained" = "4"
"hoodie.copyonwrite.insert.auto.split" = "true"
"hoodie.merge.allow.duplicate.on.inserts" = "true"
"hoodie.metadata.enable" = "false"
"hoodie.cleaner.policy.failed.writes" = "NEVER"

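Several of these settings interact. Below is a stdlib-only sketch (a hypothetical class, not a Hudi API) that checks a few relationships among the values above. As far as we know, Hudi expects hoodie.keep.min.commits to be greater than hoodie.cleaner.commits.retained and hoodie.keep.max.commits to be greater than hoodie.keep.min.commits; the small-file-limit check is our own sanity check. It also flags the NEVER failed-writes policy, since with it failed writes are not rolled back, which matters for this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sanity check over a subset of the writer options above; not a Hudi API.
class WriterConfigCheck {

  static List<String> warnings(Map<String, String> opts) {
    List<String> out = new ArrayList<>();
    int retained = Integer.parseInt(opts.get("hoodie.cleaner.commits.retained"));
    int keepMin = Integer.parseInt(opts.get("hoodie.keep.min.commits"));
    int keepMax = Integer.parseInt(opts.get("hoodie.keep.max.commits"));
    long smallFileLimit = Long.parseLong(opts.get("hoodie.parquet.small.file.limit"));
    long maxFileSize = Long.parseLong(opts.get("hoodie.parquet.max.file.size"));

    // Archival should keep more commits on the active timeline than the cleaner retains.
    if (keepMin <= retained) out.add("hoodie.keep.min.commits should be > hoodie.cleaner.commits.retained");
    if (keepMax <= keepMin) out.add("hoodie.keep.max.commits should be > hoodie.keep.min.commits");
    // Files below the small-file limit are treated as small files that new inserts can be routed into.
    if (smallFileLimit >= maxFileSize) out.add("hoodie.parquet.small.file.limit should be < hoodie.parquet.max.file.size");
    // With NEVER, failed writes are not rolled back, so their inflight instants and data files remain.
    if ("NEVER".equals(opts.get("hoodie.cleaner.policy.failed.writes"))) {
      out.add("failed writes are never rolled back (hoodie.cleaner.policy.failed.writes=NEVER)");
    }
    return out;
  }
}
```

With the configuration posted above, only the NEVER warning is reported (5 > 4, 7 > 5, 441600 < 829120).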
@CaesarWangX
Author

@ad1happy2go you can reproduce it with the following steps:

Steps to reproduce the behavior:

Filegroup has only one data file.
Delete the completed deltacommit corresponding to this filegroup (delete only the deltacommit file; keep the .requested and .inflight files).
New data will continue to be written to this filegroup (determined by getSmallFiles).

@ad1happy2go
Contributor

@CaesarWangX Is this the last delta commit in your timeline? Why are you deleting the delta commit file manually? Are you saying that when you do that, no rollback runs?

@CaesarWangX
Author

CaesarWangX commented May 16, 2024

Hi @ad1happy2go, yes. Because we set "hoodie.cleaner.policy.failed.writes" = "NEVER" (see the configs I provided above), rollback does not run.
We deleted the delta commit file manually only to reproduce the "FileID does not exist" issue.
If you follow the reproduction steps above, you should be able to hit this issue 😅
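Since "hoodie.cleaner.policy.failed.writes" = "NEVER" disables rollback of failed writes, an instant left with only .requested and .inflight files stays on the timeline. A minimal sketch (hypothetical helper, assuming a local table and the 0.x timeline layout under <basePath>/.hoodie) that lists deltacommit instants with an .inflight file but no completed file. Note that the instant of a writer that is still running will show up too.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;

// Hypothetical helper, not part of Hudi. Assumes a local table and the 0.x timeline layout.
class DanglingDeltaCommits {
  private static final String INFLIGHT_SUFFIX = ".deltacommit.inflight";
  private static final String COMPLETED_SUFFIX = ".deltacommit";

  // Returns instant times (sorted) that have an inflight deltacommit but no completed one.
  static Set<String> find(Path basePath) throws IOException {
    Set<String> inflight = new TreeSet<>();
    Set<String> completed = new TreeSet<>();
    try (Stream<Path> entries = Files.list(basePath.resolve(".hoodie"))) {
      entries.map(p -> p.getFileName().toString()).forEach(name -> {
        if (name.endsWith(INFLIGHT_SUFFIX)) {
          inflight.add(name.substring(0, name.length() - INFLIGHT_SUFFIX.length()));
        } else if (name.endsWith(COMPLETED_SUFFIX)) {
          completed.add(name.substring(0, name.length() - COMPLETED_SUFFIX.length()));
        }
      });
    }
    inflight.removeAll(completed);
    return inflight;
  }

  public static void main(String[] args) throws IOException {
    // Usage: java DanglingDeltaCommits.java <localBasePath>
    find(Paths.get(args[0])).forEach(System.out::println);
  }
}
```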

@CaesarWangX
Author

Hi @ad1happy2go, is there any progress? Thank you.

Projects
Status: Awaiting Triage
Development

No branches or pull requests

3 participants