[core] Add numWriters metrics
tsreaper committed Apr 19, 2024
1 parent 85bd8a3 commit 56938db
Showing 5 changed files with 132 additions and 14 deletions.
5 changes: 5 additions & 0 deletions docs/content/maintenance/metrics.md
@@ -200,6 +200,11 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
</tr>
</thead>
<tbody>
+ <tr>
+ <td>numWriters</td>
+ <td>Gauge</td>
+ <td>Number of writers in this parallelism.</td>
+ </tr>
<tr>
<td>bufferPreemptCount</td>
<td>Gauge</td>
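(For orientation: as the test change further down shows, the gauge is registered through nested metric groups paimon -> table -> <tableName> -> writerBuffer, so with a dot-joined metric reporter its identifier ends in paimon.table.<tableName>.writerBuffer.numWriters.)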
@@ -27,6 +27,7 @@
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.WriterBufferMetric;
+ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;

@@ -38,6 +39,7 @@
import javax.annotation.Nullable;

import java.util.Iterator;
+ import java.util.List;
import java.util.Map;

/**
@@ -110,6 +112,7 @@ protected void notifyNewWriter(RecordWriter<T> writer) {
+ " but this is: "
+ writer.getClass());
}
+
if (writeBufferPool == null) {
LOG.debug("Use default heap memory segment pool for write buffer.");
writeBufferPool =
@@ -119,6 +122,10 @@ protected void notifyNewWriter(RecordWriter<T> writer) {
.addOwners(this::memoryOwners);
}
writeBufferPool.notifyNewOwner((MemoryOwner) writer);
+
+ if (writerBufferMetric != null) {
+     writerBufferMetric.increaseNumWriters();
+ }
}

@Override
@@ -135,6 +142,16 @@ private void registerWriterBufferMetric(MetricRegistry metricRegistry) {
}
}
+
+ @Override
+ public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
+         throws Exception {
+     List<CommitMessage> result = super.prepareCommit(waitCompaction, commitIdentifier);
+     if (writerBufferMetric != null) {
+         writerBufferMetric.setNumWriters(writers.values().stream().mapToInt(Map::size).sum());
+     }
+     return result;
+ }

@Override
public void close() throws Exception {
super.close();
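Taken together, the write-path change above maintains the gauge along two paths: notifyNewWriter() increments it eagerly whenever a writer is created, and the prepareCommit() override re-derives it from the live writers map, so writers closed during a commit show up in the next reading. A minimal, self-contained sketch of that counting step, with the partition and writer types simplified to strings (all names here are illustrative, not Paimon API):

import java.util.HashMap;
import java.util.Map;

public class NumWritersSketch {
    public static void main(String[] args) {
        // Illustrative stand-in for the writers field: partition -> (bucket -> writer).
        Map<String, Map<Integer, String>> writers = new HashMap<>();
        writers.computeIfAbsent("pt=1", k -> new HashMap<>()).put(0, "writer-1-0");
        writers.computeIfAbsent("pt=2", k -> new HashMap<>()).put(0, "writer-2-0");
        writers.computeIfAbsent("pt=3", k -> new HashMap<>()).put(0, "writer-3-0");

        // Same aggregation as the prepareCommit override: one writer per (partition, bucket).
        int numWriters = writers.values().stream().mapToInt(Map::size).sum();
        System.out.println(numWriters); // prints 3
    }
}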
WriterBufferMetric.java (org.apache.paimon.operation.metrics)
@@ -22,24 +22,29 @@
import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.MetricRegistry;

+ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

/** Metrics for writer buffer. */
public class WriterBufferMetric {

private static final String GROUP_NAME = "writerBuffer";
+ private static final String NUM_WRITERS = "numWriters";
private static final String BUFFER_PREEMPT_COUNT = "bufferPreemptCount";
private static final String USED_WRITE_BUFFER_SIZE = "usedWriteBufferSizeByte";
private static final String TOTAL_WRITE_BUFFER_SIZE = "totalWriteBufferSizeByte";

private final MetricGroup metricGroup;
+ private final AtomicInteger numWriters;

public WriterBufferMetric(
Supplier<MemoryPoolFactory> memoryPoolFactorySupplier,
MetricRegistry metricRegistry,
String tableName) {
metricGroup = metricRegistry.tableMetricGroup(GROUP_NAME, tableName);
+ numWriters = new AtomicInteger(0);
+ metricGroup.gauge(NUM_WRITERS, numWriters::get);
metricGroup.gauge(
BUFFER_PREEMPT_COUNT,
() ->
@@ -62,6 +67,14 @@ private long getMetricValue(
return memoryPoolFactory == null ? -1 : function.apply(memoryPoolFactory);
}

+ public void increaseNumWriters() {
+     numWriters.incrementAndGet();
+ }
+
+ public void setNumWriters(int x) {
+     numWriters.set(x);
+ }
+
public void close() {
this.metricGroup.close();
}
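For context, here is a hedged sketch of the intended call pattern, using only the API defined in this file; the surrounding wiring (pool factory, registry, table name) is assumed rather than taken from the commit:

import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.WriterBufferMetric;

class WriterBufferMetricUsageSketch {
    // Hypothetical helper showing when each new method is meant to be called.
    static WriterBufferMetric register(
            MemoryPoolFactory poolFactory, MetricRegistry registry, String tableName) {
        WriterBufferMetric metric =
                new WriterBufferMetric(() -> poolFactory, registry, tableName);
        metric.increaseNumWriters(); // eagerly, whenever a new writer is created
        metric.setNumWriters(1);     // authoritative count, after each prepareCommit
        return metric;
    }
}

Because the gauge is registered over numWriters::get, a setNumWriters() call is simply observed at the next metric report; nothing is pushed to the reporter.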
PrimaryKeyWriterOperatorTest.java (org.apache.paimon.flink.sink)
@@ -18,16 +18,96 @@

package org.apache.paimon.flink.sink;

+ import org.apache.paimon.data.GenericRow;
+ import org.apache.paimon.data.InternalRow;
+ import org.apache.paimon.flink.utils.TestingMetricUtils;
  import org.apache.paimon.options.Options;
+ import org.apache.paimon.table.FileStoreTable;
+ import org.apache.paimon.table.sink.CommitMessage;
+ import org.apache.paimon.table.sink.TableCommitImpl;

- /** test class for {@link TableWriteOperator} with primarykey writer. */
+ import org.apache.flink.api.common.ExecutionConfig;
+ import org.apache.flink.api.common.typeutils.TypeSerializer;
+ import org.apache.flink.metrics.Gauge;
+ import org.apache.flink.metrics.MetricGroup;
+ import org.apache.flink.metrics.groups.OperatorMetricGroup;
+ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+ import org.junit.jupiter.api.Test;
+
+ import java.util.stream.Collectors;
+
+ import static org.assertj.core.api.Assertions.assertThat;
+
+ /** test class for {@link TableWriteOperator} with primary key writer. */
public class PrimaryKeyWriterOperatorTest extends WriterOperatorTestBase {

@Override
protected void setTableConfig(Options options) {
- options.set("primary-key", "a");
+ options.set("primary-key", "a,b");
+ options.set("partition", "a");
  options.set("bucket", "1");
- options.set("bucket-key", "a");
+ options.set("bucket-key", "b");
options.set("write-buffer-size", "256 b");
options.set("page-size", "32 b");
}
+
+ @Test
+ public void testNumWritersMetric() throws Exception {
+     String tableName = tablePath.getName();
+     FileStoreTable fileStoreTable = createFileStoreTable();
+     TableCommitImpl commit = fileStoreTable.newCommit(COMMIT_USER);
+
+     RowDataStoreWriteOperator rowDataStoreWriteOperator =
+             getRowDataStoreWriteOperator(fileStoreTable);
+     OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+             createWriteOperatorHarness(fileStoreTable, rowDataStoreWriteOperator);
+
+     TypeSerializer<Committable> serializer =
+             new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+     harness.setup(serializer);
+     harness.open();
+
+     OperatorMetricGroup metricGroup = rowDataStoreWriteOperator.getMetricGroup();
+     MetricGroup writerBufferMetricGroup =
+             metricGroup
+                     .addGroup("paimon")
+                     .addGroup("table", tableName)
+                     .addGroup("writerBuffer");
+
+     Gauge<Integer> numWriters =
+             TestingMetricUtils.getGauge(writerBufferMetricGroup, "numWriters");
+
+     // write into three partitions
+     harness.processElement(GenericRow.of(1, 1), 1);
+     harness.processElement(GenericRow.of(2, 2), 2);
+     harness.processElement(GenericRow.of(3, 3), 3);
+     assertThat(numWriters.getValue()).isEqualTo(3);
+
+     // commit messages in three partitions, no writer should be cleaned
+     harness.prepareSnapshotPreBarrier(1);
+     harness.snapshot(1, 10);
+     harness.notifyOfCompletedCheckpoint(1);
+     commit.commit(
+             1,
+             harness.extractOutputValues().stream()
+                     .map(c -> (CommitMessage) c.wrappedCommittable())
+                     .collect(Collectors.toList()));
+     assertThat(numWriters.getValue()).isEqualTo(3);
+
+     // write into two partitions
+     harness.processElement(GenericRow.of(1, 11), 11);
+     harness.processElement(GenericRow.of(3, 13), 13);
+     // checkpoint has not come yet, so no writer should be cleaned
+     assertThat(numWriters.getValue()).isEqualTo(3);
+
+     // checkpoint comes, partition 2 has nothing to write, so it should be cleaned
+     harness.prepareSnapshotPreBarrier(2);
+     harness.snapshot(2, 20);
+     harness.notifyOfCompletedCheckpoint(2);
+     assertThat(numWriters.getValue()).isEqualTo(2);
+
+     harness.endInput();
+     harness.close();
+     commit.close();
+ }
}
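The test exercises both update paths end to end: the gauge climbs to 3 as processElement() creates one writer per partition, stays at 3 across a checkpoint and commit (committing alone closes nothing), and drops to 2 only after the next checkpoint, when prepareCommit() finds that the partition-2 writer received no new records and cleans it up.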
WriterOperatorTestBase.java (org.apache.paimon.flink.sink)
@@ -55,8 +55,11 @@

/** test class for {@link TableWriteOperator}. */
public abstract class WriterOperatorTestBase {

private static final RowType ROW_TYPE =
RowType.of(new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"a", "b"});
+ protected static final String COMMIT_USER = "test";

@TempDir public java.nio.file.Path tempDir;
protected Path tablePath;

@@ -112,7 +115,7 @@ public void testMetric() throws Exception {
}

@NotNull
- private static OneInputStreamOperatorTestHarness<InternalRow, Committable>
+ protected static OneInputStreamOperatorTestHarness<InternalRow, Committable>
createWriteOperatorHarness(
FileStoreTable fileStoreTable, RowDataStoreWriteOperator operator)
throws Exception {
@@ -126,7 +129,7 @@ public void testMetric() throws Exception {
}

@NotNull
- private static RowDataStoreWriteOperator getRowDataStoreWriteOperator(
+ protected static RowDataStoreWriteOperator getRowDataStoreWriteOperator(
FileStoreTable fileStoreTable) {
StoreSinkWrite.Provider provider =
(table, commitUser, state, ioManager, memoryPool, metricGroup) ->
@@ -141,7 +144,7 @@ private static RowDataStoreWriteOperator getRowDataStoreWriteOperator(
memoryPool,
metricGroup);
RowDataStoreWriteOperator operator =
- new RowDataStoreWriteOperator(fileStoreTable, null, provider, "test");
+ new RowDataStoreWriteOperator(fileStoreTable, null, provider, COMMIT_USER);
return operator;
}

@@ -153,21 +156,21 @@ protected FileStoreTable createFileStoreTable() throws Exception {
setTableConfig(conf);
SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);

- List<String> primaryKeys = setKeys(conf, CoreOptions.PRIMARY_KEY);
- List<String> paritionKeys = setKeys(conf, CoreOptions.PARTITION);
+ List<String> primaryKeys = extractListOption(conf, CoreOptions.PRIMARY_KEY);
+ List<String> partitionKeys = extractListOption(conf, CoreOptions.PARTITION);

schemaManager.createTable(
- new Schema(ROW_TYPE.getFields(), paritionKeys, primaryKeys, conf.toMap(), ""));
+ new Schema(ROW_TYPE.getFields(), partitionKeys, primaryKeys, conf.toMap(), ""));
return FileStoreTableFactory.create(LocalFileIO.create(), conf);
}

@NotNull
- private static List<String> setKeys(Options conf, ConfigOption<String> primaryKey) {
-     List<String> primaryKeys =
-             Optional.ofNullable(conf.get(CoreOptions.PRIMARY_KEY))
+ private static List<String> extractListOption(Options conf, ConfigOption<String> option) {
+     List<String> result =
+             Optional.ofNullable(conf.get(option))
.map(key -> Arrays.asList(key.split(",")))
.orElse(Collections.emptyList());
-     conf.remove(primaryKey.key());
-     return primaryKeys;
+     conf.remove(option.key());
+     return result;
}
}
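The base-class changes are mechanical support for the new test: the harness and operator factory methods become protected so PrimaryKeyWriterOperatorTest can reuse them, the hard-coded "test" commit user is hoisted into the COMMIT_USER constant so the test's standalone TableCommitImpl commits as the same user as the operator, and the misleadingly named setKeys helper becomes extractListOption, fixing the paritionKeys typo along the way.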
