[core] Add numWriters metrics #3234

Open · wants to merge 1 commit into master.
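This PR adds a numWriters gauge to the writerBuffer metric group: the count is incremented whenever a new writer is created and resynced in prepareCommit() after idle writers are closed. It also documents the metric, covers it with an operator-level test, and makes small test-base refactorings (a shared COMMIT_USER constant, protected harness helpers, and a generalized extractListOption).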
docs/content/maintenance/metrics.md (5 additions, 0 deletions)

@@ -200,6 +200,11 @@
</tr>
</thead>
<tbody>
<tr>
<td>numWriters</td>
<td>Gauge</td>
<td>Number of active writers in this parallel subtask.</td>
</tr>
<tr>
<td>bufferPreemptCount</td>
<td>Gauge</td>
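As exercised by the test in this PR, the gauge is registered under the table-scoped group paimon.table.&lt;tableName&gt;.writerBuffer and counts one writer per active (partition, bucket) pair within a parallel subtask.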
(next file: the store write implementation that creates writers and registers the WriterBufferMetric)

@@ -27,6 +27,7 @@
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.WriterBufferMetric;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;

@@ -38,6 +39,7 @@
import javax.annotation.Nullable;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
@@ -110,6 +112,7 @@ protected void notifyNewWriter(RecordWriter<T> writer) {
+ " but this is: "
+ writer.getClass());
}

if (writeBufferPool == null) {
LOG.debug("Use default heap memory segment pool for write buffer.");
writeBufferPool =
@@ -119,6 +122,10 @@ protected void notifyNewWriter(RecordWriter<T> writer) {
.addOwners(this::memoryOwners);
}
writeBufferPool.notifyNewOwner((MemoryOwner) writer);

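// A new writer has just been created and registered with the buffer pool;
// bump the gauge here, and let prepareCommit() resync it after closes.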
if (writerBufferMetric != null) {
writerBufferMetric.increaseNumWriters();
}
}

@Override
@@ -135,6 +142,16 @@ private void registerWriterBufferMetric(MetricRegistry metricRegistry) {
}
}

@Override
public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
throws Exception {
List<CommitMessage> result = super.prepareCommit(waitCompaction, commitIdentifier);
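// super.prepareCommit() may close writers that received no new records;
// recount from the writers map so the gauge reflects only live writers.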
if (writerBufferMetric != null) {
writerBufferMetric.setNumWriters(writers.values().stream().mapToInt(Map::size).sum());
}
return result;
}

@Override
public void close() throws Exception {
super.close();
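The recount in prepareCommit() relies on writers being a nested map keyed partition to bucket, which the Map::size summation implies. A standalone illustration, with hypothetical string stand-ins for the real writer objects:

import java.util.HashMap;
import java.util.Map;

public class WriterCountSketch {
    public static void main(String[] args) {
        // partition -> (bucket -> writer); strings stand in for real writers
        Map<String, Map<Integer, String>> writers = new HashMap<>();
        writers.computeIfAbsent("p1", k -> new HashMap<>()).put(0, "writer-1");
        writers.computeIfAbsent("p2", k -> new HashMap<>()).put(0, "writer-2");
        writers.computeIfAbsent("p2", k -> new HashMap<>()).put(1, "writer-3");

        // same expression the PR uses: one writer per (partition, bucket)
        int numWriters = writers.values().stream().mapToInt(Map::size).sum();
        System.out.println(numWriters); // prints 3
    }
}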
WriterBufferMetric.java

@@ -22,24 +22,29 @@
import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.MetricRegistry;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

/** Metrics for writer buffer. */
public class WriterBufferMetric {

private static final String GROUP_NAME = "writerBuffer";
private static final String NUM_WRITERS = "numWriters";
private static final String BUFFER_PREEMPT_COUNT = "bufferPreemptCount";
private static final String USED_WRITE_BUFFER_SIZE = "usedWriteBufferSizeByte";
private static final String TOTAL_WRITE_BUFFER_SIZE = "totalWriteBufferSizeByte";

private final MetricGroup metricGroup;
private final AtomicInteger numWriters;

public WriterBufferMetric(
Supplier<MemoryPoolFactory> memoryPoolFactorySupplier,
MetricRegistry metricRegistry,
String tableName) {
metricGroup = metricRegistry.tableMetricGroup(GROUP_NAME, tableName);
numWriters = new AtomicInteger(0);
metricGroup.gauge(NUM_WRITERS, numWriters::get);
metricGroup.gauge(
BUFFER_PREEMPT_COUNT,
() ->
@@ -62,6 +67,14 @@ private long getMetricValue(
return memoryPoolFactory == null ? -1 : function.apply(memoryPoolFactory);
}

public void increaseNumWriters() {
numWriters.incrementAndGet();
}

public void setNumWriters(int numWriters) {
this.numWriters.set(numWriters);
}

public void close() {
this.metricGroup.close();
}
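A note on the AtomicInteger: metric reporters typically poll gauges from a separate thread while the task thread updates the count, so the field needs a thread-safe type. A standalone sketch of the polling pattern, with a plain Supplier standing in for the metric group's gauge registration:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class GaugeSketch {
    public static void main(String[] args) {
        AtomicInteger numWriters = new AtomicInteger(0);
        // stand-in for metricGroup.gauge(NUM_WRITERS, numWriters::get):
        // the metric system stores this supplier and polls it periodically
        Supplier<Integer> gauge = numWriters::get;

        numWriters.incrementAndGet(); // notifyNewWriter: first writer
        numWriters.incrementAndGet(); // notifyNewWriter: second writer
        numWriters.set(1);            // prepareCommit: resync after one idle writer closes
        System.out.println(gauge.get()); // prints 1
    }
}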
PrimaryKeyWriterOperatorTest.java

@@ -18,16 +18,96 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.TableCommitImpl;

- /** test class for {@link TableWriteOperator} with primarykey writer. */
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.Test;

import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

+ /** test class for {@link TableWriteOperator} with primary key writer. */
public class PrimaryKeyWriterOperatorTest extends WriterOperatorTestBase {

@Override
protected void setTableConfig(Options options) {
options.set("primary-key", "a");
options.set("primary-key", "a,b");
options.set("partition", "a");
options.set("bucket", "1");
options.set("bucket-key", "a");
options.set("bucket-key", "b");
options.set("write-buffer-size", "256 b");
options.set("page-size", "32 b");
}

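// With "a" as the partition column and a single bucket, each distinct value
// of "a" written below creates exactly one writer, which is what the
// numWriters assertions count.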
@Test
public void testNumWritersMetric() throws Exception {
String tableName = tablePath.getName();
FileStoreTable fileStoreTable = createFileStoreTable();
TableCommitImpl commit = fileStoreTable.newCommit(COMMIT_USER);

RowDataStoreWriteOperator rowDataStoreWriteOperator =
getRowDataStoreWriteOperator(fileStoreTable);
OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
createWriteOperatorHarness(fileStoreTable, rowDataStoreWriteOperator);

TypeSerializer<Committable> serializer =
new CommittableTypeInfo().createSerializer(new ExecutionConfig());
harness.setup(serializer);
harness.open();

OperatorMetricGroup metricGroup = rowDataStoreWriteOperator.getMetricGroup();
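// Paimon registers table-scoped metrics under paimon.table.<tableName>;
// drill down to the writerBuffer group that holds the new gauge.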
MetricGroup writerBufferMetricGroup =
metricGroup
.addGroup("paimon")
.addGroup("table", tableName)
.addGroup("writerBuffer");

Gauge<Integer> numWriters =
TestingMetricUtils.getGauge(writerBufferMetricGroup, "numWriters");

// write into three partitions
harness.processElement(GenericRow.of(1, 1), 1);
harness.processElement(GenericRow.of(2, 2), 2);
harness.processElement(GenericRow.of(3, 3), 3);
assertThat(numWriters.getValue()).isEqualTo(3);

// commit messages in three partitions, no writer should be cleaned
harness.prepareSnapshotPreBarrier(1);
harness.snapshot(1, 10);
harness.notifyOfCompletedCheckpoint(1);
commit.commit(
1,
harness.extractOutputValues().stream()
.map(c -> (CommitMessage) c.wrappedCommittable())
.collect(Collectors.toList()));
assertThat(numWriters.getValue()).isEqualTo(3);

// write into two partitions
harness.processElement(GenericRow.of(1, 11), 11);
harness.processElement(GenericRow.of(3, 13), 13);
// checkpoint has not come yet, so no writer should be cleaned
assertThat(numWriters.getValue()).isEqualTo(3);

// checkpoint comes, partition 2 has nothing to write, so it should be cleaned
harness.prepareSnapshotPreBarrier(2);
harness.snapshot(2, 20);
harness.notifyOfCompletedCheckpoint(2);
assertThat(numWriters.getValue()).isEqualTo(2);

harness.endInput();
harness.close();
commit.close();
}
}
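Note the asymmetry the test pins down: writer creation moves the gauge immediately via notifyNewWriter, while closed idle writers only drop out when prepareCommit() recounts at the next checkpoint, so the value can overstate live writers between checkpoints.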
WriterOperatorTestBase.java

@@ -55,8 +55,11 @@

/** test class for {@link TableWriteOperator}. */
public abstract class WriterOperatorTestBase {

private static final RowType ROW_TYPE =
RowType.of(new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"a", "b"});
protected static final String COMMIT_USER = "test";

@TempDir public java.nio.file.Path tempDir;
protected Path tablePath;

@@ -112,7 +115,7 @@ public void testMetric() throws Exception {
}

@NotNull
- private static OneInputStreamOperatorTestHarness<InternalRow, Committable>
+ protected static OneInputStreamOperatorTestHarness<InternalRow, Committable>
createWriteOperatorHarness(
FileStoreTable fileStoreTable, RowDataStoreWriteOperator operator)
throws Exception {
@@ -126,7 +129,7 @@ public void testMetric() throws Exception {
}

@NotNull
- private static RowDataStoreWriteOperator getRowDataStoreWriteOperator(
+ protected static RowDataStoreWriteOperator getRowDataStoreWriteOperator(
FileStoreTable fileStoreTable) {
StoreSinkWrite.Provider provider =
(table, commitUser, state, ioManager, memoryPool, metricGroup) ->
@@ -141,7 +144,7 @@ private static RowDataStoreWriteOperator getRowDataStoreWriteOperator(
memoryPool,
metricGroup);
RowDataStoreWriteOperator operator =
new RowDataStoreWriteOperator(fileStoreTable, null, provider, "test");
new RowDataStoreWriteOperator(fileStoreTable, null, provider, COMMIT_USER);
return operator;
}

@@ -153,21 +156,21 @@ protected FileStoreTable createFileStoreTable() throws Exception {
setTableConfig(conf);
SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);

- List<String> primaryKeys = setKeys(conf, CoreOptions.PRIMARY_KEY);
- List<String> paritionKeys = setKeys(conf, CoreOptions.PARTITION);
+ List<String> primaryKeys = extractListOption(conf, CoreOptions.PRIMARY_KEY);
+ List<String> partitionKeys = extractListOption(conf, CoreOptions.PARTITION);

schemaManager.createTable(
- new Schema(ROW_TYPE.getFields(), paritionKeys, primaryKeys, conf.toMap(), ""));
+ new Schema(ROW_TYPE.getFields(), partitionKeys, primaryKeys, conf.toMap(), ""));
return FileStoreTableFactory.create(LocalFileIO.create(), conf);
}

@NotNull
- private static List<String> setKeys(Options conf, ConfigOption<String> primaryKey) {
- List<String> primaryKeys =
- Optional.ofNullable(conf.get(CoreOptions.PRIMARY_KEY))
+ private static List<String> extractListOption(Options conf, ConfigOption<String> option) {
+ List<String> result =
+ Optional.ofNullable(conf.get(option))
.map(key -> Arrays.asList(key.split(",")))
.orElse(Collections.emptyList());
- conf.remove(primaryKey.key());
- return primaryKeys;
+ conf.remove(option.key());
+ return result;
}
}