HIVE-28254: CBO (Calcite Return Path): Multiple DISTINCT leads to wrong results #5245
base: master
Conversation
// The original internal name is already obsolete as any DISTINCT keys are renamed.
rsUDAFParamName = distinctColumnMapping.get(rsUDAFParamColInfo);
} else {
  rsUDAFParamName = rsUDAFParamColInfo.getInternalName();
`getInternalName` can retain `KEY.col0:{index of RS keys}.col0`. That name conflicts with the one generated by the following logic, so the name returned by `getInternalName` may no longer be available in GroupByOperator.

distinctColumnName = Utilities.ReduceField.KEY.name() + "." + lastReduceKeyColName
    + ":" + numDistinctUDFs + "." + SemanticAnalyzer.getColumnInternalName(j);
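For illustration, the construction above can be reproduced with plain string concatenation. This is a standalone sketch: the Hive helpers `Utilities.ReduceField.KEY.name()` and `SemanticAnalyzer.getColumnInternalName(j)` are stubbed with the literal `"KEY"` and `"_col" + j`.

```java
public class DistinctColumnNameSketch {
    // Mirrors the naming logic above; the Hive helpers are stubbed with plain strings.
    public static String build(String lastReduceKeyColName, int numDistinctUDFs, int j) {
        return "KEY" + "." + lastReduceKeyColName
                + ":" + numDistinctUDFs + "." + ("_col" + j);
    }

    public static void main(String[] args) {
        // The argument of the second DISTINCT UDAF, reading member 0 of union key _col1.
        System.out.println(build("_col1", 1, 0)); // KEY._col1:1._col0
    }
}
```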
The non-return path likely resolves the correct column name with RowResolver. This PR implements that logic by traversing the params in two passes.
Why do we need two iterations? Because a non-distinct UDAF may refer to the input of a DISTINCT UDAF that comes later, as below.
SELECT
SUM(DISTINCT col1),
COUNT(DISTINCT col1),
SUM(col2), -- This has to refer to the key for `SUM(DISTINCT col2)`
MAX(DISTINCT col1),
SUM(DISTINCT col2),
MIN(DISTINCT col1)
FROM test;
Note that this method is used only when `hive.map.aggr` is disabled. With it enabled, non-distinct UDAFs don't refer to DISTINCT keys as they are deduplicated.
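The two passes can be sketched as follows. Everything here is illustrative, not the PR's actual API: `Param`, `resolve`, and the hard-coded union key `_col0` are assumptions. Pass one assigns a fresh union-member name to every DISTINCT parameter and records one name per source column; pass two lets non-distinct parameters reuse the recorded name, even when the matching DISTINCT UDAF appears later in the select list.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TwoPassResolver {
    // One UDAF parameter: its source column plus whether the UDAF is DISTINCT.
    public record Param(String column, boolean distinct) {}

    public static List<String> resolve(List<Param> params) {
        String[] resolved = new String[params.size()];
        Map<String, String> distinctName = new LinkedHashMap<>();
        int tag = 0;
        // Pass 1: every DISTINCT parameter gets a fresh union-member name;
        // remember one name per source column for later reuse.
        for (int i = 0; i < params.size(); i++) {
            Param p = params.get(i);
            if (p.distinct()) {
                String name = "KEY._col0:" + (tag++) + "._col0";
                resolved[i] = name;
                distinctName.putIfAbsent(p.column(), name);
            }
        }
        // Pass 2: a non-distinct UDAF whose input is also a DISTINCT key must
        // use the renamed key, even if that DISTINCT UDAF comes later.
        for (int i = 0; i < params.size(); i++) {
            Param p = params.get(i);
            if (!p.distinct()) {
                resolved[i] = distinctName.getOrDefault(p.column(), p.column());
            }
        }
        return List.of(resolved);
    }

    public static void main(String[] args) {
        // The six UDAFs of the example query above, in select-list order.
        List<Param> params = List.of(
                new Param("col1", true),   // SUM(DISTINCT col1)
                new Param("col1", true),   // COUNT(DISTINCT col1)
                new Param("col2", false),  // SUM(col2): reuses SUM(DISTINCT col2)'s key
                new Param("col1", true),   // MAX(DISTINCT col1)
                new Param("col2", true),   // SUM(DISTINCT col2)
                new Param("col1", true));  // MIN(DISTINCT col1)
        System.out.println(resolve(params));
    }
}
```

With these inputs, `SUM(col2)` resolves to `KEY._col0:3._col0`, the name assigned to `SUM(DISTINCT col2)`, which is why a single forward pass would not suffice.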
@okumin
I'm not familiar with the logic of distinct key generation. IIUC the issue is that some of these keys conflict with non-distinct keys when we run the query you mentioned.
Could you please elaborate on the distinct key generation logic? Why can we construct keys like this?

distinctColumnName = Utilities.ReduceField.KEY.name() + "." + lastReduceKeyColName + ":" + numDistinctUDFs + "." + SemanticAnalyzer.getColumnInternalName(j);
I was checking the legacy non-CBO return path code; it also has this:
hive/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
Lines 5372 to 5379 in a1420ed
if (isDistinct && lastKeyColName != null) {
  // if aggr is distinct, the parameter is name is constructed as
  // KEY.lastKeyColName:<tag>._colx
  paraExpression = Utilities.ReduceField.KEY.name() + "." +
      lastKeyColName + ":" + numDistinctUDFs + "." +
      getColumnInternalName(i - 1);
}
but I still don't understand why we can reference columns that don't come from any schema, or what the schema is that contains these generated keys.
Sure. Let me dive into the current behavior. Please feel free to ask additional questions; I will try to provide supplementary information or investigate further.
Sample query
I will use the following query to illustrate a specific case.
CREATE TABLE test (agg_key STRING, udaf_key_1 INT, udaf_key_2 INT);
set hive.cbo.returnpath.hiveop=true;
set hive.map.aggr=false;
SELECT
agg_key,
SUM(DISTINCT udaf_key_1),
COUNT(DISTINCT udaf_key_1),
SUM(DISTINCT udaf_key_2),
SUM(udaf_key_2)
FROM test
GROUP BY agg_key;
How to encode keys on ReduceSinkOperator
ReduceSinkOperator stores (sort) keys and one union of all DISTINCT keys in the list of keys.
For the sample query, the actual ReduceSinkDesc is an object like below.
ReduceSinkDesc {
keyCols = [agg_key, udaf_key_1, udaf_key_2]
outputKeyColumnNames = [_col0, _col1]
distinctColumnIndices = [1, 1, 2]
numDistributionKeys = 1
}
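The relationship between those fields can be sketched in a few lines (illustrative only; Hive's actual serialization differs): each entry of `distinctColumnIndices` is a union tag that reads one `keyCols` column, while the first `numDistributionKeys` columns stay plain keys.

```java
import java.util.Arrays;
import java.util.List;

public class UnionKeySketch {
    // Per union tag, pick the keyCols entry that the tag's member reads.
    public static List<String> unionMembers(List<String> keyCols, int[] distinctColumnIndices) {
        return Arrays.stream(distinctColumnIndices)
                .mapToObj(keyCols::get)
                .toList();
    }

    public static void main(String[] args) {
        List<String> keyCols = List.of("agg_key", "udaf_key_1", "udaf_key_2");
        // Tags 0 and 1 both read udaf_key_1; tag 2 reads udaf_key_2.
        System.out.println(unionMembers(keyCols, new int[]{1, 1, 2}));
        // [udaf_key_1, udaf_key_1, udaf_key_2]
    }
}
```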
Then, the following struct will be generated as a list of keys.
{
_col0 = Text for agg_key
_col1 = UnionObject for (udaf_key_1, udaf_key_1, udaf_key_2)
}
And the RowSchema of the RS is `[KEY._col0, KEY._col1:0._col0, KEY._col1:1._col0]`. To be honest, I am not sure how this is configured, but it makes sense to have deduplicated expressions. `KEY.%s:%s.%s` is an expression for a UNION column.
How Return Path configures UDAF keys
As you see, HiveGBOpConvUtil names UDAF keys as below.
- If it is a distinct key, a new name is assigned: `KEY.{the final output col name}:{position of UDAF}.{position of the argument of the UDAF}`
  - `KEY._col1:0._col0` for the arg of `SUM(DISTINCT udaf_key_1)`
  - `KEY._col1:1._col0` for the arg of `COUNT(DISTINCT udaf_key_1)`
  - `KEY._col1:2._col0` for the arg of `SUM(DISTINCT udaf_key_2)`
- If it is not a distinct key, the internal name of ReduceSink is used as-is
  - `KEY._col1:1._col0` for the arg of `SUM(udaf_key_2)`, derived from the naming of the RSO, `[KEY._col0, KEY._col1:0._col0, KEY._col1:1._col0]`
  - This conflicts with the arg of `COUNT(DISTINCT udaf_key_1)`
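The collision can be made concrete with plain strings (a sketch of the two naming schemes above, not Hive code): the RS-internal name reused for `SUM(udaf_key_2)` is byte-for-byte identical to the fresh name generated for `COUNT(DISTINCT udaf_key_1)`.

```java
public class NameCollisionDemo {
    public static void main(String[] args) {
        // Non-distinct path: SUM(udaf_key_2) reuses the RS internal name verbatim.
        String reusedRsName = "KEY._col1:1._col0";
        // Distinct path: COUNT(DISTINCT udaf_key_1) is DISTINCT UDAF #1, argument #0.
        String generatedName = "KEY._col1:" + 1 + "._col0";
        // Identical -> SUM(udaf_key_2) silently reads udaf_key_1's DISTINCT key.
        System.out.println(reusedRsName.equals(generatedName)); // true
    }
}
```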
Problem
The configured param names must be ones in the world of GroupByOperator, but the current implementation uses a name from the world of ReduceSinkOperator. In the worst case, the ReduceSinkOperator name can conflict with a name generated for the newly created GroupByOperator, and the UDAF ends up accessing the wrong column.
Questions
- Why does it happen only with `hive.map.aggr=false`?
  - With `hive.map.aggr=true`, it is basically impossible to reuse an expression of a DISTINCT key that could be deduplicated
- Why does GroupByOperator need to rename the source column for UNION?
  - Not sure yet
Execution mode: llap
Reduce Operator Tree:
  Group By Operator
    aggregations: sum(DISTINCT KEY._col0:0._col0), count(DISTINCT KEY._col0:1._col0), sum(KEY._col0:3._col0), max(DISTINCT KEY._col0:2._col0), sum(DISTINCT KEY._col0:3._col0), min(DISTINCT KEY._col0:4._col0)
Without this patch, the third UDAF is not `sum(KEY._col0:3._col0)` but `sum(KEY._col0:1._col0)`, which means it reads a different column.
3681775#diff-c0641a94a4e5e0c28830fcdccd252506a0dc7842a24aa414bf096a229e8e4235R262
With `set hive.cbo.returnpath.hiveop=false;`, Hive generates the same plan as the one in q.out.
@@ -811,7 +811,7 @@ STAGE PLANS:
     Execution mode: llap
     Reduce Operator Tree:
       Group By Operator
-        aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0), sum(VALUE._col0), sum(VALUE._col1), sum(VALUE._col2), sum(KEY._col1:0._col0), sum(KEY._col1:1._col0), sum(KEY._col0), sum(DISTINCT KEY._col1:2._col0), sum(DISTINCT KEY._col1:3._col0)
+        aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0), sum(VALUE._col0), sum(VALUE._col1), sum(VALUE._col2), sum(KEY._col1:0._col0), sum(KEY._col1:1._col0), sum(KEY._col1:2._col0), sum(DISTINCT KEY._col1:2._col0), sum(DISTINCT KEY._col1:3._col0)
In this case, the first `KEY._col0` doesn't conflict with the columns generated for DISTINCT UDAFs. So, technically, the problem would not happen even without the change in this PR, which means either name seems to be acceptable.
`KEY._col0` had a special name because it is not only a distinct param but also an aggregation param. So, it is accessed with two types of names in GroupByOperator.
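One way to picture that dual access (purely illustrative; `columnIndex` is not a real Hive structure): the same physical key column is reachable under both its plain aggregation name and its DISTINCT union-member name.

```java
import java.util.Map;

public class DualNameSketch {
    public static void main(String[] args) {
        // The same key column registered twice: once as a plain aggregation
        // param, once as the DISTINCT key's union member (names from the diff above).
        Map<String, Integer> columnIndex = Map.of(
                "KEY._col0", 2,
                "KEY._col1:2._col0", 2);
        System.out.println(columnIndex.get("KEY._col0")
                .equals(columnIndex.get("KEY._col1:2._col0"))); // true
    }
}
```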
What changes were proposed in this pull request?
This PR resolves an issue that causes a UDAF to access the wrong key.
https://issues.apache.org/jira/browse/HIVE-28254
Why are the changes needed?
Bug fix. Without this fix, #5091 would generate wrong results.
Does this PR introduce any user-facing change?
Effectively no; the affected queries currently do not generate correct results anyway.
Is the change a dependency upgrade?
No.
How was this patch tested?
Added a qtest.