use refactored combined compaction

wg1026688210 committed May 12, 2024
1 parent 9c53d3e commit ad6e873
Showing 9 changed files with 223 additions and 498 deletions.
@@ -19,15 +19,16 @@
package org.apache.paimon.flink.action;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.flink.sink.BucketsRowChannelComputer;
import org.apache.paimon.flink.sink.CombinedTableCompactorSink;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.sink.MultiTablesCompactorSink;
import org.apache.paimon.flink.source.CombinedTableCompactorSourceBuilder;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -188,23 +189,34 @@ private void buildForCombinedMode() {
ReadableConfig conf = env.getConfiguration();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
// TODO: Currently, multi-table compaction does not support tables whose bucket mode is UNAWARE.
MultiTablesCompactorSourceBuilder sourceBuilder =
new MultiTablesCompactorSourceBuilder(
CombinedTableCompactorSourceBuilder sourceBuilder =
new CombinedTableCompactorSourceBuilder(
catalogLoader(),
databasePattern,
includingPattern,
excludingPattern,
tableOptions.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis());
DataStream<RowData> source =
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();

DataStream<RowData> partitioned =
// multi-bucket tables, i.e. tables with multiple buckets per partition (fixed-bucket and dynamic-bucket modes)
DataStream<RowData> awareBucketTableSource =
partition(
source,
sourceBuilder
.withEnv(env)
.withContinuousMode(isStreaming)
.buildAwareBucketTableSource(),
new BucketsRowChannelComputer(),
tableOptions.get(FlinkConnectorOptions.SINK_PARALLELISM));
new MultiTablesCompactorSink(catalogLoader(), tableOptions).sinkFrom(partitioned);

// unaware bucket table
DataStream<MultiTableAppendOnlyCompactionTask> unawareBucketTableSource =
sourceBuilder
.withEnv(env)
.withContinuousMode(isStreaming)
.buildForUnawareBucketsTableSource();

new CombinedTableCompactorSink(catalogLoader(), tableOptions)
.sinkFrom(awareBucketTableSource, unawareBucketTableSource);
}

private void buildForTraditionalCompaction(
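
For orientation, here is a minimal, self-contained sketch (not part of this commit) of how the refactored combined-mode pieces fit together: one CombinedTableCompactorSourceBuilder produces both the multi-bucket RowData source and the unaware-bucket task source, and a single CombinedTableCompactorSink writes and commits both. The class names, constructor arguments, and method calls follow the diff above; the wrapper class and its parameters are hypothetical stand-ins for the fields the action class already holds, and the BucketsRowChannelComputer repartitioning of the aware-bucket stream is left out for brevity.

import java.util.regex.Pattern;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.sink.CombinedTableCompactorSink;
import org.apache.paimon.flink.source.CombinedTableCompactorSourceBuilder;
import org.apache.paimon.options.Options;

public class CombinedCompactionWiringSketch {

    // Hypothetical helper: mirrors the shape of buildForCombinedMode() in the diff,
    // minus the BucketsRowChannelComputer repartitioning of the aware-bucket stream.
    public static void wire(
            StreamExecutionEnvironment env,
            Catalog.Loader catalogLoader,
            Pattern databasePattern,
            Pattern includingPattern,
            Pattern excludingPattern,
            Options tableOptions,
            boolean isStreaming,
            long monitorIntervalMillis) {

        CombinedTableCompactorSourceBuilder sourceBuilder =
                new CombinedTableCompactorSourceBuilder(
                        catalogLoader,
                        databasePattern,
                        includingPattern,
                        excludingPattern,
                        monitorIntervalMillis);

        // Tables with fixed or dynamic buckets are compacted through a RowData stream.
        DataStream<RowData> awareBucketTableSource =
                sourceBuilder
                        .withEnv(env)
                        .withContinuousMode(isStreaming)
                        .buildAwareBucketTableSource();

        // Unaware-bucket (append-only) tables emit dedicated compaction tasks instead.
        DataStream<MultiTableAppendOnlyCompactionTask> unawareBucketTableSource =
                sourceBuilder
                        .withEnv(env)
                        .withContinuousMode(isStreaming)
                        .buildForUnawareBucketsTableSource();

        // One sink writes and commits both kinds of committables in the same job.
        new CombinedTableCompactorSink(catalogLoader, tableOptions)
                .sinkFrom(awareBucketTableSource, unawareBucketTableSource);
    }
}
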
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.manifest.WrappedManifestCommittable;
@@ -48,7 +49,7 @@
import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;

/** A sink for processing multiple tables in a dedicated compaction job. */
public class MultiTablesCompactorSink implements Serializable {
public class CombinedTableCompactorSink implements Serializable {
private static final long serialVersionUID = 1L;

private static final String WRITER_NAME = "Writer";
@@ -59,54 +60,75 @@ public class MultiTablesCompactorSink

private final Options options;

public MultiTablesCompactorSink(Catalog.Loader catalogLoader, Options options) {
public CombinedTableCompactorSink(Catalog.Loader catalogLoader, Options options) {
this.catalogLoader = catalogLoader;
this.ignorePreviousFiles = false;
this.options = options;
}

public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
public DataStreamSink<?> sinkFrom(
DataStream<RowData> awareBucketTableSource,
DataStream<MultiTableAppendOnlyCompactionTask> unawareBucketTableSource) {
// This commitUser is valid only for new jobs.
// After the job starts, this commitUser will be recorded into the states of write and
// commit operators.
// When the job restarts, commitUser will be recovered from states and this value is
// ignored.
String initialCommitUser = UUID.randomUUID().toString();
return sinkFrom(input, initialCommitUser);
return sinkFrom(awareBucketTableSource, unawareBucketTableSource, initialCommitUser);
}

public DataStreamSink<?> sinkFrom(DataStream<RowData> input, String initialCommitUser) {
public DataStreamSink<?> sinkFrom(
DataStream<RowData> awareBucketTableSource,
DataStream<MultiTableAppendOnlyCompactionTask> unawareBucketTableSource,
String initialCommitUser) {
// do the actual writing; no snapshot is generated in this stage
SingleOutputStreamOperator<MultiTableCommittable> written =
doWrite(input, initialCommitUser, input.getParallelism());
DataStream<MultiTableCommittable> written =
doWrite(awareBucketTableSource, unawareBucketTableSource, initialCommitUser);

// commit the committable to generate a new snapshot
return doCommit(written, initialCommitUser);
}

public SingleOutputStreamOperator<MultiTableCommittable> doWrite(
DataStream<RowData> input, String commitUser, Integer parallelism) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
public DataStream<MultiTableCommittable> doWrite(
DataStream<RowData> awareBucketTableSource,
DataStream<MultiTableAppendOnlyCompactionTask> unawareBucketTableSource,
String commitUser) {
StreamExecutionEnvironment env = awareBucketTableSource.getExecutionEnvironment();
boolean isStreaming =
env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;

SingleOutputStreamOperator<MultiTableCommittable> written =
input.transform(
WRITER_NAME,
SingleOutputStreamOperator<MultiTableCommittable> multiBucketTableRewriter =
awareBucketTableSource
.transform(
String.format("%s-%s", "Multi-Bucket-Table", WRITER_NAME),
new MultiTableCommittableTypeInfo(),
createWriteOperator(
combinedMultiComacptionWriteOperator(
env.getCheckpointConfig(), isStreaming, commitUser))
.setParallelism(parallelism == null ? input.getParallelism() : parallelism);
.setParallelism(awareBucketTableSource.getParallelism());

SingleOutputStreamOperator<MultiTableCommittable> unawareBucketTableRewriter =
unawareBucketTableSource
.transform(
String.format("%s-%s", "Unaware-Bucket-Table", WRITER_NAME),
new MultiTableCommittableTypeInfo(),
new AppendOnlyMultiTableCompactionWorkerOperator(
catalogLoader, commitUser, options))
.setParallelism(unawareBucketTableSource.getParallelism());

if (!isStreaming) {
assertBatchConfiguration(env, written.getParallelism());
assertBatchConfiguration(env, multiBucketTableRewriter.getParallelism());
assertBatchConfiguration(env, unawareBucketTableRewriter.getParallelism());
}

if (options.get(SINK_USE_MANAGED_MEMORY)) {
declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
declareManagedMemory(
multiBucketTableRewriter, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
declareManagedMemory(
unawareBucketTableRewriter, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
}
return written;
return multiBucketTableRewriter.union(unawareBucketTableRewriter);
}

protected DataStreamSink<?> doCommit(
@@ -138,8 +160,9 @@ protected DataStreamSink<?> doCommit(
}

// TODO:refactor FlinkSink to adopt this sink
protected OneInputStreamOperator<RowData, MultiTableCommittable> createWriteOperator(
CheckpointConfig checkpointConfig, boolean isStreaming, String commitUser) {
protected OneInputStreamOperator<RowData, MultiTableCommittable>
combinedMultiComacptionWriteOperator(
CheckpointConfig checkpointConfig, boolean isStreaming, String commitUser) {
return new MultiTablesStoreCompactOperator(
catalogLoader,
commitUser,
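
The essential move in doWrite above is that both writer branches emit MultiTableCommittable, so they can be unioned into one stream ahead of the single commit stage. Below is a generic Flink sketch of that union-before-commit pattern, with made-up element types and print() standing in for the committer; nothing in it is Paimon API beyond what the diff already shows.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionBeforeCommitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Two inputs of different types, standing in for the aware-bucket RowData
        // stream and the unaware-bucket compaction-task stream.
        DataStream<String> bucketedInput = env.fromElements("db1.t1/bucket-0", "db1.t2/bucket-3");
        DataStream<Long> unawareTasks = env.fromElements(1L, 2L, 3L);

        // Each branch is rewritten to the same "committable" representation ...
        DataStream<String> bucketedCommittables =
                bucketedInput.map(v -> "committable:" + v).returns(Types.STRING);
        DataStream<String> unawareCommittables =
                unawareTasks.map(id -> "committable:task-" + id).returns(Types.STRING);

        // ... so a single downstream committer sees one unified stream, mirroring
        // multiBucketTableRewriter.union(unawareBucketTableRewriter) in the diff.
        bucketedCommittables.union(unawareCommittables).print();

        env.execute("union-before-commit-sketch");
    }
}
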
@@ -18,10 +18,13 @@

package org.apache.paimon.flink.source;

import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.source.operator.MultiTablesBatchCompactorSourceFunction;
import org.apache.paimon.flink.source.operator.MultiTablesStreamingCompactorSourceFunction;
import org.apache.paimon.flink.source.operator.CombinedAwareBatchSourceFunction;
import org.apache.paimon.flink.source.operator.CombinedAwareStreamingSourceFunction;
import org.apache.paimon.flink.source.operator.CombinedUnawareBatchSourceFunction;
import org.apache.paimon.flink.source.operator.CombinedUnawareStreamingSourceFunction;
import org.apache.paimon.table.system.BucketsTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
@@ -35,9 +38,9 @@

/**
* source builder to build a Flink compactor source for multi-tables. This is for dedicated
* compactor jobs.
* compactor jobs in combined mode.
*/
public class MultiTablesCompactorSourceBuilder {
public class CombinedTableCompactorSourceBuilder {
private final Catalog.Loader catalogLoader;
private final Pattern includingPattern;
private final Pattern excludingPattern;
@@ -47,7 +50,7 @@ public class MultiTablesCompactorSourceBuilder {
private boolean isContinuous = false;
private StreamExecutionEnvironment env;

public MultiTablesCompactorSourceBuilder(
public CombinedTableCompactorSourceBuilder(
Catalog.Loader catalogLoader,
Pattern databasePattern,
Pattern includingPattern,
@@ -60,39 +63,60 @@ public MultiTablesCompactorSourceBuilder(
this.monitorInterval = monitorInterval;
}

public MultiTablesCompactorSourceBuilder withContinuousMode(boolean isContinuous) {
public CombinedTableCompactorSourceBuilder withContinuousMode(boolean isContinuous) {
this.isContinuous = isContinuous;
return this;
}

public MultiTablesCompactorSourceBuilder withEnv(StreamExecutionEnvironment env) {
public CombinedTableCompactorSourceBuilder withEnv(StreamExecutionEnvironment env) {
this.env = env;
return this;
}

public DataStream<RowData> build() {
public DataStream<RowData> buildAwareBucketTableSource() {
Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null.");
RowType produceType = BucketsTable.getRowType();
if (isContinuous) {
return MultiTablesStreamingCompactorSourceFunction.buildSource(
return CombinedAwareStreamingSourceFunction.buildSource(
env,
"MultiTables-StreamingCompactorSource",
"Combine-MultiBucketTables--StreamingCompactorSource",
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)),
catalogLoader,
includingPattern,
excludingPattern,
databasePattern,
monitorInterval);
} else {
return MultiTablesBatchCompactorSourceFunction.buildSource(
return CombinedAwareBatchSourceFunction.buildSource(
env,
"MultiTables-BatchCompactorSource",
"Combine-MultiBucketTables-BatchCompactorSource",
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)),
catalogLoader,
includingPattern,
excludingPattern,
databasePattern);
}
}

public DataStream<MultiTableAppendOnlyCompactionTask> buildForUnawareBucketsTableSource() {
Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null.");
if (isContinuous) {
return CombinedUnawareStreamingSourceFunction.buildSource(
env,
"Combined-UnawareBucketTables-StreamingCompactorSource",
catalogLoader,
includingPattern,
excludingPattern,
databasePattern,
monitorInterval);
} else {
return CombinedUnawareBatchSourceFunction.buildSource(
env,
"Combined-UnawareBucketTables-BatchCompactorSource",
catalogLoader,
includingPattern,
excludingPattern,
databasePattern);
}
}
}
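
A hedged usage sketch of the builder on its own, assuming a StreamExecutionEnvironment and Catalog.Loader are already in scope; the wrapper class and regex values are placeholders. With withContinuousMode(false) the builder dispatches to the CombinedAwareBatchSourceFunction / CombinedUnawareBatchSourceFunction variants, and with true to the streaming variants, as the method bodies above show.

import java.util.regex.Pattern;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.source.CombinedTableCompactorSourceBuilder;

public class CombinedSourceBuilderUsageSketch {

    // Hypothetical helper: builds both combined-mode sources for a one-shot (batch) compaction.
    public static void buildBatchSources(StreamExecutionEnvironment env, Catalog.Loader catalogLoader) {
        Pattern databasePattern = Pattern.compile("my_db.*");   // placeholder regex
        Pattern includingPattern = Pattern.compile(".*");       // include every table
        Pattern excludingPattern = Pattern.compile("^$");       // placeholder: exclude nothing

        CombinedTableCompactorSourceBuilder builder =
                new CombinedTableCompactorSourceBuilder(
                        catalogLoader,
                        databasePattern,
                        includingPattern,
                        excludingPattern,
                        10_000L); // discovery interval in millis; only used by the streaming sources

        // Batch mode: routes to the Combined*BatchSourceFunction variants.
        DataStream<RowData> awareSource =
                builder.withEnv(env).withContinuousMode(false).buildAwareBucketTableSource();
        DataStream<MultiTableAppendOnlyCompactionTask> unawareSource =
                builder.withEnv(env).withContinuousMode(false).buildForUnawareBucketsTableSource();

        // The two streams are normally handed to CombinedTableCompactorSink.sinkFrom(...).
    }
}
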
