[SUPPORT]xxx.parquet is not a Parquet file #11178

Open
MrAladdin opened this issue May 9, 2024 · 12 comments
Labels
data-corruption, index, priority:critical

Comments

@MrAladdin

MrAladdin commented May 9, 2024

Describe the problem you faced

A clear and concise description of the problem.

  1. While upserting into a MOR table with the record_index index type, this exception suddenly occurred, and downstream jobs can no longer read the data. Other tables built the same way have not hit this exception so far.
  2. What is the cause, how can it be fixed, and how can it be prevented in the future? Downstream jobs use this table continuously. Thank you very much.

Environment Description

  • Hudi version: 0.14.1

  • Spark version: 3.4

  • Hive version: 3.1.2

  • Hadoop version: 3.1

  • Storage (HDFS/S3/GCS..): hdfs

  • Running on Docker? (yes/no): no

Stacktrace

Caused by: java.lang.RuntimeException: viewfs://nbns/user/quantum_social/lakehouse/social/dwd_social_kbi_beauty_lower_v1/partition_index_date=202302/229164d5-911f-49df-91b5-cb15aecc60de-0_2531-32510-4568658_20240508183714815.parquet is not a Parquet file (length is too low: 0)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:540)
at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:777)
at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:658)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:53)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:39)
at org.apache.spark.sql.execution.datasources.parquet.Spark34LegacyHoodieParquetFileFormat.footerFileMetaData$lzycompute$1(Spark34LegacyHoodieParquetFileFormat.scala:184)
at org.apache.spark.sql.execution.datasources.parquet.Spark34LegacyHoodieParquetFileFormat.footerFileMetaData$1(Spark34LegacyHoodieParquetFileFormat.scala:183)
at org.apache.spark.sql.execution.datasources.parquet.Spark34LegacyHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark34LegacyHoodieParquetFileFormat.scala:187)
at org.apache.hudi.HoodieDataSourceHelper$.$anonfun$buildHoodieParquetReader$1(HoodieDataSourceHelper.scala:67)
at org.apache.hudi.HoodieBaseRelation.$anonfun$createBaseFileReader$2(HoodieBaseRelation.scala:582)
at org.apache.hudi.HoodieBaseRelation$BaseFileReader.apply(HoodieBaseRelation.scala:673)
at org.apache.hudi.RecordMergingFileIterator.<init>(Iterators.scala:249)
at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:109)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
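
Editor's note for anyone triaging a similar report: "length is too low: 0" means the base file is zero bytes long, so the reader gives up before it looks for a footer. Below is a minimal sketch, not part of Hudi or parquet-mr, of a local sanity check for the same conditions. `ParquetSanityCheck` and `problem` are hypothetical names, and the sketch assumes an unencrypted Parquet file on a local filesystem; on HDFS the same checks would have to go through the Hadoop FileSystem API instead of java.nio.

```scala
import java.io.RandomAccessFile
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.util.Arrays

// Hypothetical helper, for illustration only.
object ParquetSanityCheck {
  // Unencrypted Parquet files start and end with the 4-byte magic "PAR1".
  private val Magic: Array[Byte] = "PAR1".getBytes(StandardCharsets.US_ASCII)

  // Leading magic (4) + footer length field (4) + trailing magic (4).
  private val MinLength: Long = 12L

  /** Returns None if the file passes the basic checks, otherwise a reason. */
  def problem(path: Path): Option[String] = {
    val len = Files.size(path)
    if (len < MinLength) {
      Some(s"length is too low: $len")
    } else {
      val raf = new RandomAccessFile(path.toFile, "r")
      try {
        val head = new Array[Byte](4)
        val tail = new Array[Byte](4)
        raf.readFully(head)   // first 4 bytes
        raf.seek(len - 4)
        raf.readFully(tail)   // last 4 bytes
        if (!Arrays.equals(head, Magic)) Some("missing leading PAR1 magic")
        else if (!Arrays.equals(tail, Magic)) Some("missing trailing PAR1 magic")
        else None
      } finally {
        raf.close()
      }
    }
  }
}
```

Passing this check only shows that the file is not truncated to nothing and has the expected markers; it does not prove the footer or the row groups are readable.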

@ad1happy2go
Contributor

@MrAladdin There is a related fix here - https://github.com/apache/hudi/pull/10883/files
Can you try this out?

@MrAladdin
Author

MrAladdin commented May 9, 2024

> @MrAladdin There is a related fix here - https://github.com/apache/hudi/pull/10883/files Can you try this out?

The 0.14.1 version does not have hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java.

@ad1happy2go
Contributor

@MrAladdin Yes, you are correct. This may apply only to LSMTimelineWriter. @danny0405 Any idea here?

@danny0405
Contributor

I'm wondering how the table was written. Is it written by a Flink streaming pipeline?

@MrAladdin
Author

MrAladdin commented May 9, 2024

> I'm wondering how the table was written. Is it written by a Flink streaming pipeline?

It is written by Spark Structured Streaming.

While upserting into a MOR table with the record_index index type, this exception suddenly occurred, and downstream jobs can no longer read the data. Other tables built the same way have not hit this exception so far.

Asynchronous compaction is enabled inside the Spark Structured Streaming job.

@codope added the priority:critical, data-corruption, and index labels May 9, 2024
@ad1happy2go
Contributor

@MrAladdin Can you please share the timeline and writer configurations?

@MrAladdin
Author

MrAladdin commented May 10, 2024

> @MrAladdin Can you please share the timeline and writer configurations?

df
  .writeStream
  .format("hudi")
  .option("hoodie.table.base.file.format", "PARQUET")
  .option("hoodie.allow.empty.commit", "true")
  .option("hoodie.datasource.write.drop.partition.columns", "false")
  .option("hoodie.table.services.enabled", "true")
  .option("hoodie.datasource.write.streaming.checkpoint.identifier", "lakehouse-dwd-social-kbi-beauty-lower-v1-writer-1")
  .option(PRECOMBINE_FIELD.key(), "date_kbiudate")
  .option(RECORDKEY_FIELD.key(), "records_key")
  .option(PARTITIONPATH_FIELD.key(), "partition_index_date")
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  .option("hoodie.combine.before.upsert", "true")
  .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload")

  .option("hoodie.file.listing.parallelism", "200")

  .option("hoodie.schema.on.read.enable", "true")

  //markers
  .option("hoodie.write.markers.type", "DIRECT")

  //timeline server
  .option("hoodie.embed.timeline.server", "true")
  .option("hoodie.embed.timeline.server.async", "true")
  .option("hoodie.embed.timeline.server.gzip", "true")
  .option("hoodie.embed.timeline.server.reuse.enabled", "true")
  .option("hoodie.filesystem.view.incr.timeline.sync.enable", "true")

  //File System View Storage Configurations
  .option("hoodie.filesystem.view.remote.timeout.secs", "1200")
  .option("hoodie.filesystem.view.remote.retry.enable", "true")
  .option("hoodie.filesystem.view.remote.retry.initial_interval_ms", "500")
  .option("hoodie.filesystem.view.remote.retry.max_numbers", "15")
  .option("hoodie.filesystem.view.remote.retry.max_interval_ms", "8000")
  //.option("hoodie.filesystem.operation.retry.enable","true")

  //schema cache
  .option("hoodie.schema.cache.enable", "true")

  //spark write
  .option("hoodie.datasource.write.streaming.ignore.failed.batch", "false")
  .option("hoodie.datasource.write.streaming.retry.count", "6")
  .option("hoodie.datasource.write.streaming.retry.interval.ms", "3000")

  //metadata
  .option("hoodie.metadata.enable", "true")
  .option("hoodie.metadata.index.async", "false")
  .option("hoodie.metadata.index.check.timeout.seconds", "900")
  .option("hoodie.auto.adjust.lock.configs", "true")
  .option("hoodie.metadata.optimized.log.blocks.scan.enable", "true")
  .option("hoodie.metadata.compact.max.delta.commits", "20")
  .option("hoodie.metadata.max.reader.memory", "3221225472")
  .option("hoodie.metadata.max.reader.buffer.size", "1073741824")

  //index type
  .option("hoodie.metadata.record.index.enable", "true")
  .option("hoodie.index.type", "RECORD_INDEX")
  .option("hoodie.record.index.use.caching", "true")
  .option("hoodie.record.index.input.storage.level", "MEMORY_AND_DISK_SER")
  .option("hoodie.metadata.max.init.parallelism", "100000")
  .option("hoodie.metadata.record.index.min.filegroup.count", "720")
  .option("hoodie.metadata.record.index.growth.factor", "2.0")
  .option("hoodie.metadata.record.index.max.filegroup.count", "10000")
  .option("hoodie.metadata.record.index.max.filegroup.size", "1073741824")
  .option("hoodie.metadata.auto.initialize", "true")
  .option("hoodie.metadata.max.logfile.size", "2147483648")
  .option("hoodie.metadata.max.deltacommits.when_pending", "1000")

  //parquet and log file sizing
  .option("hoodie.parquet.field_id.write.enabled", "true")
  .option("hoodie.copyonwrite.insert.auto.split", "true")
  .option("hoodie.record.size.estimation.threshold", "1.0")
  .option("hoodie.parquet.block.size", "536870912")
  .option("hoodie.parquet.max.file.size", "536870912")
  .option("hoodie.parquet.small.file.limit", "209715200")
  .option("hoodie.logfile.max.size", "536870912")
  .option("hoodie.logfile.data.block.max.size", "536870912")
  .option("hoodie.logfile.to.parquet.compression.ratio", "0.35")

  //archive
  .option("hoodie.keep.max.commits", "30")
  .option("hoodie.keep.min.commits", "20")
  .option("hoodie.commits.archival.batch", "10")
  .option("hoodie.archive.automatic", "true")
  .option("hoodie.archive.async", "true")
  .option("hoodie.archive.beyond.savepoint", "true")
  .option("hoodie.fail.on.timeline.archiving", "true")
  .option("hoodie.archive.merge.enable", "true")
  .option("hoodie.archive.merge.files.batch.size", "10")
  .option("hoodie.archive.merge.small.file.limit.bytes", "20971520")

  //cleaner
  .option("hoodie.clean.allow.multiple", "true")
  .option("hoodie.cleaner.incremental.mode", "true")
  .option("hoodie.clean.async", "true")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.cleaner.delete.bootstrap.base.file", "true")
  .option("hoodie.clean.automatic", "true")
  .option("hoodie.cleaner.policy", "KEEP_LATEST_BY_HOURS")
  .option("hoodie.cleaner.hours.retained", "6")
  .option("hoodie.clean.trigger.strategy", "NUM_COMMITS")
  .option("hoodie.clean.max.commits", "10")

  //compact
  .option("hoodie.datasource.compaction.async.enable", "true")
  .option("hoodie.compact.inline", "false")
  .option("hoodie.compact.schedule.inline", "false")
  .option("hoodie.compaction.lazy.block.read", "true")
  .option("hoodie.compaction.reverse.log.read", "false")
  .option("hoodie.compaction.logfile.size.threshold", "314572800")
  .option("hoodie.compaction.target.io", compact_limit)
  .option("hoodie.compaction.strategy", "org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy")
  .option("hoodie.compact.inline.trigger.strategy", "NUM_AND_TIME")
  .option("hoodie.compact.inline.max.delta.commits", "10")
  .option("hoodie.compact.inline.max.delta.seconds", "7200")
  .option("hoodie.memory.compaction.fraction", "0.6")


  .option("hoodie.datasource.write.reconcile.schema", "true")
  .option("hoodie.write.set.null.for.missing.columns", "true")
  .option("hoodie.avro.schema.external.transformation", "true")
  .option("hoodie.avro.schema.validate", "true")


  //lock
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
  .option("hoodie.write.lock.filesystem.expire", "10")


  .option(config.HoodieWriteConfig.TBL_NAME.key(), table_name)
  .option("path", output_path + "/" + table_name)
  .option("checkpointLocation", checkpoint_path)
  .outputMode(OutputMode.Append())
  .queryName("lakehouse-dwd-social-kbi-beauty-lower-v1")
  .start()

1. In fact, there is only one writer, and all table services run inside the Structured Streaming job. I just noticed that the key set in .option(RECORDKEY_FIELD.key(), "records_key") is unique within each partition, but a very small number of records share the same records_key across different partitions. Since record_index is a global index, could this be what causes the exception during upsert?
2. When writing with Spark Structured Streaming, the number of hfile files under .hoodie/metadata/record_index is twice the value set by .option("hoodie.metadata.record.index.min.filegroup.count", "720"). When writing in batch mode with an offline Spark DataFrame job, however, every commit generates a corresponding number of hfiles, so the number of hfiles under record_index becomes excessively large. What causes this? How can we better control the number of hfiles under .hoodie/metadata/record_index, what is a reasonable size for each hfile, and which configuration parameters are involved?
3. When writing with Spark Structured Streaming, if individual hfiles turn out to be too large and we later raise the count with .option("hoodie.metadata.record.index.min.filegroup.count", "1000"), will the change take effect after restarting the job? If it does not, how should it be changed?

Thanks

@MrAladdin
Author

@ad1happy2go Could you please help answer the questions in my reply above? Thank you.

@ad1happy2go
Contributor

@MrAladdin

  1. Ideally this should not be the reason for this exception; it looks more like the parquet file itself got corrupted. Are you facing this issue frequently?
  2. Not very sure about it. Adding @xushiyan in case he knows.
  3. If individual hfile files are too large, you can increase the file group count. It seems too many record keys are assigned to each file group. Once you restart the writer (the Spark streaming job), it will take effect for new writes. To fix the size of the already existing index files, you may need to create the record index again.
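
Editor's note on point 3: the sketch below illustrates why the number of record keys per file group depends on the file group count, and why existing index files would need a rebuild after the count changes. It assumes keys are assigned to file groups by hashing the record key modulo the file group count; `FileGroupAssignment` and its methods are hypothetical, and `String.hashCode` is only a stand-in for whatever hash Hudi uses internally.

```scala
// Hypothetical illustration, not Hudi code.
object FileGroupAssignment {
  /** Assumed scheme: hash the key, then take it modulo the file group count. */
  def fileGroupIndex(recordKey: String, numFileGroups: Int): Int =
    Math.floorMod(recordKey.hashCode, numFileGroups)

  /** Number of keys that land in each file group. */
  def keysPerGroup(keys: Seq[String], numFileGroups: Int): Map[Int, Int] =
    keys.groupBy(k => fileGroupIndex(k, numFileGroups)).map { case (g, ks) => g -> ks.size }

  /** Fraction of keys whose file group changes when the count changes. */
  def movedFraction(keys: Seq[String], oldCount: Int, newCount: Int): Double =
    keys.count(k => fileGroupIndex(k, oldCount) != fileGroupIndex(k, newCount)).toDouble / keys.size
}
```

Under this assumption, raising the count from 720 to 1000 spreads the same keys over more groups, but most keys also map to a different group than before, so entries already written under the old layout cannot simply be reused under the new one.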

@MrAladdin
Author

MrAladdin commented May 13, 2024

> @MrAladdin
>
>   1. Ideally this should not be the reason for this exception; it looks more like the parquet file itself got corrupted. Are you facing this issue frequently?
>   2. Not very sure about it. Adding @xushiyan in case he knows.
>   3. If individual hfile files are too large, you can increase the file group count. It seems too many record keys are assigned to each file group. Once you restart the writer (the Spark streaming job), it will take effect for new writes. To fix the size of the already existing index files, you may need to create the record index again.

1. This problem occurred occasionally in version 0.12, and the workaround was to delete the damaged files with hadoop fs -rm -r. After upgrading, this is the first time it has appeared in version 0.14.
3. Ideally, should each hfile in record_index stay at around 1 GB? And how do we rebuild an overly large record_index: is there a simple command, or do we have to rewrite the full dataset?
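
Editor's note on the sizing question: the arithmetic below is a back-of-the-envelope sketch, not Hudi's actual estimator. It projects the index size from a record count and an assumed average entry size, divides by the maximum file group size, and clamps the result to the configured minimum and maximum counts. `RecordIndexSizing`, `estimateFileGroups`, and the 50-byte entry size used in the usage note are all assumptions for illustration; the other numbers mirror the writer options posted earlier in this thread (min 720, max 10000, growth factor 2.0, max size 1073741824 bytes).

```scala
// Hypothetical sizing helper, for illustration only.
object RecordIndexSizing {
  def estimateFileGroups(
      recordCount: Long,
      avgBytesPerEntry: Int,        // assumed average size of one index entry
      growthFactor: Double,         // headroom for future growth
      maxFileGroupSizeBytes: Long,
      minCount: Int,
      maxCount: Int): Int = {
    // Projected index size, including growth headroom.
    val projectedBytes = recordCount.toDouble * avgBytesPerEntry * growthFactor
    // File groups needed so that none exceeds the size cap.
    val needed = math.ceil(projectedBytes / maxFileGroupSizeBytes).toLong
    // Clamp to the configured bounds.
    math.max(minCount.toLong, math.min(maxCount.toLong, needed)).toInt
  }
}
```

With these assumptions, `RecordIndexSizing.estimateFileGroups(10000000000L, 50, 2.0, 1073741824L, 720, 10000)` gives 932 file groups for 10 billion records, while a table with one million records stays at the minimum of 720.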

@MrAladdin
Author

@xushiyan Could you please help answer question 2 from my reply above? Thank you.

2. When writing with Spark Structured Streaming, the number of hfile files under .hoodie/metadata/record_index is twice the value set by .option("hoodie.metadata.record.index.min.filegroup.count", "720"). When writing in batch mode with an offline Spark DataFrame job, however, every commit generates a corresponding number of hfiles, so the number of hfiles under record_index becomes excessively large. What causes this? How can we better control the number of hfiles under .hoodie/metadata/record_index, what is a reasonable size for each hfile, and which configuration parameters are involved?

@ad1happy2go
Contributor

ad1happy2go commented May 15, 2024

  1. You can rebuild the index with HoodieIndexer: stop your pipeline, delete the existing index, then run the indexer once. Ideally that should create properly sized index files, but test it first.

1. Not sure about the root cause, or about any scenario that could cause this.
