Skip to content

Commit

Permalink
refact the scan logic of combined mode compaction to reuse the logic …
Browse files Browse the repository at this point in the history
…of scan table.
  • Loading branch information
wg1026688210 committed May 10, 2024
1 parent 28e7f20 commit 4ed800f
Show file tree
Hide file tree
Showing 8 changed files with 909 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.compact;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.system.BucketsTable;

import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.compactOptions;

/**
* This class is responsible for implementing the scanning logic {@link MultiTableScanBase} for the
* table with multi bucket such as dynamic or fixed bucket table.
*/
public class MultiAwareBucketTableScan extends MultiTableScanBase<Tuple2<Split, String>> {
protected transient Map<Identifier, BucketsTable> tablesMap;
protected transient Map<Identifier, StreamTableScan> scansMap;

public MultiAwareBucketTableScan(
Catalog.Loader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
boolean isStreaming,
AtomicBoolean isRunning) {
super(
catalogLoader,
includingPattern,
excludingPattern,
databasePattern,
isStreaming,
isRunning);
tablesMap = new HashMap<>();
scansMap = new HashMap<>();
}

@Override
List<Tuple2<Split, String>> doScan() {
List<Tuple2<Split, String>> splits = new ArrayList<>();
for (Map.Entry<Identifier, StreamTableScan> entry : scansMap.entrySet()) {
Identifier identifier = entry.getKey();
StreamTableScan scan = entry.getValue();
splits.addAll(
scan.plan().splits().stream()
.map(split -> new Tuple2<>(split, identifier.getFullName()))
.collect(Collectors.toList()));
}
return splits;
}

@Override
public boolean checkTableScanned(Identifier identifier) {
return tablesMap.containsKey(identifier);
}

@Override
public void addScanTable(FileStoreTable fileStoreTable, Identifier identifier) {
if (fileStoreTable.bucketMode() != BucketMode.UNAWARE) {
BucketsTable bucketsTable =
new BucketsTable(fileStoreTable, isStreaming, identifier.getDatabaseName())
.copy(compactOptions(isStreaming));
tablesMap.put(identifier, bucketsTable);
scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.compact;

import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.Split;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompactTable;

/**
* This class is responsible for implementing the scanning logic for the table of different type
* buckets during compaction.
*
* @param <T> the result of scanning file :
* <ol>
* <li>Tuple2<{@link Split},String> for the table with multi buckets, such as dynamic or fixed
* bucket table.
* <li>{@link MultiTableAppendOnlyCompactionTask} for the table witch fixed single bucket
* ,such as unaware bucket table.
* </ol>
*/
public abstract class MultiTableScanBase<T> {
private static final Logger LOG = LoggerFactory.getLogger(MultiTableScanBase.class);
protected final Pattern includingPattern;
protected final Pattern excludingPattern;
protected final Pattern databasePattern;

protected transient Catalog catalog;

protected AtomicBoolean isRunning;
protected boolean isStreaming;

public MultiTableScanBase(
Catalog.Loader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
boolean isStreaming,
AtomicBoolean isRunning) {
catalog = catalogLoader.load();

this.includingPattern = includingPattern;
this.excludingPattern = excludingPattern;
this.databasePattern = databasePattern;
this.isRunning = isRunning;
this.isStreaming = isStreaming;
}

protected void updateTableMap()
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
List<String> databases = catalog.listDatabases();

for (String databaseName : databases) {
if (databasePattern.matcher(databaseName).matches()) {
List<String> tables = catalog.listTables(databaseName);
for (String tableName : tables) {
Identifier identifier = Identifier.create(databaseName, tableName);
if (shouldCompactTable(identifier, includingPattern, excludingPattern)
&& (!checkTableScanned(identifier))) {
Table table = catalog.getTable(identifier);
if (!(table instanceof FileStoreTable)) {
LOG.error(
String.format(
"Only FileStoreTable supports compact action. The table type is '%s'.",
table.getClass().getName()));
continue;
}

FileStoreTable fileStoreTable = (FileStoreTable) table;
addScanTable(fileStoreTable, identifier);
}
}
}
}
}

public ScanResult scanTable(SourceFunction.SourceContext<T> ctx)
throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException {
try {
if (!isRunning.get()) {
return ScanResult.FINISHED;
}

updateTableMap();
List<T> tasks = doScan();

tasks.forEach(ctx::collect);
return tasks.isEmpty() ? ScanResult.IS_EMPTY : ScanResult.IS_NON_EMPTY;
} catch (EndOfScanException esf) {
LOG.info("Catching EndOfStreamException, the stream is finished.");
return ScanResult.FINISHED;
}
}

abstract List<T> doScan();

/** Check if table has been scanned. */
abstract boolean checkTableScanned(Identifier identifier);

/** Add the scan table to the table map. */
abstract void addScanTable(FileStoreTable fileStoreTable, Identifier identifier);

/** the result of table scanning. */
public enum ScanResult {
FINISHED,
IS_EMPTY,
IS_NON_EMPTY
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.compact;

import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator;
import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;

import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

/**
* This class is responsible for implementing the scanning logic {@link MultiTableScanBase} for the
* table with fix single bucket such as unaware bucket table.
*/
public class MultiUnawareBucketTableScan
extends MultiTableScanBase<MultiTableAppendOnlyCompactionTask> {

private static final Logger LOG = LoggerFactory.getLogger(MultiUnawareBucketTableScan.class);

protected transient Map<Identifier, AppendOnlyTableCompactionCoordinator> tablesMap;

public MultiUnawareBucketTableScan(
Catalog.Loader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
boolean isStreaming,
AtomicBoolean isRunning) {
super(
catalogLoader,
includingPattern,
excludingPattern,
databasePattern,
isStreaming,
isRunning);
tablesMap = new HashMap<>();
}

@NotNull
@Override
List<MultiTableAppendOnlyCompactionTask> doScan() {
// do scan and plan action, emit append-only compaction tasks.
List<MultiTableAppendOnlyCompactionTask> tasks = new ArrayList<>();
for (Map.Entry<Identifier, AppendOnlyTableCompactionCoordinator> tableIdAndCoordinator :
tablesMap.entrySet()) {
Identifier tableId = tableIdAndCoordinator.getKey();
AppendOnlyTableCompactionCoordinator compactionCoordinator =
tableIdAndCoordinator.getValue();
compactionCoordinator.run().stream()
.map(
task ->
new MultiTableAppendOnlyCompactionTask(
task.partition(), task.compactBefore(), tableId))
.forEach(tasks::add);
}
return tasks;
}

@Override
public boolean checkTableScanned(Identifier identifier) {
return tablesMap.containsKey(identifier);
}

@Override
public void addScanTable(FileStoreTable fileStoreTable, Identifier identifier) {
if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) {
tablesMap.put(
identifier,
new AppendOnlyTableCompactionCoordinator(fileStoreTable, isStreaming));
}
}
}

0 comments on commit 4ed800f

Please sign in to comment.