[core] incremental-between-scan-mode should respect changelog-producer #3290

Merged 1 commit on May 6, 2024
3 changes: 3 additions & 0 deletions docs/content/flink/sql-query.md
@@ -91,6 +91,9 @@ SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
SELECT * FROM t /*+ OPTIONS('incremental-between-timestamp' = '1692169000000,1692169900000') */;
```

By default, changelog files are scanned for tables that produce changelog files; otherwise, newly changed files between snapshots are scanned. You can also specify `'incremental-between-scan-mode'` explicitly to force a particular scan mode.
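
For example, an illustrative query that forces the changelog scan mode for the hypothetical table `t` used above:

```sql
-- Read changes between snapshot 12 (exclusive) and snapshot 20,
-- scanning changelog files regardless of what 'auto' would resolve to.
SELECT * FROM t /*+ OPTIONS(
    'incremental-between' = '12,20',
    'incremental-between-scan-mode' = 'changelog') */;
```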

In Batch SQL, `DELETE` records are not allowed to be returned, so records of kind `-D` will be dropped.
If you want to see `DELETE` records, you can use the audit_log table:

3 changes: 3 additions & 0 deletions docs/content/spark/sql-query.md
@@ -72,6 +72,9 @@ For example:
- '5,10' means changes between snapshot 5 and snapshot 10.
- 'TAG1,TAG3' means changes between TAG1 and TAG3.

By default, changelog files are scanned for tables that produce changelog files; otherwise, newly changed files between snapshots are scanned. You can also specify `'incremental-between-scan-mode'` explicitly to force a particular scan mode.
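
For example, a sketch of forcing the delta scan mode, assuming Paimon options can be passed to Spark SQL through the `spark.paimon.` configuration prefix and that a table `t` exists:

```sql
-- Illustrative only: set Paimon options through the Spark session configuration,
-- then read changes between snapshot 5 (exclusive) and snapshot 10.
SET spark.paimon.incremental-between=5,10;
SET spark.paimon.incremental-between-scan-mode=delta;
SELECT * FROM t;
```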

Requires Spark 3.2+.

Paimon supports using Spark SQL to run incremental queries, implemented by a Spark Table Valued Function.
4 changes: 2 additions & 2 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -268,9 +268,9 @@
</tr>
<tr>
<td><h5>incremental-between-scan-mode</h5></td>
<td style="word-wrap: break-word;">delta</td>
<td style="word-wrap: break-word;">auto</td>
<td><p>Enum</p></td>
<td>Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot, 'delta' for scan newly changed files between snapshots, 'changelog' scan changelog files between snapshots.<br /><br />Possible values:<ul><li>"delta": Scan newly changed files between snapshots.</li><li>"changelog": Scan changelog files between snapshots.</li></ul></td>
<td>Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot. <br /><br />Possible values:<ul><li>"auto": Scan changelog files for the table which produces changelog files. Otherwise, scan newly changed files.</li><li>"delta": Scan newly changed files between snapshots.</li><li>"changelog": Scan changelog files between snapshots.</li></ul></td>
</tr>
<tr>
<td><h5>incremental-between-timestamp</h5></td>
@@ -883,10 +883,9 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<IncrementalBetweenScanMode> INCREMENTAL_BETWEEN_SCAN_MODE =
key("incremental-between-scan-mode")
.enumType(IncrementalBetweenScanMode.class)
.defaultValue(IncrementalBetweenScanMode.DELTA)
.defaultValue(IncrementalBetweenScanMode.AUTO)
.withDescription(
"Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot, "
+ "'delta' for scan newly changed files between snapshots, 'changelog' scan changelog files between snapshots.");
"Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot. ");

public static final ConfigOption<String> INCREMENTAL_BETWEEN_TIMESTAMP =
key("incremental-between-timestamp")
@@ -2068,6 +2067,9 @@ public String getValue() {

/** Specifies this scan type for incremental scan . */
public enum IncrementalBetweenScanMode implements DescribedEnum {
AUTO(
"auto",
"Scan changelog files for the table which produces changelog files. Otherwise, scan newly changed files."),
DELTA("delta", "Scan newly changed files between snapshots."),
CHANGELOG("changelog", "Scan changelog files between snapshots.");

@@ -192,6 +192,12 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
options.incrementalBetweenScanMode();
ScanMode scanMode;
switch (scanType) {
case AUTO:
scanMode =
options.changelogProducer() == ChangelogProducer.NONE
? ScanMode.DELTA
: ScanMode.CHANGELOG;
break;
case DELTA:
scanMode = ScanMode.DELTA;
break;
@@ -24,13 +24,18 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.SnapshotManager;

import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_SCAN_MODE;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link IncrementalStartingScanner}. */
@@ -55,27 +60,22 @@ public void testScan() throws Exception {
write.compact(binaryRow(1), 0, false);
commit.commit(1, write.prepareCommit(true, 1));

assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
write.close();
commit.close();

IncrementalStartingScanner deltaScanner =
new IncrementalStartingScanner(snapshotManager, 1L, 4L, ScanMode.DELTA);
StartingScanner.ScannedResult deltaResult =
(StartingScanner.ScannedResult) deltaScanner.scan(snapshotReader);
assertThat(deltaResult.currentSnapshotId()).isEqualTo(4);
assertThat(getResult(table.newRead(), toSplits(deltaResult.splits())))
.hasSameElementsAs(Arrays.asList("+I 2|20|200", "+I 1|10|100", "+I 3|40|500"));
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);

IncrementalStartingScanner changeLogScanner =
new IncrementalStartingScanner(snapshotManager, 1L, 4L, ScanMode.CHANGELOG);
StartingScanner.ScannedResult changeLogResult =
(StartingScanner.ScannedResult) changeLogScanner.scan(snapshotReader);
assertThat(changeLogResult.currentSnapshotId()).isEqualTo(4);
assertThat(getResult(table.newRead(), toSplits(changeLogResult.splits())))
Map<String, String> dynamicOptions = new HashMap<>();
dynamicOptions.put(INCREMENTAL_BETWEEN.key(), "1,4");
List<Split> splits = table.copy(dynamicOptions).newScan().plan().splits();
assertThat(getResult(table.newRead(), splits))
.hasSameElementsAs(
Arrays.asList("+I 2|20|200", "+I 1|10|100", "+I 3|40|400", "+U 3|40|500"));

write.close();
commit.close();
dynamicOptions.put(INCREMENTAL_BETWEEN_SCAN_MODE.key(), "delta");
splits = table.copy(dynamicOptions).newScan().plan().splits();
assertThat(getResult(table.newRead(), splits))
.hasSameElementsAs(Arrays.asList("+I 2|20|200", "+I 1|10|100", "+I 3|40|500"));
}

@Override