Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Add support of sequence field in first row #3264

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/content/primary-key-table/merge-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,13 @@ is an empty map instead of `+I[1->A]`

## First Row

By specifying `'merge-engine' = 'first-row'`, users can keep the first row of the same primary key. It differs from the
`deduplicate` merge engine that in the `first-row` merge engine, it will generate insert only changelog.
By specifying `'merge-engine' = 'first-row'`, users can keep the first row of the same primary key. You can use the first row merge engine with or without setting `sequence.field`.
When the sequence field is set, it retains the first row in a user-defined order and may generate an UPDATE_BEFORE message to adjust the outcome.
If the sequence field is not set, it keeps the first row in natural order, resulting in only an INSERT message being created.

{{< hint info >}}
1. `first-row` merge engine must be used together with `lookup` [changelog producer]({{< ref "primary-key-table/changelog-producer" >}}).
2. You can not specify `sequence.field`.
3. Not accept `DELETE` and `UPDATE_BEFORE` message. You can config `ignore-delete` to ignore these two kinds records.
2. Not accept `DELETE` and `UPDATE_BEFORE` message. You can config `ignore-delete` to ignore these two kinds records.
{{< /hint >}}

This is of great help in replacing log deduplication in streaming computation.
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,10 @@ public MergeEngine mergeEngine() {
return options.get(MERGE_ENGINE);
}

public boolean noDupKeysOverLevel0() {
return mergeEngine() == MergeEngine.FIRST_ROW && sequenceField().isEmpty();
}

public boolean ignoreDelete() {
return options.get(IGNORE_DELETE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public void pushdown(Predicate keyFilter) {
options.scanManifestParallelism(),
branchName,
options.deletionVectorsEnabled(),
options.mergeEngine());
options.noDupKeysOverLevel0());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,16 @@ public void add(KeyValue kv) {
public ChangelogResult getResult() {
// 1. Compute the latest high level record and containLevel0 of candidates
LinkedList<KeyValue> candidates = mergeFunction.candidates();
Iterator<KeyValue> descending = candidates.descendingIterator();
Iterator<KeyValue> iterator =
mergeFunction.getLevelOrder() == LookupMergeFunction.LevelOrder.DESCENDING
? candidates.descendingIterator()
: candidates.iterator();
KeyValue highLevel = null;
boolean containLevel0 = false;
while (descending.hasNext()) {
KeyValue kv = descending.next();
while (iterator.hasNext()) {
KeyValue kv = iterator.next();
if (kv.level() > 0) {
descending.remove();
iterator.remove();
if (highLevel == null) {
highLevel = kv;
}
Expand All @@ -141,14 +144,13 @@ public ChangelogResult getResult() {
}
}
}

// 3. Calculate result
KeyValue result = calculateResult(candidates, highLevel);

// 4. Set changelog when there's level-0 records
reusedResult.reset();
if (containLevel0 && lookupStrategy.produceChangelog) {
setChangelog(highLevel, result);
setChangelog(highLevel, result, reusedResult);
}

return reusedResult.setResult(result);
Expand All @@ -171,7 +173,8 @@ private KeyValue calculateResult(List<KeyValue> candidates, @Nullable KeyValue h
return mergeFunction2.getResult();
}

private void setChangelog(@Nullable KeyValue before, KeyValue after) {
protected void setChangelog(
@Nullable KeyValue before, KeyValue after, ChangelogResult reusedResult) {
if (before == null || !before.isAdd()) {
if (after.isAdd()) {
reusedResult.addChangelog(replaceAfter(RowKind.INSERT, after));
Expand All @@ -196,7 +199,7 @@ private KeyValue replaceAfter(RowKind valueKind, KeyValue from) {
return replace(reusedAfter, valueKind, from);
}

private KeyValue replace(KeyValue reused, RowKind valueKind, KeyValue from) {
protected KeyValue replace(KeyValue reused, RowKind valueKind, KeyValue from) {
return reused.replace(from.key(), from.sequenceNumber(), valueKind, from.value());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,27 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
private final LinkedList<KeyValue> candidates = new LinkedList<>();
private final InternalRowSerializer keySerializer;
private final InternalRowSerializer valueSerializer;
private final LevelOrder levelOrder;

public LookupMergeFunction(
MergeFunction<KeyValue> mergeFunction, RowType keyType, RowType valueType) {
MergeFunction<KeyValue> mergeFunction,
RowType keyType,
RowType valueType,
LevelOrder levelOrder) {
this.mergeFunction = mergeFunction;
this.keySerializer = new InternalRowSerializer(keyType);
this.valueSerializer = new InternalRowSerializer(valueType);
this.levelOrder = levelOrder;
}

/**
* The relationship between the high-level order and the sequence order.
* <li>For the first row, as the level increases, so does the sequence number.
* <li>For all other merge engine, as the level increases, the sequence number decreases.
*/
enum LevelOrder {
DESCENDING,
ASCENDING
}

@Override
Expand All @@ -60,13 +75,16 @@ public void add(KeyValue kv) {
@Override
public KeyValue getResult() {
// 1. Find the latest high level record
Iterator<KeyValue> descending = candidates.descendingIterator();
Iterator<KeyValue> iterator =
levelOrder == LevelOrder.DESCENDING
? candidates.descendingIterator()
: candidates().iterator();
KeyValue highLevel = null;
while (descending.hasNext()) {
KeyValue kv = descending.next();
while (iterator.hasNext()) {
KeyValue kv = iterator.next();
if (kv.level() > 0) {
if (highLevel != null) {
descending.remove();
iterator.remove();
} else {
highLevel = kv;
}
Expand All @@ -83,14 +101,23 @@ LinkedList<KeyValue> candidates() {
return candidates;
}

LevelOrder getLevelOrder() {
return levelOrder;
}

public static MergeFunctionFactory<KeyValue> wrap(
MergeFunctionFactory<KeyValue> wrapped, RowType keyType, RowType valueType) {
if (wrapped.create() instanceof FirstRowMergeFunction) {
if (wrapped.create().getClass() == FirstRowMergeFunction.class) {
// don't wrap first row, it is already OK
return wrapped;
}

return new Factory(wrapped, keyType, valueType);
return new Factory(
wrapped,
keyType,
valueType,
wrapped.create() instanceof UnOrderedFirstRowMergeFunction
? LevelOrder.ASCENDING
: LevelOrder.DESCENDING);
}

private static class Factory implements MergeFunctionFactory<KeyValue> {
Expand All @@ -100,18 +127,25 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {
private final MergeFunctionFactory<KeyValue> wrapped;
private final RowType keyType;
private final RowType rowType;
private final LevelOrder levelOrder;

private Factory(MergeFunctionFactory<KeyValue> wrapped, RowType keyType, RowType rowType) {
private Factory(
MergeFunctionFactory<KeyValue> wrapped,
RowType keyType,
RowType rowType,
LevelOrder levelOrder) {
this.wrapped = wrapped;
this.keyType = keyType;
this.rowType = rowType;
this.levelOrder = levelOrder;
}

@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
RowType valueType =
projection == null ? rowType : Projection.of(projection).project(rowType);
return new LookupMergeFunction(wrapped.create(projection), keyType, valueType);
return new LookupMergeFunction(
wrapped.create(projection), keyType, valueType, levelOrder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,47 @@ public MergeFunctionWrapper<ChangelogResult> create(
});
}
}

/** A {@link MergeFunctionWrapperFactory} for first row. */
public static class UnOrderedFirstRowMergeFunctionWrapperFactory<T>
implements MergeFunctionWrapperFactory<T> {

private final RecordEqualiser valueEqualiser;
private final boolean changelogRowDeduplicate;
private final LookupStrategy lookupStrategy;
@Nullable private final UserDefinedSeqComparator userDefinedSeqComparator;

public UnOrderedFirstRowMergeFunctionWrapperFactory(
RecordEqualiser valueEqualiser,
boolean changelogRowDeduplicate,
LookupStrategy lookupStrategy,
@Nullable UserDefinedSeqComparator userDefinedSeqComparator) {
this.valueEqualiser = valueEqualiser;
this.changelogRowDeduplicate = changelogRowDeduplicate;
this.lookupStrategy = lookupStrategy;
this.userDefinedSeqComparator = userDefinedSeqComparator;
}

@Override
public MergeFunctionWrapper<ChangelogResult> create(
MergeFunctionFactory<KeyValue> mfFactory,
int outputLevel,
LookupLevels<T> lookupLevels,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
return new UnOrderedFirstRowMergeFunctionWrapper<>(
mfFactory,
key -> {
try {
return lookupLevels.lookup(key, outputLevel + 1);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
},
valueEqualiser,
changelogRowDeduplicate,
lookupStrategy,
deletionVectorsMaintainer,
userDefinedSeqComparator);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree.compact;

import org.apache.paimon.KeyValue;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

/**
* This class is the same as FirstRowMergeFunction, but it is specifically for merging unordered
* input.
*/
public class UnOrderedFirstRowMergeFunction extends FirstRowMergeFunction {
protected UnOrderedFirstRowMergeFunction(RowType keyType, RowType valueType) {
super(keyType, valueType);
}

public static MergeFunctionFactory<KeyValue> factory(RowType keyType, RowType valueType) {
return new UnOrderedFirstRowMergeFunction.Factory(keyType, valueType);
}

private static class Factory implements MergeFunctionFactory<KeyValue> {

private static final long serialVersionUID = 1L;
private final RowType keyType;
private final RowType valueType;

public Factory(RowType keyType, RowType valueType) {
this.keyType = keyType;
this.valueType = valueType;
}

@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
return new UnOrderedFirstRowMergeFunction(keyType, valueType);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree.compact;

import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.UserDefinedSeqComparator;

import javax.annotation.Nullable;

import java.util.function.Function;

/** Wrapper for {@link MergeFunction}s to produce changelog by lookup for first row. */
public class UnOrderedFirstRowMergeFunctionWrapper<T>
extends LookupChangelogMergeFunctionWrapper<T> {

private final KeyValue reusedBefore = new KeyValue();
private final KeyValue reusedAfter = new KeyValue();

public UnOrderedFirstRowMergeFunctionWrapper(
MergeFunctionFactory<KeyValue> mergeFunctionFactory,
Function<InternalRow, T> lookup,
RecordEqualiser valueEqualiser,
boolean changelogRowDeduplicate,
LookupStrategy lookupStrategy,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
@Nullable UserDefinedSeqComparator userDefinedSeqComparator) {
super(
mergeFunctionFactory,
lookup,
valueEqualiser,
changelogRowDeduplicate,
lookupStrategy,
deletionVectorsMaintainer,
userDefinedSeqComparator);
}

@Override
protected void setChangelog(
@Nullable KeyValue before, KeyValue after, ChangelogResult reusedResult) {
if (after.level() == 0) {
if (before == null) {
reusedResult.addChangelog(after);
} else {
reusedResult
.addChangelog(replace(reusedBefore, RowKind.UPDATE_BEFORE, before))
.addChangelog(replace(reusedAfter, RowKind.UPDATE_AFTER, after));
}
}
}
}