
Add upsert merge strategy #1294

Draft · wants to merge 1 commit into base: devel

Conversation

@jorritsandbrink (Collaborator) commented Apr 27, 2024

Description

This started as building merge support for Athena Iceberg, but the SQL logic runs on some other engines out-of-the-box too (e.g. postgres, mssql, snowflake, databricks, bigquery). It primarily uses the MERGE INTO statement to implement upsert (update + insert) logic. Since both delete-insert and scd2 don't apply, I introduced a new merge strategy called upsert.

Characteristics:

  • needs a primary_key
  • uses hash of primary_key as deterministic row identifier (_dlt_id) in root table
  • implements "true" upsert for root table (only updates and inserts), but still needs deletes for child tables (to delete records for elements which have been removed from an updated list)
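The deterministic row-identifier idea can be sketched in a few lines. This is a hypothetical illustration, not the actual dlt implementation: it assumes `_dlt_id` is simply a digest of the concatenated primary-key values, so the same key always maps to the same identifier across loads, which is what lets MERGE match and update rows in place.

```python
import hashlib

def deterministic_dlt_id(row: dict, primary_key: list) -> str:
    # Join the primary-key values with a separator unlikely to appear
    # in data, then hash; same key -> same id, regardless of other columns.
    key_material = "\x1e".join(str(row[c]) for c in primary_key)
    return hashlib.sha256(key_material.encode("utf-8")).hexdigest()[:16]

row_v1 = {"id": 1, "att": "foo"}
row_v2 = {"id": 1, "att": "bar"}  # same key, updated attribute

# Both versions of the record map to the same row identifier,
# so an upsert can update the existing row instead of inserting a duplicate.
assert deterministic_dlt_id(row_v1, ["id"]) == deterministic_dlt_id(row_v2, ["id"])
```

A random or content-based `_dlt_id` would break this matching, which is why the normalizer has to be involved.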

Related Issues

Additional Context

@jorritsandbrink jorritsandbrink self-assigned this Apr 27, 2024
@jorritsandbrink jorritsandbrink linked an issue Apr 27, 2024 that may be closed by this pull request
netlify bot commented Apr 27, 2024

Deploy Preview for dlt-hub-docs ready!

Latest commit: a31fe62
Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/662d6fff585855000816ab80
Deploy Preview: https://deploy-preview-1294--dlt-hub-docs.netlify.app

@jorritsandbrink (Collaborator, Author)

@rudolfix @sh-rp Before continuing development on this branch, does the direction I've taken here make sense?

@rudolfix (Collaborator) left a comment

OK for a new merge strategy! My overall comments

  • Why do we need to modify the normalizer? upsert should be compatible with delete-insert.
  • When generating SQL, the only difference I see is that we can replace the INSERT/DELETE into the root table with a MERGE statement and do not need a temp table. The child tables, however, still need a temp table to know which rows to delete. Maybe our original merge SQL should be restructured to separate the child-table processing so it can be reused?
  • We'll need to support merge keys and hard deletes as well.
  • We still need temp tables. Was the problem with Athena a lack of temp tables?

@jorritsandbrink (Collaborator, Author)

@rudolfix

First addressing a main consideration here:
I don't think upsert can (or should) be compatible with delete-insert. upsert needs a primary_key to ensure a one-to-one relationship between records in the source and the target, while delete-insert can do without a primary_key.

Then addressing your points in order:

  1. The normalizer is changed because the upsert logic relies on a deterministic _dlt_id (hash of primary_key). I think these hashes could also be calculated on the fly in the SQL engine, but that would be inefficient.
  2. The biggest benefit will indeed be for the root table. Note, though, that the delete-insert logic I have here for upsert behaves differently from the delete-insert logic of the delete-insert strategy. The difference is that the former is more specific: it only deletes/inserts records for elements removed from/added to the list, while the latter deletes/inserts all records for all elements, even if they didn't change.
    • merge_key might not really apply here. The composite of primary_key and merge_key would still need to be unique to ensure the one-to-one relationship, in which case it would make more sense to just include all columns in primary_key rather than splitting them across primary_key and merge_key.
    • hard_delete should be possible.
  3. Two problems with Athena for delete-insert: it doesn't support temp tables and it doesn't support transactions.
    • We could probably still do delete-insert and use CTEs instead of temp tables, though that would be inefficient (no more so, admittedly, than the Databricks implementation, which uses temporary views). Using an actual table is another approach, but it would not have the same performance as a temp table.
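The child-table difference described in point 2 above can be sketched with sets of child-row keys. This is a hypothetical helper, not dlt code: upsert-style processing only touches the rows whose list elements were added or removed, while delete-insert rewrites every child row of the updated parent.

```python
def child_table_changes(target_rows: set, staging_rows: set):
    # upsert-style: compute only the actual differences.
    to_delete = target_rows - staging_rows   # elements removed from the list
    to_insert = staging_rows - target_rows   # elements added to the list
    return to_delete, to_insert

# A parent record's child list went from {a, b, c} to {b, c, d}.
target = {"a", "b", "c"}
staging = {"b", "c", "d"}

to_delete, to_insert = child_table_changes(target, staging)
assert to_delete == {"a"}   # upsert deletes only the removed element
assert to_insert == {"d"}   # and inserts only the new one

# delete-insert, by contrast, would delete all of {a, b, c} and
# re-insert all of {b, c, d}, even though b and c are unchanged.
```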

Two options:

  1. Just make delete-insert work for Athena Iceberg and accept the drawbacks (no transactions, no temp tables).
  2. Continue upsert and make it complete/solid. This goes beyond just Athena Iceberg and can be beneficial for more destinations.

@sh-rp (Collaborator) commented Apr 29, 2024

@jorritsandbrink Athena has some kind of concept of temp tables when you use the WITH statement. I'm not sure whether that is helpful in your case, but that is what I found when looking at your PR this morning and doing some research.

@sh-rp (Collaborator) commented Apr 29, 2024

Also why do we need a primary key? The merge into has this ON clause which should support AND and OR conditions for merging on composite keys. Or am I mistaken here?

Maybe we can first make the Athena merge work like the other destinations, without a new merge strategy? I am not sure why I did not do that; there was some roadblock, but it may have been that I did not fully understand our merges at the time.

@jorritsandbrink (Collaborator, Author)

@sh-rp We already discussed this on Slack, but I'll add it here too for transparency and completeness.

Also why do we need a primary key? The merge into has this ON clause which should support AND and OR conditions for merging on composite keys. Or am I mistaken here?

We need primary_key to ensure a unique key for the ON clause in MERGE INTO. SQL engines throw a syntax error when trying to update a target table with duplicate keys. For example, mssql says:

"The MERGE statement attempted to UPDATE or DELETE the same row more than once. This happens when a target row matches more than one source row. A MERGE statement cannot UPDATE/DELETE the same row of the target table multiple times. Refine the ON clause to ensure a target row matches at most one source row, or use the GROUP BY clause to group the source rows."

postgres throws a similar error.

Consider this case to understand why these errors make sense:

staging table:

  id | att
  ---|----
  1  | bar
  1  | baz

destination table:

  id | att
  ---|----
  1  | foo
What should be the result of the destination table after upserting the staging table into it? It's not properly defined.
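The ambiguity can be illustrated with a plain Python upsert over a dict (an illustrative sketch, not engine behavior): with duplicate keys in the staging data, the result depends on arbitrary row order, which is exactly what the SQL engines refuse to guess.

```python
def upsert(destination: dict, staging_rows: list):
    # Keyed upsert: a later staging row silently overwrites an earlier
    # one with the same key, so duplicate keys make the result order-dependent.
    for row_id, att in staging_rows:
        destination[row_id] = att
    return destination

destination = {1: "foo"}

# Two staging rows share id=1; the "winner" is whichever happens to come last.
assert upsert(dict(destination), [(1, "bar"), (1, "baz")]) == {1: "baz"}
assert upsert(dict(destination), [(1, "baz"), (1, "bar")]) == {1: "bar"}
```

A SQL engine has no defined row order to fall back on, so it raises an error instead of picking a winner.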

Athena has some kind of concept of temp tables when you use the WITH statement. I'm not sure wether that is helpful in your case, but that is what I found when looking at your pr this morning and doing some research.

Yes, these are the CTEs I mention under (3) in my previous comment. We can definitely use them to make it work, but that would imply recalculation for each table, rather than calculating once and reusing the results as with a temp table. As such, it won't be efficient.

@rudolfix (Collaborator) commented May 1, 2024

@jorritsandbrink thx for the explanation. I reviewed the code and now I understand your concept better.
What I think we should do (@sh-rp do you agree):

  1. Implement Athena using delete-insert. I'd just create a table with a random name and then drop it (best effort). We could also create a view (but I doubt those are cached in any way). On Databricks we use temporary views and MERGE statements to delete data; take a look.
  2. Stash the existing upsert code; we have a ticket for it (convert sql merge jobs into MERGE sql statement #1129) where we can write a good spec.

now regarding the upsert itself:

  • OK, I get why a merge key is not possible.
  • You still use a temp table in your code to optimize the INSERT/MERGE into child tables, so it will not work for Athena.
  • I get why we have the primary-key-based hash! We had it in a very early version of dlt. I think we could let people use such a hash not only for upsert, e.g. by adding another hint that triggers it.
  • For the parent table MERGE statement I'd use the primary key as it is. Why: because arrow tables are not processed in the relational normalizer and so will not work with upsert (we had the same problem with scd2; it is only temporarily solved, which is not convenient).

@jorritsandbrink (Collaborator, Author)

@rudolfix Okay, I will leave this branch and PR open as input for #1129 and create a new PR that simply implements delete-insert for Athena Iceberg.

Successfully merging this pull request may close these issues:

  • enable merge on ICEBERG via Athena