Describe the problem

When two Delta tables, each partitioned by a column with a different name, are loaded in PySpark and joined, a partition filter applied after the join is pushed down only to the table whose partition column kept its original name, provided the partition column has a decimal type in one of the DataFrames. In contrast, when the same tables are read directly in the Parquet format, the partition filter is applied to both tables.
Steps to reproduce
from pyspark.sql import SparkSession
from decimal import Decimal
from pyspark.sql.types import StructType, StructField, StringType, DecimalType
# Create a Spark session
spark = SparkSession.builder \
    .appName("Write DF to S3 with Partitioning") \
    .getOrCreate()
# Sample data
data = [("Alice", 1), ("Bob", 1), ("Cathy", 2), ("David", 2), ("Eve", 3)]
columns = ["Name", "Category"]
# Create DataFrame
df = spark.createDataFrame(data, columns)
# Specify S3 bucket and path (make sure you have the correct access rights and the bucket exists)
output_path = "s3a://test/"
# Write DataFrame to S3 with partitioning by the 'Category' column
df.write.mode("overwrite").format("delta").partitionBy("Category").save(output_path)
# Create new Data with Decimal Type
data = [("Alice", Decimal('1')), ("Bob", Decimal('1')), ("Cathy", Decimal('2')), ("David", Decimal('2')), ("Eve", Decimal('3'))]
columns = ["Name", "Category_renamed"]
# Define the schema
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Category_renamed", DecimalType(10, 0), True)  # decimal column with precision 10 and scale 0
])
# Create DataFrame
df_2 = spark.createDataFrame(data, schema=schema)
# Specify S3 bucket and path (make sure you have the correct access rights and the bucket exists)
output_path_2 = "s3a://test_rename/"
# Write DataFrame to S3 with partitioning by the 'Category_renamed' column
df_2.write.mode("overwrite").format("delta").partitionBy("Category_renamed").save(output_path_2)
# Read the Delta tables and explain the plan
read_1 = spark.read.format("delta").load(output_path)
read_2 = spark.read.format("delta").load(output_path_2).withColumnRenamed("Category_renamed", "Category")
df_join = read_1.join(read_2, on="Category").filter("Category=3")
df_join.explain()
# comparison to parquet
read_1 = spark.read.format("parquet").load(output_path)
read_2 = spark.read.format("parquet").load(output_path_2).withColumnRenamed("Category_renamed", "Category")
df_join = read_1.join(read_2, on="Category").filter("Category=3")
df_join.explain()
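Comparing the two explain() outputs by eye is error-prone. As a sketch (the helper name and the captured-stdout approach are my own, not part of any Spark API), one can extract the PartitionFilters entry of each FileScan node from the printed plan:

```python
import re

def partition_filters(plan: str) -> list[str]:
    """Return the PartitionFilters entry of every FileScan node found in a
    physical-plan string, one list element per scan ('' means no filter)."""
    return [m.group(1).strip()
            for m in re.finditer(r"PartitionFilters: \[(.*?)\]", plan)]

# With the repro above, the plan printed by df_join.explain() can be
# captured by redirecting stdout:
#
#   import io
#   from contextlib import redirect_stdout
#   buf = io.StringIO()
#   with redirect_stdout(buf):
#       df_join.explain()
#   print(partition_filters(buf.getvalue()))
#
# Illustrative check against a minimal, hand-written fragment that mimics
# the format of Spark's FileScan output:
sample = (
    "FileScan parquet [Name#0,Category#1] "
    "PartitionFilters: [isnotnull(Category#1), (Category#1 = 3)] "
    "FileScan parquet [Name#2,Category_renamed#3] "
    "PartitionFilters: [] "
)
print(partition_filters(sample))
```

Per the report, running this on the Delta join should show an empty PartitionFilters list for the scan of the renamed decimal-typed table, while the Parquet join shows a filter on both scans.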
Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
Yes. I can contribute a fix for this bug independently.
Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
No. I cannot contribute a bug fix at this time.
Observed results

With the Delta format, the partition filter is pushed down only to the table whose partition column kept its original name; the table with the renamed decimal partition column is scanned without a PartitionFilter.

Expected results

The PartitionFilters are applied to both tables.

Thank you for your reply. I also switched to my local file system, but I still see the problem with either of the mentioned version combinations. Is the screenshot you are displaying from the first or the second explain()? With the Parquet format, the join explicitly leads to pushdown of the filter; with the Delta format it does not. The following screenshot shows the difference in behavior when reading the files with Delta vs. Parquet.