Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HIVE-28254: CBO (Calcite Return Path): Multiple DISTINCT leads to wrong results #5245

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

okumin
Copy link
Contributor

@okumin okumin commented May 12, 2024

What changes were proposed in this pull request?

This PR would resolve the issue that causes a UDAF accesses a wrong key.

https://issues.apache.org/jira/browse/HIVE-28254

Why are the changes needed?

Bug fix. Without this fix, #5091 would generate wrong results.

Does this PR introduce any user-facing change?

Literally no. Affected queries are not generating correct results.

Is the change a dependency upgrade?

No.

How was this patch tested?

Added a qtest.

// The original internal name is already obsolete as any DISTINCT keys are renamed.
rsUDAFParamName = distinctColumnMapping.get(rsUDAFParamColInfo);
} else {
rsUDAFParamName = rsUDAFParamColInfo.getInternalName();
Copy link
Contributor Author

@okumin okumin May 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getInternalName can retain KEY.col0:{index of RS keys}.col0. It conflicts with the name generated by the following logic. So, the one in getInternanlName could be no more available in GroupByOperator.

          distinctColumnName = Utilities.ReduceField.KEY.name() + "." + lastReduceKeyColName
                  + ":" + numDistinctUDFs + "." + SemanticAnalyzer.getColumnInternalName(j);

Non-Return Path is likely to resolve the correct column name with RowResolver. This PR would implement the logic by traversing params with two paths.
Why do we need the two iterations? That's because the non-distinct UDAF may refer to the input of DISTINCT UDAF which comes later like below.

SELECT
  SUM(DISTINCT col1),
  COUNT(DISTINCT col1),
  SUM(col2), -- This has to refer to the key for `SUM(DISTINCT col2)`
  MAX(DISTINCT col1),
  SUM(DISTINCT col2),
  MIN(DISTINCT col1)
FROM test;

Note that this method is used only when hive.map.aggr is disabled. With it enabled, non-distinct UDAFs don't refer to DISTINCT keys as they are deduplicated.

Copy link
Contributor

@kasakrisz kasakrisz May 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@okumin
I'm not familiar with the logic of distinct key generation. IIUC the issue is caused by that some of these keys are conflicting with non-distinct keys when we are running the query you mentioned.

Could you please elaborate distinct key generation logic?
Why can we construct keys like this

distinctColumnName = Utilities.ReduceField.KEY.name() + "." + lastReduceKeyColName + ":" + numDistinctUDFs + "." + SemanticAnalyzer.getColumnInternalName(j);

I was checking the legacy non-cbo return patch code, it also has this

if (isDistinct && lastKeyColName != null) {
// if aggr is distinct, the parameter is name is constructed as
// KEY.lastKeyColName:<tag>._colx
paraExpression = Utilities.ReduceField.KEY.name() + "." +
lastKeyColName + ":" + numDistinctUDFs + "." +
getColumnInternalName(i - 1);
}

but I still don't understand why can we reference columns which are not coming any schema or what is the schema which contains these generated keys?

Copy link
Contributor Author

@okumin okumin May 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not. Let me dive into the current behavior. Please feel free to give me an additional question. I will try to give a supplementary information or investigate more.

Sample query

I will use the following query to illustrate a specific case.

CREATE TABLE test (agg_key STRING, udaf_key_1 INT, udaf_key_2 INT);

set hive.cbo.returnpath.hiveop=true;
set hive.map.aggr=false;

SELECT
  agg_key,
  SUM(DISTINCT udaf_key_1),
  COUNT(DISTINCT udaf_key_1),
  SUM(DISTINCT udaf_key_2),
  SUM(udaf_key_2)
FROM test
GROUP BY agg_key;

How to encode keys on ReduceSinkOperator

ReduceSinkOperator stores (sort) keys and one union of all DISTINCT keys in the list of keys.

For the sample query, the actual ReduceSinkDesk is an object like below.

ReduceSinkDesk {
  keyCols = [agg_key, udaf_key_1, udaf_key_2]
  outputKeyColumnNames = [_col0, _col1]
  distinctColumnIndices = [1, 1, 2]
  numDistributionKeys = 1
}

Then, the following struct will be generated as a list of keys.

{
  _col0 = Text for agg_key
  _col1 = UnionObject for (udaf_key_1, udaf_key_1, udaf_key_2)
}

And its RowSchema of RS is [KEY._col0, KEY._col1:0._col0, KEY._col1:1._col0]. To be honest, I am not sure how this is configured, but it makes sense to have deduplicated expressions.

KEY.%s:%s.%s is an expression for a UNION column.

How Return Path configures UDAF keys

As you see, HiveGBOpConvUtil names UDAFs keys as below.

  • If it is a distinct key, new names are assigned, KEY.{the final output col name}:{position of UDAF}.{position of arguments of the UDAF}
    • KEY._col1:0.col0 for the arg of SUM(DISTINCT udaf_key_1)
    • KEY._col1:1.col0 for the arg of COUNT(DISTINCT udaf_key_1)
    • KEY._col1:2.col0 for the arg of SUM(DISTINCT udaf_key_2)
  • If it is not a distinct key, the internal name of ReduceSink is used as it is
    • KEY._col1:1._col0 for the arg of SUM(udaf_key_2), derived from naming of RSO, [KEY._col0, KEY._col1:0._col0, KEY._col1:1._col0]
      • This conflicts with the arg of COUNT(DISTINCT udaf_key_1)

Problem

The configured param names must be ones for the world of GroupByOperator. But the current implementation uses a name in the world of ReduceSinkOperator. In the worst case, the name in ReduceSinkOperator can conflict with the name for a newly created GroupByOperator, and can access a wrong column.

Questions

  • Why does it happen only with hive.map.aggr=false?
    • With hive.map.aggr=true, it is basically impossible to reuse an expression of a DISTINCT key that could be deduplicated
  • Why GroupByOperator needs to rename the source column for UNION?
    • Not sure yet

Copy link

sonarcloud bot commented May 12, 2024

Quality Gate Passed Quality Gate passed

Issues
3 New issues
0 Accepted issues

Measures
0 Security Hotspots
No data about Coverage
No data about Duplication

See analysis details on SonarCloud

Execution mode: llap
Reduce Operator Tree:
Group By Operator
aggregations: sum(DISTINCT KEY._col0:0._col0), count(DISTINCT KEY._col0:1._col0), sum(KEY._col0:3._col0), max(DISTINCT KEY._col0:2._col0), sum(DISTINCT KEY._col0:3._col0), min(DISTINCT KEY._col0:4._col0)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this patch, the third UDAF is not sum(KEY._col0:3._col0) but sum(KEY._col0:1._col0), which means it reads a different column.
3681775#diff-c0641a94a4e5e0c28830fcdccd252506a0dc7842a24aa414bf096a229e8e4235R262

With set hive.cbo.returnpath.hiveop=false;, Hive generates the same plan as that in q.out.

@@ -811,7 +811,7 @@ STAGE PLANS:
Execution mode: llap
Reduce Operator Tree:
Group By Operator
aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0), sum(VALUE._col0), sum(VALUE._col1), sum(VALUE._col2), sum(KEY._col1:0._col0), sum(KEY._col1:1._col0), sum(KEY._col0), sum(DISTINCT KEY._col1:2._col0), sum(DISTINCT KEY._col1:3._col0)
aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0), sum(VALUE._col0), sum(VALUE._col1), sum(VALUE._col2), sum(KEY._col1:0._col0), sum(KEY._col1:1._col0), sum(KEY._col1:2._col0), sum(DISTINCT KEY._col1:2._col0), sum(DISTINCT KEY._col1:3._col0)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, the first KEY._col0 doesn't conflict generated columns for DISTINCT UDAFs. So, technically, the problem would not happen without the change of this PR, which mean either seems to be acceptable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KEY._col0 had a special name because it is not only a distinct param but also an aggregation param. So, it is accessed with two types of names in GroupByOperator.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants