Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fix the incorrect aggregation result due to the sequence number #3101

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeDefaultVisitor;
import org.apache.paimon.types.DateType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.FloatType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.SmallIntType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.InternalRowUtils;

import javax.annotation.Nullable;

/** The sequence generator. */
public class SequenceGenerator {

private final int index;

private final Generator generator;
private final DataType fieldType;

public SequenceGenerator(String field, RowType rowType) {
index = rowType.getFieldNames().indexOf(field);
if (index == -1) {
throw new RuntimeException(
String.format(
"Can not find sequence field %s in table schema: %s", field, rowType));
}
fieldType = rowType.getTypeAt(index);
generator = fieldType.accept(new SequenceGeneratorVisitor());
}

public SequenceGenerator(int index, DataType dataType) {
this.index = index;

this.fieldType = dataType;
if (index == -1) {
throw new RuntimeException(String.format("Index : %s is invalid", index));
}
generator = fieldType.accept(new SequenceGeneratorVisitor());
}

public int index() {
return index;
}

public DataType fieldType() {
return fieldType;
}

@Nullable
public Long generate(InternalRow row) {
return generator.generateNullable(row, index);
}

private interface Generator {
long generate(InternalRow row, int i);

@Nullable
default Long generateNullable(InternalRow row, int i) {
if (row.isNullAt(i)) {
return null;
}
return generate(row, i);
}
}

private static class SequenceGeneratorVisitor extends DataTypeDefaultVisitor<Generator> {

@Override
public Generator visit(CharType charType) {
return stringGenerator();
}

@Override
public Generator visit(VarCharType varCharType) {
return stringGenerator();
}

private Generator stringGenerator() {
return (row, i) -> Long.parseLong(row.getString(i).toString());
}

@Override
public Generator visit(DecimalType decimalType) {
return (row, i) ->
InternalRowUtils.castToIntegral(
row.getDecimal(i, decimalType.getPrecision(), decimalType.getScale()));
}

@Override
public Generator visit(TinyIntType tinyIntType) {
return InternalRow::getByte;
}

@Override
public Generator visit(SmallIntType smallIntType) {
return InternalRow::getShort;
}

@Override
public Generator visit(IntType intType) {
return InternalRow::getInt;
}

@Override
public Generator visit(BigIntType bigIntType) {
return InternalRow::getLong;
}

@Override
public Generator visit(FloatType floatType) {
return (row, i) -> (long) row.getFloat(i);
}

@Override
public Generator visit(DoubleType doubleType) {
return (row, i) -> (long) row.getDouble(i);
}

@Override
public Generator visit(DateType dateType) {
return InternalRow::getInt;
}

@Override
public Generator visit(TimestampType timestampType) {
return (row, i) -> row.getTimestamp(i, timestampType.getPrecision()).getMillisecond();
}

@Override
public Generator visit(LocalZonedTimestampType localZonedTimestampType) {
return (row, i) ->
row.getTimestamp(i, localZonedTimestampType.getPrecision()).getMillisecond();
}

@Override
protected Generator defaultMethod(DataType dataType) {
throw new UnsupportedOperationException("Unsupported type: " + dataType);
}
}

/** The Seq wrapper. */
public static class Seq {
public final @Nullable SequenceGenerator generator;
/** Whether the sequence generator is exclusive for one column. */
public final boolean isExclusive;

public Seq(@Nullable SequenceGenerator generator, boolean isExclusive) {
this.generator = generator;
this.isExclusive = isExclusive;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree.compact;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.mergetree.SequenceGenerator;
import org.apache.paimon.mergetree.SequenceGenerator.Seq;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Projection;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;

/** Utils for merge function. */
public class MergeFunctionUtils {

public static Map<Integer, SequenceGenerator> getSequenceGenerator(
CoreOptions options, List<String> fieldNames, RowType rowType) {
Map<Integer, SequenceGenerator> fieldSequences = new HashMap<>();
for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
String k = entry.getKey();
String v = entry.getValue();
if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
String sequenceFieldName =
k.substring(
FIELDS_PREFIX.length() + 1,
k.length() - SEQUENCE_GROUP.length() - 1);
SequenceGenerator sequenceGen = new SequenceGenerator(sequenceFieldName, rowType);
Arrays.stream(v.split(","))
.map(
fieldName -> {
int field = fieldNames.indexOf(fieldName);
if (field == -1) {
throw new IllegalArgumentException(
String.format(
"Field %s can not be found in table schema",
fieldName));
}
return field;
})
.forEach(
field -> {
if (fieldSequences.containsKey(field)) {
throw new IllegalArgumentException(
String.format(
"Field %s is defined repeatedly by multiple groups: %s",
fieldNames.get(field), k));
}
fieldSequences.put(field, sequenceGen);
});

// add self
fieldSequences.put(sequenceGen.index(), sequenceGen);
}
}
return fieldSequences;
}

public static Map<Integer, SequenceGenerator> projectSequence(
int[][] projection, Map<Integer, SequenceGenerator> fieldSequences) {
if (projection == null) {
return fieldSequences;
}
Map<Integer, SequenceGenerator> projectedSequences = new HashMap<>();
int[] projects = Projection.of(projection).toTopLevelIndexes();
Map<Integer, Integer> indexMap = new HashMap<>();
for (int i = 0; i < projects.length; i++) {
indexMap.put(projects[i], i);
}

fieldSequences.forEach(
(field, sequence) -> {
int newField = indexMap.getOrDefault(field, -1);
if (newField != -1) {
int newSequenceId = indexMap.getOrDefault(sequence.index(), -1);
if (newSequenceId == -1) {
throw new RuntimeException(
String.format(
"Can not find new sequence field for new field. new field index is %s",
newField));
} else {
projectedSequences.put(
newField,
new SequenceGenerator(newSequenceId, sequence.fieldType()));
}
}
});
return projectedSequences;
}

public static Seq[] toSeq(
Map<Integer, SequenceGenerator> fieldSequences, FieldAggregator[] fieldAggregators) {
Seq[] sequences = new Seq[fieldAggregators.length];
for (Map.Entry<Integer, SequenceGenerator> entry : fieldSequences.entrySet()) {
int index = entry.getKey();
Seq seq;
if (fieldAggregators[index] != null && fieldAggregators[index].requireSequence()) {
// sequence for the exclusive aggregation function
sequences[index] = new Seq(entry.getValue(), true);
// The sequence field itself.
sequences[entry.getValue().index()] = new Seq(entry.getValue(), true);
} else {
seq = new Seq(entry.getValue(), false);
if (sequences[index] == null) {
sequences[index] = seq;
}
}
}
return sequences;
}

public static Map<Integer, Set<Integer>> seqIndex2Field(
Map<Integer, SequenceGenerator> generators) {
Map<Integer, Set<Integer>> seq2Field = new HashMap<>();
for (Map.Entry<Integer, SequenceGenerator> entry : generators.entrySet()) {
int seqIndex = entry.getValue().index();
seq2Field.computeIfAbsent(seqIndex, k -> new HashSet<>());
Set<Integer> fields = seq2Field.get(seqIndex);
fields.add(entry.getKey());
}
return seq2Field;
}

/** The required sequence's aggregator can not share sequence filed. */
public static void validateSequence(
FieldAggregator[] aggregators,
Map<Integer, SequenceGenerator> sequences,
List<String> fieldNames) {
Map<Integer, Set<Integer>> seq2Field = seqIndex2Field(sequences);
for (int i = 0; i < aggregators.length; i++) {
FieldAggregator aggregator = aggregators[i];
if (aggregator != null && aggregator.requireSequence()) {
SequenceGenerator seqGen = sequences.get(i);
if (seqGen == null) {
continue;
// here we should allow, but for compatibility
}
int seqIndex = seqGen.index();
Set<Integer> values = seq2Field.get(seqIndex);
// remove self.
values.remove(seqIndex);
if (values.size() >= 2) {
throw new RuntimeException(
String.format(
"Should not share sequence field in [%s] due to %s",
values.stream()
.map(fieldNames::get)
.collect(Collectors.joining(",")),
aggregator.name()));
}
}
}
}
}