Skip to content

Commit

Permalink
add unaware bucket compaction worker
Browse files Browse the repository at this point in the history
  • Loading branch information
wg1026688210 committed May 12, 2024
1 parent 28e7f20 commit 5abf5e3
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,23 +170,33 @@ public void createTableDefault() throws Exception {
catalog.createTable(identifier(), schemaDefault(), true);
}

public void createTable(Identifier identifier) throws Exception {
catalog.createTable(identifier, schemaDefault(), false);
}

protected void commitDefault(List<CommitMessage> messages) throws Exception {
BatchTableCommit commit = getTableDefault().newBatchWriteBuilder().newCommit();
commit.commit(messages);
commit.close();
}

protected List<CommitMessage> writeDataDefault(int size, int times) throws Exception {
return writeData(getTableDefault(),size,times);
}

protected List<CommitMessage> writeData(Table table, int size, int times) throws Exception {
List<CommitMessage> messages = new ArrayList<>();
for (int i = 0; i < times; i++) {
messages.addAll(writeOnce(getTableDefault(), i, size));
messages.addAll(writeOnce(table, i, size));
}

return messages;
}

public FileStoreTable getTableDefault() throws Exception {
return (FileStoreTable) catalog.getTable(identifier());
return getTable(identifier());
}

public FileStoreTable getTable(Identifier identifier) throws Exception {
return (FileStoreTable) catalog.getTable(identifier);
}

private List<CommitMessage> writeOnce(Table table, int time, int size) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
private final Catalog.Loader catalogLoader;

// support multi table compaction
private transient Map<Identifier, UnawareBucketCompactor> compactionHelperContainer;
private transient Map<Identifier, UnawareBucketCompactor> compactorContainer;

private transient ExecutorService lazyCompactExecutor;

Expand All @@ -70,20 +70,20 @@ public AppendOnlyMultiTableCompactionWorkerOperator(
@Override
public void open() throws Exception {
LOG.debug("Opened a append-only multi table compaction worker.");
compactionHelperContainer = new HashMap<>();
compactorContainer = new HashMap<>();
catalog = catalogLoader.load();
}

@Override
protected List<MultiTableCommittable> prepareCommit(boolean waitCompaction, long checkpointId)
throws IOException {
List<MultiTableCommittable> result = new ArrayList<>();
for (Map.Entry<Identifier, UnawareBucketCompactor> helperEntry :
compactionHelperContainer.entrySet()) {
Identifier tableId = helperEntry.getKey();
UnawareBucketCompactor helper = helperEntry.getValue();
for (Map.Entry<Identifier, UnawareBucketCompactor> compactorWithTable :
compactorContainer.entrySet()) {
Identifier tableId = compactorWithTable.getKey();
UnawareBucketCompactor compactor = compactorWithTable.getValue();

for (Committable committable : helper.prepareCommit(waitCompaction, checkpointId)) {
for (Committable committable : compactor.prepareCommit(waitCompaction, checkpointId)) {
result.add(
new MultiTableCommittable(
tableId.getDatabaseName(),
Expand All @@ -101,12 +101,12 @@ protected List<MultiTableCommittable> prepareCommit(boolean waitCompaction, long
public void processElement(StreamRecord<MultiTableAppendOnlyCompactionTask> element)
throws Exception {
Identifier identifier = element.getValue().tableIdentifier();
compactionHelperContainer
.computeIfAbsent(identifier, this::unwareBucketCompactionHelper)
compactorContainer
.computeIfAbsent(identifier, this::compactor)
.processElement(element.getValue());
}

private UnawareBucketCompactor unwareBucketCompactionHelper(Identifier tableId) {
private UnawareBucketCompactor compactor(Identifier tableId) {
try {
return new UnawareBucketCompactor(
(FileStoreTable) catalog.getTable(tableId), commitUser, this::workerExecutor);
Expand Down Expand Up @@ -136,8 +136,8 @@ public void close() throws Exception {
"Executors shutdown timeout, there may be some files aren't deleted correctly");
}

for (UnawareBucketCompactor helperEntry : compactionHelperContainer.values()) {
helperEntry.close();
for (UnawareBucketCompactor compactor : compactorContainer.values()) {
compactor.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink;

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.sink.AppendOnlySingleTableCompactionWorkerOperatorTest.packTask;

/**
* test for {@link AppendOnlyMultiTableCompactionWorkerOperator}.
*/
public class AppendOnlyMultiTableCompactionWorkerOperatorTest extends TableTestBase {
private final String[] tables = {
"a", "b"
};
@Test
public void testAsyncCompactionWorks() throws Exception {

AppendOnlyMultiTableCompactionWorkerOperator workerOperator =
new AppendOnlyMultiTableCompactionWorkerOperator(()->catalog, "user",new Options());

List<StreamRecord<MultiTableAppendOnlyCompactionTask>> records =new ArrayList<>();
//create table and write
for (String table : tables) {
Identifier identifier = identifier(table);
createTable(identifier);

// write 200 files
List<CommitMessage> commitMessages = writeData(getTable(identifier), 200, 20);

packTask(commitMessages, 5).stream().map(
task->new StreamRecord<>(new MultiTableAppendOnlyCompactionTask(task.partition(),task.compactBefore(),identifier))
).forEach(records::add);
}

Assertions.assertThat(records.size()).isEqualTo(8);
workerOperator.open();

for (StreamRecord<MultiTableAppendOnlyCompactionTask> record : records) {
workerOperator.processElement(record);
}

List<MultiTableCommittable> committables = new ArrayList<>();
Long timeStart = System.currentTimeMillis();
long timeout = 60_000L;

Assertions.assertThatCode(
() -> {
while (committables.size() != 8) {
committables.addAll(
workerOperator.prepareCommit(false, Long.MAX_VALUE));

Long now = System.currentTimeMillis();
if (now - timeStart > timeout && committables.size() != 8) {
throw new RuntimeException(
"Timeout waiting for compaction, maybe some error happens in "
+ AppendOnlySingleTableCompactionWorkerOperator
.class
.getName());
}
Thread.sleep(1_000L);
}
})
.doesNotThrowAnyException();
committables.forEach(
a ->
Assertions.assertThat(
((CommitMessageImpl) a.wrappedCommittable())
.compactIncrement()
.compactAfter()
.size()
== 1)
.isTrue());
Set<String> table = committables.stream().map(MultiTableCommittable::getTable).collect(Collectors.toSet());
Assertions.assertThat(table).hasSameElementsAs(Arrays.asList(tables));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ protected InternalRow dataDefault(int time, int size) {
return GenericRow.of(RANDOM.nextInt(), RANDOM.nextLong(), randomString());
}

private List<AppendOnlyCompactionTask> packTask(List<CommitMessage> messages, int fileSize) {
public static List<AppendOnlyCompactionTask> packTask(List<CommitMessage> messages, int fileSize) {
List<AppendOnlyCompactionTask> result = new ArrayList<>();
List<DataFileMeta> metas =
messages.stream()
Expand Down

0 comments on commit 5abf5e3

Please sign in to comment.