[BUG] [SPARK] Unintended Rewrite of Other Partitions During Partition-Level Delta Table Update #3054

Open
2 of 8 tasks
yatharth-zeotap opened this issue May 6, 2024 · 0 comments
Labels
bug Something isn't working

yatharth-zeotap commented May 6, 2024

Bug

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Describe the problem

We have a Delta table partitioned by the country column. When a Delta table update job updates and inserts data for a single partition value, e.g., country = 'c1', the data in the other partitions is rewritten as well. The job effectively rewrites the entire table, including partitions that should remain untouched.

Previously, on Delta 2.1.0, we used the same DeltaMergeBuilder construct without the whenNotMatchedBySource clause, and it worked as expected.
(originally reported on: https://delta-users.slack.com/archives/CJ70UCSHM/p1714990170284469)

Steps to reproduce

This is the DeltaMergeBuilder I am using. For context, my updates DataFrame contains data only for the single country partition I am processing.

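import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Assumption: a local session with the Delta extensions enabled; skip this if `spark` already exists.
val spark = SparkSession.builder().master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

val tablePath = "./src/test/resources/test-delta-table"

// Version 0: a table partitioned by country, with rows in two partitions (ITA and GBR).
val deltaTableData = Seq((1, "ITA", "Male"), (2, "GBR", "Female"), (3, "ITA", "Male"), (4, "GBR", "Male"), (5, "ITA", "Female"))
spark.createDataFrame(deltaTableData).toDF("id", "country", "gender")
  .coalesce(1).write.format("delta").partitionBy("country").save(tablePath)

// The updates contain rows for the ITA partition only.
val updatesData = Seq((1, "ITA", "Male"), (5, "ITA", "Female"), (7, "ITA", "Male"))
val updatesDF = spark.createDataFrame(updatesData).toDF("id", "country", "gender")

// Version 1: update matching rows, insert new rows, and delete ITA rows that are absent from the updates.
val deltaTable = DeltaTable.forPath(spark, tablePath)
deltaTable.as("Snapshot")
  .merge(updatesDF.as("Updates"), "Snapshot.id = Updates.id AND Snapshot.country = 'ITA'")
  .whenMatched()
  .updateExpr(Map("gender" -> "Updates.gender"))
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource("Snapshot.country = 'ITA'")
  .delete()
  .execute()

// Inspect the actions recorded by the merge commit (version 1).
spark.read.format("json").load(s"$tablePath/_delta_log/*1.json").show(false)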
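
The raw commit JSON is hard to read at a glance, so the following sketch summarizes it as the number of files added and removed per partition. It assumes a Spark session is already active, that the commit contains both add and remove actions, and that remove actions carry partitionValues; the names session, commit, and touched are only illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, sum}

// Assumption: a session is already active, as in the reproduction.
val session = SparkSession.active

// Same glob as the reproduction; it matches commit version 1
// (and any later version whose number ends in 1).
val commit = session.read.json("./src/test/resources/test-delta-table/_delta_log/*1.json")

// Each line of a commit file is one action. Keep the file-level add/remove
// actions and tag each one with the partition it belongs to.
val touched = commit
  .where(col("add").isNotNull || col("remove").isNotNull)
  .select(
    coalesce(col("add.partitionValues.country"), col("remove.partitionValues.country")).as("country"),
    col("add").isNotNull.cast("int").as("isAdd"),
    col("remove").isNotNull.cast("int").as("isRemove"))
  .groupBy("country")
  .agg(sum("isAdd").as("filesAdded"), sum("isRemove").as("filesRemoved"))
  .orderBy("country")

touched.show(false)
```

If only the ITA partition were touched, the result would contain a single row for ITA; a row for GBR indicates that files in that partition were rewritten too.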

Observed results

The Delta log JSON when whenNotMatchedBySource(...).delete() is used

  • The data for all partitions is duplicated; every partition is touched

image (1)

The Delta log JSON when whenNotMatchedBySource(...).delete() is not used (commented out)

  • The data for the other partitions is not duplicated; only the single desired partition is touched

image (2)

Expected results

Only the partition being updated should be rewritten, not the whole table.
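
One way to quantify how far a run deviates from this expectation is to look at the operation metrics of the merge commit. This is a minimal sketch that assumes the merge was the most recent commit on the table and that a Spark session is active; the names table and lastCommit are only illustrative.

```scala
import io.delta.tables.DeltaTable

// Uses the currently active Spark session.
val table = DeltaTable.forPath("./src/test/resources/test-delta-table")

// history(1) returns only the most recent commit, assumed here to be the MERGE.
val lastCommit = table.history(1).select("version", "operation", "operationMetrics")

lastCommit.show(false)
```

For a MERGE, I would expect the operationMetrics map to include keys such as numTargetFilesAdded, numTargetFilesRemoved, and numTargetRowsCopied, though the exact set may differ between Delta versions. A large number of copied rows relative to the size of the updated partition would be consistent with other partitions being rewritten.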

Environment information

Delta Lake version: io.delta:delta-core_2.12:2.3.0
Spark version: 3.3.1
Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.
yatharth-zeotap added the bug (Something isn't working) label on May 6, 2024
yatharth-zeotap changed the title from "[BUG] Unintended Rewrite of Other Partitions During Partition-Level Delta Table Update" to "[BUG] [SPARK] Unintended Rewrite of Other Partitions During Partition-Level Delta Table Update" on May 6, 2024