Skip to content

Commit

Permalink
[spark][core] fix when a table has no snapshot or no statistics (#3341)
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron committed May 17, 2024
1 parent 29d13fd commit fac9477
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.EmptyRecordReader;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.FileStoreTable;
Expand Down Expand Up @@ -53,6 +54,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

Expand Down Expand Up @@ -204,16 +206,22 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
if (!(split instanceof StatisticTable.StatisticSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
Statistics statistics = dataTable.statistics().get();
Iterator<Statistics> statisticsIterator =
Collections.singletonList(statistics).iterator();
Iterator<InternalRow> rows = Iterators.transform(statisticsIterator, this::toRow);
if (projection != null) {
rows =
Iterators.transform(
rows, row -> ProjectedRow.from(projection).replaceRow(row));

Optional<Statistics> statisticsOptional = dataTable.statistics();
if (statisticsOptional.isPresent()) {
Statistics statistics = statisticsOptional.get();
Iterator<Statistics> statisticsIterator =
Collections.singletonList(statistics).iterator();
Iterator<InternalRow> rows = Iterators.transform(statisticsIterator, this::toRow);
if (projection != null) {
rows =
Iterators.transform(
rows, row -> ProjectedRow.from(projection).replaceRow(row));
}
return new IteratorRecordReader<>(rows);
} else {
return new EmptyRecordReader<>();
}
return new IteratorRecordReader<>(rows);
}

private InternalRow toRow(Statistics statistics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,14 @@ case class PaimonAnalyzeTableColumnCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val relation = DataSourceV2Relation.create(v2Table, Some(catalog), Some(identifier))
val attributes = getColumnsToAnalyze(relation)

val currentSnapshot = table.snapshotManager().latestSnapshot()
if (currentSnapshot == null) {
return Seq.empty[Row]
}

// compute stats
val attributes = getColumnsToAnalyze(relation, columnNames, allColumns)
val totalSize = PaimonStatsUtils.calculateTotalSize(
sparkSession.sessionState,
table.name(),
Expand Down Expand Up @@ -92,10 +96,7 @@ case class PaimonAnalyzeTableColumnCommand(
Seq.empty[Row]
}

private def getColumnsToAnalyze(
relation: DataSourceV2Relation,
columnNames: Option[Seq[String]],
allColumns: Boolean): Seq[Attribute] = {
private def getColumnsToAnalyze(relation: DataSourceV2Relation): Seq[Attribute] = {
if (columnNames.isDefined && allColumns) {
throw new UnsupportedOperationException(
"Parameter `columnNames` and `allColumns` are " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {

spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)")
spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)")
Assertions.assertEquals(0, spark.sql("select * from `T$statistics`").count())

spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS")

Expand All @@ -70,6 +71,13 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
Row(2, 0, 2, "{ }"))
}

test("Paimon analyze: analyze table without snapshot") {
spark.sql(s"CREATE TABLE T (id STRING, name STRING)")
spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS")
spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
Assertions.assertEquals(0, spark.sql("select * from `T$statistics`").count())
}

test("Paimon analyze: analyze no scan") {
spark.sql(s"CREATE TABLE T (id STRING, name STRING)")
assertThatThrownBy(() => spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS NOSCAN"))
Expand Down

0 comments on commit fac9477

Please sign in to comment.