Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support Flink read / write data branch #3029

Merged
merged 1 commit into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
<td>Boolean</td>
<td>Whether to create underlying storage when reading and writing the table.</td>
</tr>
<tr>
<td><h5>branch</h5></td>
<td style="word-wrap: break-word;">"main"</td>
<td>String</td>
<td>Specify branch name.</td>
</tr>
<tr>
<td><h5>bucket</h5></td>
<td style="word-wrap: break-word;">-1</td>
Expand Down
14 changes: 14 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The file path of this table in the filesystem.");

public static final ConfigOption<String> BRANCH =
key("branch").stringType().defaultValue("main").withDescription("Specify branch name.");

public static final String FILE_FORMAT_ORC = "orc";
public static final String FILE_FORMAT_AVRO = "avro";
public static final String FILE_FORMAT_PARQUET = "parquet";
Expand Down Expand Up @@ -1178,6 +1181,17 @@ public Path path() {
return path(options.toMap());
}

public String branch() {
return branch(options.toMap());
}

public static String branch(Map<String, String> options) {
if (options.containsKey(BRANCH.key())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just use options.get() will handle with the default value

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get why...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, the previous description was incorrect, Most of the parameters for this method come from "org.apache.paimon.table.FileStoreTable #options", 'branch' parameter may be empty.

return options.get(BRANCH.key());
}
return BRANCH.defaultValue();
}

public static Path path(Map<String, String> options) {
return new Path(options.get(PATH.key()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@
import java.util.Comparator;
import java.util.List;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/**
* Base {@link FileStore} implementation.
*
Expand Down Expand Up @@ -104,7 +102,7 @@ public FileStorePathFactory pathFactory() {

@Override
public SnapshotManager snapshotManager() {
return new SnapshotManager(fileIO, options.path());
return new SnapshotManager(fileIO, options.path(), options.branch());
}

@Override
Expand Down Expand Up @@ -175,10 +173,6 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {

@Override
public FileStoreCommitImpl newCommit(String commitUser) {
return newCommit(commitUser, DEFAULT_MAIN_BRANCH);
}

public FileStoreCommitImpl newCommit(String commitUser, String branchName) {
return new FileStoreCommitImpl(
fileIO,
schemaManager,
Expand All @@ -196,7 +190,7 @@ public FileStoreCommitImpl newCommit(String commitUser, String branchName) {
options.manifestMergeMinCount(),
partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(),
newKeyComparator(),
branchName,
options.branch(),
newStatsFileHandler());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/** {@link FileStore} for reading and writing {@link InternalRow}. */
public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
Expand Down Expand Up @@ -71,11 +70,7 @@ public BucketMode bucketMode() {

@Override
public AppendOnlyFileStoreScan newScan() {
return newScan(DEFAULT_MAIN_BRANCH);
}

public AppendOnlyFileStoreScan newScan(String branchName) {
return newScan(false, branchName);
return newScan(false);
}

@Override
Expand Down Expand Up @@ -106,12 +101,12 @@ public AppendOnlyFileStoreWrite newWrite(
rowType,
pathFactory(),
snapshotManager(),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
newScan(true).withManifestCacheFilter(manifestFilter),
options,
tableName);
}

private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) {
private AppendOnlyFileStoreScan newScan(boolean forWrite) {
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
Expand Down Expand Up @@ -146,7 +141,6 @@ public void pushdown(Predicate predicate) {
options.bucket(),
forWrite,
options.scanManifestParallelism(),
branchName,
options.fileIndexReadEnabled());
}

Expand Down
4 changes: 0 additions & 4 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public interface FileStore<T> extends Serializable {

FileStoreScan newScan();

FileStoreScan newScan(String branchName);

ManifestList.Factory manifestListFactory();

ManifestFile.Factory manifestFileFactory();
Expand All @@ -81,8 +79,6 @@ public interface FileStore<T> extends Serializable {

FileStoreCommit newCommit(String commitUser);

FileStoreCommit newCommit(String commitUser, String branchName);

SnapshotDeletion newSnapshotDeletion();

TagManager newTagManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** {@link FileStore} for querying and updating {@link KeyValue}s. */
Expand Down Expand Up @@ -112,11 +111,7 @@ public BucketMode bucketMode() {

@Override
public KeyValueFileStoreScan newScan() {
return newScan(DEFAULT_MAIN_BRANCH);
}

public KeyValueFileStoreScan newScan(String branchName) {
return newScan(false, branchName);
return newScan(false);
}

@Override
Expand Down Expand Up @@ -185,7 +180,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
pathFactory(),
format2PathFactory(),
snapshotManager(),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
newScan(true).withManifestCacheFilter(manifestFilter),
indexFactory,
deletionVectorsMaintainerFactory,
options,
Expand All @@ -209,7 +204,7 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
return pathFactoryMap;
}

private KeyValueFileStoreScan newScan(boolean forWrite, String branchName) {
private KeyValueFileStoreScan newScan(boolean forWrite) {
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
Expand Down Expand Up @@ -240,7 +235,6 @@ public void pushdown(Predicate keyFilter) {
options.bucket(),
forWrite,
options.scanManifestParallelism(),
branchName,
options.deletionVectorsEnabled(),
options.mergeEngine());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
}
return table;
} else {
return getDataTable(identifier);
Table table = getDataTable(identifier);
return table;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private final SchemaManager schemaManager;
private final TableSchema schema;
protected final ScanBucketFilter bucketKeyFilter;
private final String branchName;

private PartitionPredicate partitionFilter;
private Snapshot specifiedSnapshot = null;
Expand All @@ -102,8 +101,7 @@ public AbstractFileStoreScan(
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
String branchName) {
Integer scanManifestParallelism) {
this.partitionType = partitionType;
this.bucketKeyFilter = bucketKeyFilter;
this.snapshotManager = snapshotManager;
Expand All @@ -115,7 +113,6 @@ public AbstractFileStoreScan(
this.checkNumOfBuckets = checkNumOfBuckets;
this.tableSchemas = new ConcurrentHashMap<>();
this.scanManifestParallelism = scanManifestParallelism;
this.branchName = branchName;
}

@Override
Expand Down Expand Up @@ -397,7 +394,7 @@ private Pair<Snapshot, List<ManifestFileMeta>> readManifests() {
if (manifests == null) {
snapshot =
specifiedSnapshot == null
? snapshotManager.latestSnapshot(branchName)
? snapshotManager.latestSnapshot()
: specifiedSnapshot;
if (snapshot == null) {
manifests = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public AppendOnlyFileStoreScan(
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
String branchName,
boolean fileIndexReadEnabled) {
super(
partitionType,
Expand All @@ -74,8 +73,7 @@ public AppendOnlyFileStoreScan(
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
scanManifestParallelism,
branchName);
scanManifestParallelism);
this.simpleStatsConverters =
new SimpleStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id());
this.fileIndexReadEnabled = fileIndexReadEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public void commit(
// we can skip conflict checking in tryCommit method.
// This optimization is mainly used to decrease the number of times we read from
// files.
latestSnapshot = snapshotManager.latestSnapshot(branchName);
latestSnapshot = snapshotManager.latestSnapshot();
if (latestSnapshot != null && checkAppendFiles) {
// it is possible that some partitions only have compact changes,
// so we need to contain all changes
Expand Down Expand Up @@ -654,7 +654,7 @@ private int tryCommit(
@Nullable String statsFileName) {
int cnt = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot(branchName);
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
cnt++;
if (tryCommitOnce(
tableFiles,
Expand Down Expand Up @@ -754,7 +754,7 @@ public boolean tryCommitOnce(
Path newSnapshotPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotManager.snapshotPath(newSnapshotId)
: snapshotManager.branchSnapshotPath(branchName, newSnapshotId);
: snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshotId);

if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId);
Expand Down Expand Up @@ -839,7 +839,7 @@ public boolean tryCommitOnce(
newIndexManifest = indexManifest;
}

long latestSchemaId = schemaManager.latest(branchName).get().id();
long latestSchemaId = schemaManager.latest().get().id();

// write new stats or inherit from the previous snapshot
String statsFileName = null;
Expand Down Expand Up @@ -904,7 +904,7 @@ public boolean tryCommitOnce(
boolean committed =
fileIO.writeFileUtf8(newSnapshotPath, newSnapshot.toJson());
if (committed) {
snapshotManager.commitLatestHint(newSnapshotId, branchName);
snapshotManager.commitLatestHint(newSnapshotId);
}
return committed;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public KeyValueFileStoreScan(
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
String branchName,
boolean deletionVectorsEnabled,
MergeEngine mergeEngine) {
super(
Expand All @@ -75,8 +74,7 @@ public KeyValueFileStoreScan(
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
scanManifestParallelism,
branchName);
scanManifestParallelism);
this.fieldKeyStatsConverters =
new SimpleStatsConverters(
sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,6 @@ public FileStoreScan newScan() {
return wrapped.newScan();
}

@Override
public FileStoreScan newScan(String branchName) {
privilegeChecker.assertCanSelect(identifier);
return wrapped.newScan(branchName);
}

@Override
public ManifestList.Factory manifestListFactory() {
return wrapped.manifestListFactory();
Expand Down Expand Up @@ -144,12 +138,6 @@ public FileStoreCommit newCommit(String commitUser) {
return wrapped.newCommit(commitUser);
}

@Override
public FileStoreCommit newCommit(String commitUser, String branchName) {
privilegeChecker.assertCanInsert(identifier);
return wrapped.newCommit(commitUser, branchName);
}

@Override
public SnapshotDeletion newSnapshotDeletion() {
privilegeChecker.assertCanInsert(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ public SnapshotReader newSnapshotReader() {
return wrapped.newSnapshotReader();
}

@Override
public SnapshotReader newSnapshotReader(String branchName) {
privilegeChecker.assertCanSelect(identifier);
return wrapped.newSnapshotReader(branchName);
}

@Override
public CoreOptions coreOptions() {
return wrapped.coreOptions();
Expand Down Expand Up @@ -270,12 +264,6 @@ public TableCommitImpl newCommit(String commitUser) {
return wrapped.newCommit(commitUser);
}

@Override
public TableCommitImpl newCommit(String commitUser, String branchName) {
privilegeChecker.assertCanInsert(identifier);
return wrapped.newCommit(commitUser, branchName);
}

@Override
public LocalTableQuery newLocalTableQuery() {
privilegeChecker.assertCanSelect(identifier);
Expand Down