enable merge on ICEBERG via Athena #633

Closed
rudolfix opened this issue Sep 13, 2023 · 4 comments · Fixed by #1315 · May be fixed by #1294

@rudolfix
Collaborator

rudolfix commented Sep 13, 2023

Background
See the rationale below for why we want Iceberg as an open table format. We can support it easily via existing destinations: Athena and Snowflake.

Why not spark:

  • too much work for this ticket.

Requirements

    • add a new table hint table_format that tells the destination which table format to use
    • for Athena: add global destination setting to set all tables to ICEBERG
    • enable merge support for iceberg tables on athena
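
One possible reading of the first two requirements, as a minimal sketch: a per-table hint takes precedence, and the global Athena setting applies to every table without a hint. The names `resolve_table_format` and `force_iceberg` are hypothetical and not an existing dlt API, and the precedence rule itself is an assumption.

```python
from typing import Optional


# Hypothetical helper: neither this function nor `force_iceberg` exists in dlt.
# It only illustrates one way the hint and the global setting could interact.
def resolve_table_format(table_hint: Optional[str], force_iceberg: bool = False) -> Optional[str]:
    """Return the table format the destination should use for one table."""
    if table_hint is not None:
        # Assumption: an explicit per-table hint always wins.
        return table_hint.lower()
    if force_iceberg:
        # Global Athena setting: every table without a hint becomes Iceberg.
        return "iceberg"
    # No hint and no global setting: keep the regular (non-Iceberg) table.
    return None
```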

Implementation Notes

  • Bulk loading into iceberg on athena can only be done from one table to another. So we need to retain our code for defining regular athena tables and additionally add iceberg tables to copy from the first tables.
  • Iceberg has nice "merge into ... on" sql directives that we can hopefully use for the merge write disposition
  • Do we need to run maintenance commands that clean up the underlying parquet files on iceberg or should we leave this to the user?
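
A rough sketch of the statements the first two notes imply: create the Iceberg table, copy into it from the regular (staging) table, and merge with "MERGE INTO ... ON". The helper names, table names, and S3 location are made up for illustration, identifiers are not quoted or escaped, and this is not the code that ended up in athena.py.

```python
from typing import Mapping, Sequence


def create_iceberg_table_sql(table: str, columns: Mapping[str, str], location: str) -> str:
    """DDL for an Athena Iceberg table; `columns` maps column name to Athena type."""
    col_defs = ", ".join(f"{name} {dtype}" for name, dtype in columns.items())
    return (
        f"CREATE TABLE {table} ({col_defs}) "
        f"LOCATION '{location}' "
        "TBLPROPERTIES ('table_type' = 'ICEBERG')"
    )


def copy_sql(staging: str, target: str, columns: Sequence[str]) -> str:
    """Bulk load: copy all rows from the regular staging table into the Iceberg table."""
    col_list = ", ".join(columns)
    return f"INSERT INTO {target} ({col_list}) SELECT {col_list} FROM {staging}"


def merge_sql(target: str, staging: str, columns: Sequence[str], key: str) -> str:
    """Upsert staging rows into the target, matching on a single key column."""
    non_key = [c for c in columns if c != key]
    if not non_key:
        raise ValueError("need at least one non-key column to update")
    updates = ", ".join(f"{c} = s.{c}" for c in non_key)
    col_list = ", ".join(columns)
    values = ", ".join(f"s.{c}" for c in columns)
    return (
        f"MERGE INTO {target} t USING {staging} s ON t.{key} = s.{key} "
        f"WHEN MATCHED THEN UPDATE SET {updates} "
        f"WHEN NOT MATCHED THEN INSERT ({col_list}) VALUES ({values})"
    )
```

For example, `merge_sql("events", "events_staging", ["id", "name"], "id")` produces a single statement that updates `name` for matching ids and inserts the rest.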

Rationale
Just a few points on why Iceberg is a game changer on data lakes:

  • it allows evolving the partitions
    e.g. data engineers often design partitioning around a few use cases that change over time, and thanks to hidden partitioning Iceberg allows changing those partition definitions without rewriting the whole dataset (which is what pure parquet requires)
  • it has built-in procedures to optimize the dataset
    are you aware of the small file problem in data lakes? If so, Iceberg has a built-in function to compact "objects" and improve performance and cost. Just a note on that: I wrote quite a few compaction procedures in my life, and I believe that Iceberg makes that way of compacting data standardised (same as Delta Lake)
  • it supports dispositions like merge, e.g. you can upsert by id
  • being able to delete data from a dataset - that's what I mentioned already, and that's why I chose Iceberg to ingest sensitive data from Kafka, for example.
    this decision came after leading a GDPR project that allowed removing sensitive data from a data lake - again a game changer that makes data engineers' lives much easier.
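
To make the compaction and deletion points concrete, here is a small sketch of the corresponding Athena statements for Iceberg tables (`DELETE FROM`, `OPTIMIZE ... REWRITE DATA USING BIN_PACK`, `VACUUM`). The helper names and the example predicate are illustrative assumptions; whether dlt should run the maintenance commands itself is the open question from the implementation notes.

```python
def delete_rows_sql(table: str, predicate: str) -> str:
    """Row-level delete, e.g. removing one user's records for a GDPR request."""
    return f"DELETE FROM {table} WHERE {predicate}"


def compact_sql(table: str) -> str:
    """Compaction: rewrite many small data files into fewer, larger ones."""
    return f"OPTIMIZE {table} REWRITE DATA USING BIN_PACK"


def vacuum_sql(table: str) -> str:
    """Snapshot expiration and removal of files no longer referenced."""
    return f"VACUUM {table}"
```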

last but not least: Iceberg is becoming one of the de facto open table formats in the data landscape:

  • Redshift now supports it (read only, in preview - I need to test it)
  • Snowflake supports it in preview (read/write)
  • BigQuery should support it (read only? - not sure)
  • Trino supports it (read/write)
  • Athena supports it too - as it's an AWS fork of Trino (on engine v3)
  • Spark supports it - the most complete engine that supports Iceberg
  • Flink supports it (read/write)
  • Dremio supports it - read for sure, not sure about writes
  • ClickHouse supports it - read only
  • DuckDB supports it - read only
  • Java client - read and write
  • Python client - read only for now
  • Rust client - coming soon
  • Go client - coming soon
@rudolfix rudolfix changed the title support ICEBERG/DELTA LAKE tables via Athena/Snowflake support ICEBERG via Athena Sep 17, 2023
@sh-rp sh-rp self-assigned this Sep 27, 2023
@rudolfix rudolfix changed the title support ICEBERG via Athena enable merge on ICEBERG via Athena Oct 18, 2023
@n0012

n0012 commented Nov 15, 2023

I'm interested in having merge support for the Athena destination. For now I'll have to post-process a merge within dbt or the Airflow Athena operator. So in essence I will use dlt to build Athena staging tables (with the replace write disposition). I'd be happy to collaborate to help build this out.

@rudolfix
Collaborator Author

@n0012 we had preliminary merge support working. @sh-rp was testing it. you can join our slack and ping me or Dave - maybe we can find out how you could help us...
what happens right now:

  1. we create append job instead of merge job in athena.py
  2. the SQL engine on Athena does not support temporary tables and a few other constructs we use for merge jobs, so our default merge job would need to be customized

(2) is the tricky part. we use just INSERT + DELETE to do merges, but each destination has small differences in syntax that need to be handled
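
A minimal sketch of what an INSERT + DELETE merge without temporary tables could look like: delete the target rows whose key appears in the staging table, then insert everything from staging. The function name is hypothetical, the single-key assumption is a simplification, and whether Athena accepts the subquery form of DELETE on Iceberg tables is not verified here.

```python
from typing import List, Sequence


def delete_insert_sql(target: str, staging: str, columns: Sequence[str], key: str) -> List[str]:
    """Return the two statements of a merge, to be run in order."""
    col_list = ", ".join(columns)
    return [
        # 1. Remove target rows that are about to be replaced.
        f"DELETE FROM {target} WHERE {key} IN (SELECT {key} FROM {staging})",
        # 2. Insert the fresh rows straight from the staging table.
        f"INSERT INTO {target} ({col_list}) SELECT {col_list} FROM {staging}",
    ]
```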

@toddy86

toddy86 commented Dec 5, 2023

@rudolfix, what sort of timeline would you be looking at to add this merge support? I don't need a hard deadline or anything, just a vague idea if it's in the vicinity of weeks, 2-3 months or 6+ months etc

@nicor88

nicor88 commented Apr 23, 2024

Adding my use cases here.

  1. I ingest data from Google BigQuery (Google Analytics dumps) into my final destination, S3 (parquet) with the Glue catalog, partitioned by event_date. I perform a delete/insert by event_date with a lookback window of 2 days. That way yesterday's partition is deleted and inserted again, this time containing all of yesterday's events. So I would like to perform a delete+insert by event_date. I have this setup in roughly 80% of my cases.
    To do so I use aws-sdk-pandas with overwrite_partitions:
import awswrangler as wr

source_df = ...  # data from the last 7 days
wr.s3.to_parquet(
    df=source_df,
    path=f"s3://{DST_BUCKET}/{GLUE_TABLE_NAME}",
    dataset=True,
    schema_evolution=True,
    mode='overwrite_partitions',
    partition_cols=['my_date'],
    database='bronze',  # Athena/Glue database
    table=GLUE_TABLE_NAME,  # Athena/Glue table
)

Worth mentioning: I don't need Iceberg here, because awswrangler overwrites the data for a partition by deleting it on S3 directly and then appending again. With Iceberg tables it would also be possible to do the delete/insert.
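
The semantics of that partition overwrite can be sketched in plain Python, with lists of dicts standing in for the table and the batch. This is only an illustration of the behaviour described above, not how awswrangler implements it; `overwrite_partitions` here is a made-up function.

```python
from typing import Dict, List

Row = Dict[str, object]


def overwrite_partitions(existing: List[Row], batch: List[Row], partition_col: str) -> List[Row]:
    """Replace every partition that appears in `batch`; keep all others untouched."""
    # Partition values present in the incoming batch (e.g. the lookback window).
    touched = {row[partition_col] for row in batch}
    # "Delete": drop existing rows belonging to a touched partition.
    kept = [row for row in existing if row[partition_col] not in touched]
    # "Insert": append the full batch for those partitions.
    return kept + batch
```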

  2. I merge my source_data by a specific key (e.g. id): I perform an upsert by id, keeping the latest data for each id based on my extracted batch of data.
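
As a sketch of that upsert in plain Python: deduplicate the batch to the latest row per key, then let those rows replace or extend what the target already holds. The ordering column (called `updated_at` in the usage note below) is an assumption, since the comment does not say how "latest" is determined, and `upsert_latest` is a made-up name.

```python
from typing import Dict, List

Row = Dict[str, object]


def upsert_latest(target: Dict[object, Row], batch: List[Row], key: str, order_by: str) -> Dict[object, Row]:
    """Upsert `batch` into `target` (keyed by `key`), keeping the latest batch row per key."""
    latest: Dict[object, Row] = {}
    for row in batch:
        current = latest.get(row[key])
        # Keep the row with the highest `order_by` value; later rows win ties.
        if current is None or row[order_by] >= current[order_by]:
            latest[row[key]] = row
    merged = dict(target)  # do not mutate the caller's target
    merged.update(latest)  # matched keys are updated, new keys are inserted
    return merged
```

Calling `upsert_latest(target, batch, "id", "updated_at")` returns a new mapping and leaves `target` unchanged.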

Once the merge disposition is implemented for Athena Iceberg tables I could consider using dlt instead. A deal breaker for me is that I also need partition definitions on my tables to reduce data scans, but that is another issue.

Projects
Status: Done
6 participants