
Spark: CDC does not respect when the table is rolled back. #10247

Open
javrasya opened this issue Apr 29, 2024 · 2 comments · May be fixed by #10252
Labels
bug Something isn't working

Comments

javrasya commented Apr 29, 2024

Apache Iceberg version

1.4.3

Query engine

Spark

Please describe the bug 🐞

We had to roll back our table because it had some broken snapshots. Downstream, we turn that table, which receives upserts, into a changelog stream and process it that way, using time boundaries. The CDC procedure appears to work by looking at the table's history and doing a time-travel-style lookup to find the most recent snapshot ID as of the end timestamp we pass to it.

However, it only uses the history entries, and those do not carry enough information to tell whether a snapshot is still part of the main branch's ancestry.

Here is the problematic line, which calls the function in iceberg-core:

endSnapshotId = SnapshotUtil.nullableSnapshotIdAsOfTime(table, endTimestamp);

https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java#L350-L358

I think it should disregard snapshots that are no longer in the main branch's ancestry.
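To illustrate the difference, here is a self-contained sketch (not Iceberg's actual implementation; the `Snap` record and method names are invented for this example). It models the current history-based lookup alongside a variant that only considers snapshots that are still ancestors of the current snapshot, as proposed above:

```java
import java.util.*;

public class RollbackLookupSketch {
  // Hypothetical stand-in for Iceberg's snapshot/history metadata.
  record Snap(long id, Long parentId, long timestampMillis) {}

  // Mirrors the behavior of SnapshotUtil.nullableSnapshotIdAsOfTime:
  // latest history entry made at or before the timestamp, regardless
  // of whether that snapshot is still on the main branch.
  static Long asOfTime(List<Snap> history, long ts) {
    Long result = null;
    for (Snap s : history) {
      if (s.timestampMillis() <= ts) result = s.id();
    }
    return result;
  }

  // Proposed behavior: same lookup, but restricted to snapshots that
  // are ancestors of the current (main branch) snapshot.
  static Long asOfTimeOnMain(List<Snap> history, long ts, long currentId) {
    Map<Long, Snap> byId = new HashMap<>();
    history.forEach(s -> byId.put(s.id(), s));
    // Walk parent pointers from the current snapshot to collect ancestors.
    Set<Long> ancestors = new HashSet<>();
    for (Long id = currentId; id != null; ) {
      ancestors.add(id);
      Snap s = byId.get(id);
      id = (s == null) ? null : s.parentId();
    }
    Long result = null;
    for (Snap s : history) {
      if (s.timestampMillis() <= ts && ancestors.contains(s.id())) {
        result = s.id();
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // snap2 was orphaned by the rollback; snap3 is a new child of snap1.
    List<Snap> history = List.of(
        new Snap(1L, null, 100L),
        new Snap(2L, 1L, 200L),   // no longer on the main branch
        new Snap(3L, 1L, 300L));  // current snapshot after the rollback
    System.out.println(asOfTime(history, 250L));           // prints 2
    System.out.println(asOfTimeOnMain(history, 250L, 3L)); // prints 1
  }
}
```

With an end timestamp between snap2 and snap3, the history-only lookup resolves to the orphaned snap2, while the ancestry-filtered variant resolves to snap1, which is what the changelog should be based on after the rollback.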

@javrasya javrasya added the bug Something isn't working label Apr 29, 2024
@javrasya javrasya changed the title Spark CDC does not respect when the table is rolled back. Spark: CDC does not respect when the table is rolled back. Apr 29, 2024
manuzhang commented Apr 29, 2024

@javrasya Do you mean when the endSnapshotId is no longer an ancestor of the current snapshot, it should be skipped? For example, in the following test, the second change log should only contain row(1, "a", "INSERT", 0, snap1.snapshotId())?

  @TestTemplate
  public void testQueryWithRollback() {
    createTable();

    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
    Table table = validationCatalog.loadTable(tableIdent);
    Snapshot snap1 = table.currentSnapshot();
    long rightAfterSnap1 = waitUntilAfter(snap1.timestampMillis());

    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
    table.refresh();
    Snapshot snap2 = table.currentSnapshot();
    long rightAfterSnap2 = waitUntilAfter(snap2.timestampMillis());

    sql("CALL %s.system.rollback_to_snapshot('%s', %d)", catalogName, tableIdent, snap1.snapshotId());
    table.refresh();
    Snapshot snap4 = table.currentSnapshot();
    assertThat(snap4).isEqualTo(snap1);

    sql("INSERT OVERWRITE %s VALUES (-2, 'a')", tableName);
    table.refresh();
    Snapshot snap3 = table.currentSnapshot();
    long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());

    assertEquals(
        "Should have expected changed rows up to snapshot 3",
        ImmutableList.of(
            row(1, "a", "INSERT", 0, snap1.snapshotId()),
            row(1, "a", "DELETE", 1, snap3.snapshotId()),
            row(-2, "a", "INSERT", 1, snap3.snapshotId())),
        changelogRecords(null, rightAfterSnap3));

    assertEquals(
        "Should have expected changed rows up to snapshot 2",
        ImmutableList.of(
            row(1, "a", "INSERT", 0, snap1.snapshotId()),
            row(2, "b", "INSERT", 1, snap2.snapshotId())),
        changelogRecords(null, rightAfterSnap2));
  }

javrasya (Contributor, Author) commented:
Exactly @manuzhang . It feels like it should filter that out and this is a bug. Wdyt?
