Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pushing dereferences within lambdas into table scan #21957

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

zhaner08
Copy link
Contributor

@zhaner08 zhaner08 commented May 13, 2024

Description

This is to extend the enhancement discussed here #3925, and depends/extends on the original PR #4270 that is currently rebasing by @Desmeister

Since the issue and discussion had been idled for years and this kind of optimization could be critical to anyone having highly nested schema and using Unnest, I would like to use this PR to formally restart the discussion on how the community want to eventually support this and if this is on the right direction (I have a working version locally, not this one, that speeds up the query while reducing actual data processed)

From my understanding of the previous discussions, this should be done through below steps:

  • Convert non replicate symbol dereferencing involved with Unnest into lambda functions with subscript expressions for each of the Unnests
  • Push the lambda function down
    • Type 1: lambda function is already above TableScan, in this case, this rule will help to pushdown the dereferencing further, while for any connectors that dont support dereferencing, the rule will preserve the Lambda expression to remove columns
    • Type 2: Lambda functions are not at the ~Leaf, this will be handled by PushDownDereferenceThroughUnnest and many other expression specific rules. PushDownDereferenceThroughUnnest is not handling any unnest symbols currently, but only replicated symbols. In order to support unnest symbols, I believe at least a new expression has to be created, or subscript expression has to be extended otherwise I dont see an easy way to represent the dereferences so it can be further pushed down through other unnests in anyway. I need more guidance on how this could be done or possible with what we have now, that is why this PR in particular is not handling any complex cases like nested Unnest and only push lambdas down through project and filters in a limited way.
  • Pushing dereferencing into TableScan
    • This is kind of implemented by this PR. I extended the existing visitFunctionCall in ConnectorExpressionTranslator to create a new connector expression (can be merged with existing FieldDereference expression if possible), then passing those into existing applyProjection method to let connectors decide how to handle those. For this PR, only HiveMetadata has implementation to handle those, other connectors will simply ignoring them. The applyProjection will create new projections and HiveColumnHandle for Hive with extended HiveColumnProjectionInfo.
  • Pushing dereferences into file readers
    • This is done by this PR. We need a representation of dereferencing into Array (or potentially map). Currently everything is represented by simply Arrays of String (names) or Arrays of Integers (indexes) and by just using this, we cannot pass down any dereferencing that are more complex. I cherry-picked the Subfields classes from Presto since it's already established and have similar methods already implemented for Parquet reader. Though depends on how the community want to represent this, we can swap this with another representation as long as it can supporting anything more complex than simple Structs.
  • Readers skip column readings
    • This is done by the PR, for Parquet, file schema will be pruned to only contain needed columns and other columns will just be an empty block to be returned therefore reduce the actual data scanned while also reduced any data going through local and remote exchange.

This PR is written in a way to reduce the impacts to the existing features while I can fully validate the performance impact while gathering feedbacks and directions from the community. Therefore implementations are normally wrapped in an if instead of fully refactoring the existing method

I believe if this is the right direction, changes can be contributed through below phases

  1. Replacing the existing Array<dereferences> within HiveColumnProjectionInfo to Subfields or anything similar to that and make sure all methods that used to depend on Array<dereferences> now depend on the new representation
  2. Have the newly added optimization rule fully integrate with the existing applyProjection method (or not? It can simply be a non-iterative visitor at the very end like now.)
  3. Instead of just just pruning schemas, we also prune the output symbols/types of the tableScan (currently it keeps the original symbols but just returning empty blocks to minimize changes)
  4. Remove the Lambda expression if the translations are supported by the connector. The current overhead should be small though, but the risk of wrongly removing the lambda expression while connectors are not correctly pruning nested columns are large so this PR is currently still keeping the Lambda expression after the push down.
  5. Supports dereference pushdown of unnest symbols through ~all kind of expressions. I have the two rules added to support pushing down through project and filter, probably we can live with those in short term, but eventually have to address things like how to push down through unnest or other complex expressions

The change has been fully validated except rebasing to the latest Trino release that could have a lot of conflicts due to AST/IR refactoring

trino:default> ***BEFORE*** with tmp as (
            -> SELECT
            ->     a1.data2 as d2,
            ->     a1.array11 as nestedarray
            -> FROM 
            ->     default.test_unnest_unnest_prunning_parquet
            ->     CROSS JOIN UNNEST(default.test_unnest_unnest_prunning_parquet.array1) t (a1)
            ->     where id>0
            -> )
            ->  SELECT
            ->      d2,
            ->     array2.struct1.data4,
            ->     array2.struct1.data5
            -> FROM 
            ->     tmp
            ->     CROSS JOIN UNNEST(tmp.nestedarray) t (array2);
  d2  | data4 | data5 
------+-------+-------
 -10- |   100 | -100- 
 -10- |   101 | -101- 
 -11- |   110 | -110- 
 -11- |   111 | -111- 
 -20- |   200 | -200- 
 -20- |   201 | -201- 
 -21- |   210 | -210- 
 -21- |   211 | -211- 
(8 rows)

Query 20240518_032355_00008_qhz93, FINISHED, 1 node
http://localhost:8080/ui/query.html?20240518_032355_00008_qhz93
Splits: 1 total, 1 done (100.00%)
CPU Time: 0.0s total,    80 rows/s, 16.5KB/s, 10% active
Per Node: 0.0 parallelism,     1 rows/s,   413B/s
Parallelism: 0.0
Peak Memory: 542B
1.02 [2 rows, 423B] [1 rows/s, 413B/s]


trino:default> ***After*** with tmp as (
            -> SELECT
            ->     a1.data2 as d2,
            ->     a1.array11 as nestedarray
            -> FROM 
            ->     default.test_unnest_unnest_prunning_parquet
            ->     CROSS JOIN UNNEST(default.test_unnest_unnest_prunning_parquet.array1) t (a1)
            ->     where id>0
            -> )
            ->  SELECT
            ->      d2,
            ->     array2.struct1.data4,
            ->     array2.struct1.data5
            -> FROM 
            ->     tmp
            ->     CROSS JOIN UNNEST(tmp.nestedarray) t (array2);
  d2  | data4 | data5 
------+-------+-------
 -10- |   100 | -100- 
 -10- |   101 | -101- 
 -11- |   110 | -110- 
 -11- |   111 | -111- 
 -20- |   200 | -200- 
 -20- |   201 | -201- 
 -21- |   210 | -210- 
 -21- |   211 | -211- 
(8 rows)

Query 20240518_032332_00007_qhz93, FINISHED, 1 node
http://localhost:8080/ui/query.html?20240518_032332_00007_qhz93
Splits: 1 total, 1 done (100.00%)
CPU Time: 0.0s total,    80 rows/s,   14KB/s, 9% active
Per Node: 0.0 parallelism,     1 rows/s,   344B/s
Parallelism: 0.0
Peak Memory: 542B
1.04 [2 rows, 359B] [1 rows/s, 344B/s]

Byte scanned decreased from 423B to 359B for the sample query, we've seen large performance improvement in production queries

Additional context and related issues

I would really appreciate any kind of comments or feedbacks as without clear directions, I can't further extend this without risking of throwing everything away later. Any of the component should be easily plug in if we have a clear idea of how we want to do it otherwise.

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(X) Release notes are required, with the following suggested text:

# Section
* Enhance query performance on dereference on unnest symbols

@cla-bot cla-bot bot added the cla-signed label May 13, 2024
@zhaner08 zhaner08 requested a review from martint May 13, 2024 19:00
@github-actions github-actions bot added delta-lake Delta Lake connector hive Hive connector labels May 13, 2024
@zhaner08 zhaner08 self-assigned this May 13, 2024
@zhaner08 zhaner08 requested a review from pettyjamesm May 14, 2024 19:58
// Picked from Presto
public class Subfield
{
public interface PathElement
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: could be sealed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in rev2

// As a result, only support limited cases now which symbol reference has to be uniquely referenced
ImmutableList.Builder<Expression> expressionsBuilder = ImmutableList.<Expression>builder()
.addAll(project.getAssignments().getExpressions());
List<Expression> expressions = expressionsBuilder.build();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could just be: List<Expression> expressions = ImmutableList.copyOf(project.getAssignments().getExpressions());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in rev2


partialTranslations = partialTranslations.entrySet().stream().filter(entry -> {
ArrayFieldDereference arrayFieldDereference = (ArrayFieldDereference) entry.getValue();
return arrayFieldDereference.getTarget() instanceof Variable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Could be return arrayFieldDereference.getTarget() instanceof Variable variable && symbolReferenceNamesCount.get(variable.getTarget().getName()) == 1;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in rev2

combinedPrunedTypes = combinedPrunedTypes.union(prunedType);
}
}
return Optional.ofNullable(combinedPrunedTypes) // Should never be null since subfields is non-empty.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If combinedPrunedTypes should never be null, then you can just use Optional.of(combinedPrunedTypes).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in rev2

…d filters, with bug fixes, style fixes and unit tests
@zhaner08 zhaner08 changed the title [WIP] Support pushing dereferences within lambdas into table scan Support pushing dereferences within lambdas into table scan May 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla-signed delta-lake Delta Lake connector hive Hive connector performance
Development

Successfully merging this pull request may close these issues.

None yet

2 participants