[Draft] Add debugging operator to identify skew in datasets inline #46490

Draft · wants to merge 7 commits into master
Conversation

robreeves (Contributor) commented on May 8, 2024

What changes were proposed in this pull request?

This introduces a new debugging method to identify which values are producing skew in a dataset. I wrote it with joins in mind, but it can be used for any dataset.

Why are the changes needed?

Debugging a skewed join is a pain point. Once the skew is identified from the metrics (e.g. one partition has much more data than the others), it is cumbersome to figure out exactly which values that skew is coming from. Usually you need to write a manual aggregation and write the output to the logs, as sketched below. This PR streamlines that process by adding a method, inlineColumnsCount, that does it automatically, inferring the join keys to count.
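For reference, today's manual workaround looks something like the following sketch. It is illustrative only: the DataFrames `a` and `b` and the key columns are placeholders, and the top offenders are collected and printed by hand.

```scala
// Hypothetical example of the manual workaround: aggregate the join
// keys yourself and print the heaviest ones.
import org.apache.spark.sql.functions.{col, count, desc, lit}

val keyCounts = a.join(b, col("id") === col("id2"))
  .groupBy(col("id"), col("id2"))
  .agg(count(lit(1)).as("cnt"))
  .orderBy(desc("cnt"))

// Materialize the top offenders and write them to the logs manually.
keyCounts.limit(10).collect().foreach(row => println(row))
```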

The current solution still requires a code change. A useful extension would be to add a configuration like spark.sql.join.countkeyvals=true so this feature can be enabled without any code changes; see the sketch below. It would have to be enabled for all joins, but for debugging that is acceptable. This can be useful for prod jobs and, more generally, anytime uploading a code change to test means a longer iteration cycle.
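With that extension, enabling the feature might look like the sketch below. The configuration key is the one proposed above and does not exist yet.

```scala
// Hypothetical: proposed configuration, not implemented in this PR.
spark.conf.set("spark.sql.join.countkeyvals", "true")

// Every join in the session would then log its key-value counts,
// with no change to the job code itself.
val result = a.join(b, $"id" === $"id2").collect()
```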

I would also like to add a SQL hint for pure SQL jobs, along the lines of the example below.
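Such a hint might look like the following; the hint name is illustrative only and is not part of this PR.

```scala
// Hypothetical: a SQL hint form of the same feature.
spark.sql("""
  SELECT /*+ COUNT_KEY_VALS */ a.id, b.id2
  FROM a JOIN b ON a.id = b.id2
""")
```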

Does this PR introduce any user-facing change?

Yes. It adds a new operator that counts value occurrences for a combination of columns. The counts are periodically written to the executor logs.

How was this patch tested?

Manual testing (in spark-shell, where spark and $ are in scope):

```scala
import org.apache.spark.sql.execution.debug._  // provides inlineColumnsCount
import org.apache.spark.sql.functions.when

val a = spark.range(3)
val b = spark.range(100).withColumn("id2", when($"id" > 2, 2).otherwise($"id")).select($"id2")

a.join(b, $"id" === $"id2").inlineColumnsCount().collect()
```

Column value counts after processing 10 rows
id#37L: 2,id2#41L: 2 = 8 (80%)
id#37L: 0,id2#41L: 0 = 1 (10%)
id#37L: 1,id2#41L: 1 = 1 (10%)
Column value counts after processing 20 rows
id#37L: 2,id2#41L: 2 = 18 (90%)
id#37L: 0,id2#41L: 0 = 1 (5%)
id#37L: 1,id2#41L: 1 = 1 (5%)
Column value counts after processing 30 rows
id#37L: 2,id2#41L: 2 = 28 (93%)
id#37L: 0,id2#41L: 0 = 1 (3%)
id#37L: 1,id2#41L: 1 = 1 (3%)
Column value counts after processing 40 rows
id#37L: 2,id2#41L: 2 = 38 (95%)
id#37L: 0,id2#41L: 0 = 1 (2%)
id#37L: 1,id2#41L: 1 = 1 (2%)
Column value counts after processing 50 rows
id#37L: 2,id2#41L: 2 = 48 (96%)
id#37L: 0,id2#41L: 0 = 1 (2%)
id#37L: 1,id2#41L: 1 = 1 (2%)
Column value counts after processing 60 rows
id#37L: 2,id2#41L: 2 = 58 (96%)
id#37L: 0,id2#41L: 0 = 1 (1%)
id#37L: 1,id2#41L: 1 = 1 (1%)
Column value counts after processing 70 rows
id#37L: 2,id2#41L: 2 = 68 (97%)
id#37L: 0,id2#41L: 0 = 1 (1%)
id#37L: 1,id2#41L: 1 = 1 (1%)
Column value counts after processing 80 rows
id#37L: 2,id2#41L: 2 = 78 (97%)
id#37L: 0,id2#41L: 0 = 1 (1%)
id#37L: 1,id2#41L: 1 = 1 (1%)
Column value counts after processing 90 rows
id#37L: 2,id2#41L: 2 = 88 (97%)
id#37L: 0,id2#41L: 0 = 1 (1%)
id#37L: 1,id2#41L: 1 = 1 (1%)
Column value counts after processing 100 rows
id#37L: 2,id2#41L: 2 = 98 (98%)
id#37L: 0,id2#41L: 0 = 1 (1%)
id#37L: 1,id2#41L: 1 = 1 (1%)
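The percentages above come from a simple running count per key combination, logged at a fixed row interval. Here is a minimal sketch of that bookkeeping; it is an illustration of the idea, not the operator's actual implementation.

```scala
// Minimal sketch of the per-row bookkeeping implied by the log output.
// Not the PR's implementation; key formatting and interval are assumptions.
import scala.collection.mutable

val counts = mutable.Map.empty[String, Long].withDefaultValue(0L)
var processed = 0L
val logEvery = 10L

def observe(keyValues: String): Unit = {
  counts(keyValues) += 1
  processed += 1
  if (processed % logEvery == 0) {
    println(s"Column value counts after processing $processed rows")
    counts.toSeq.sortBy(-_._2).foreach { case (k, c) =>
      println(s"$k = $c (${c * 100 / processed}%)")
    }
  }
}
```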

Here is an example of what the plan looks like:

== Parsed Logical Plan ==
DebugInlineColumnsCount
+- Join Inner, (id#54L = id2#58L)
   :- Range (0, 3, step=1, splits=Some(12))
   +- Project [id2#58L]
      +- Project [id#56L, CASE WHEN (id#56L > cast(2 as bigint)) THEN cast(2 as bigint) ELSE id#56L END AS id2#58L]
         +- Range (0, 100, step=1, splits=Some(12))

== Analyzed Logical Plan ==
id: bigint, id2: bigint
DebugInlineColumnsCount [id#54L, id2#58L]
+- Join Inner, (id#54L = id2#58L)
   :- Range (0, 3, step=1, splits=Some(12))
   +- Project [id2#58L]
      +- Project [id#56L, CASE WHEN (id#56L > cast(2 as bigint)) THEN cast(2 as bigint) ELSE id#56L END AS id2#58L]
         +- Range (0, 100, step=1, splits=Some(12))

== Optimized Logical Plan ==
DebugInlineColumnsCount [id#54L, id2#58L]
+- Join Inner, (id#54L = id2#58L)
   :- Range (0, 3, step=1, splits=Some(12))
   +- Project [CASE WHEN (id#56L > 2) THEN 2 ELSE id#56L END AS id2#58L]
      +- Range (0, 100, step=1, splits=Some(12))

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- DebugInlineColumnsCount [id#54L, id2#58L]
   +- SortMergeJoin [id#54L], [id2#58L], Inner
      :- Sort [id#54L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#54L, 200), ENSURE_REQUIREMENTS, [plan_id=218]
      :     +- Range (0, 3, step=1, splits=12)
      +- Sort [id2#58L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id2#58L, 200), ENSURE_REQUIREMENTS, [plan_id=217]
            +- Project [CASE WHEN (id#56L > 2) THEN 2 ELSE id#56L END AS id2#58L]
               +- Range (0, 100, step=1, splits=12)

Was this patch authored or co-authored using generative AI tooling?

No

github-actions bot added the SQL label on May 8, 2024