Structured streaming job hangs after a while #665

Open
mschou1306 opened this issue Jan 5, 2023 · 0 comments

Setup (what our program does)

In short, we use an Azure Event Hub as the source for a Structured Streaming query. For each batch we save parts of the data to different folders in a Delta Lake; which folder(s) are written depends on a column in the batch.

Pseudo-code:

1. Read stream from event hub
2. foreach batch
    2.1 Persist batch
    2.2 Collect list of folder names (given by column in batch)
    2.3 foreach folder name (using Scala foreach)
        2.3.1 Filter the batch data by the folder name (the column value)
        2.3.2 Save filtered data to Delta Lake with path $base_path/$folder_name
    2.4 Unpersist batch

I can provide our actual code if necessary; a rough sketch of the batch logic is included below.
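
A minimal sketch of what this looks like, with the usual EventHubsConf setup from the connector. The connection string, the "folder" column name, and basePath are placeholders, not our real values, and this is not our production code:

import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf}
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("eventhub-to-delta").getOrCreate()

// Placeholders -- not our real connection string, column name, or target path
val connStr  = ConnectionStringBuilder("<event-hub-connection-string>").build
val ehConf   = EventHubsConf(connStr)
val basePath = "/mnt/delta/events"

val stream = spark.readStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .load()

val query = stream.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.persist()                                   // 2.1 persist the batch
    val folders = batch.select("folder").distinct()   // 2.2 collect folder names
      .collect().map(_.getString(0))
    folders.foreach { folder =>                       // 2.3 plain Scala foreach
      batch.filter(batch("folder") === folder)        // 2.3.1 filter by the column
        .write.format("delta").mode("append")
        .save(s"$basePath/$folder")                   // 2.3.2 write to Delta path
    }
    batch.unpersist()                                 // 2.4 unpersist
  }
  .start()

query.awaitTermination()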

Issue

After running for several hours, the program will start to slow down, and at some point will hang completely until it is manually restarted.

Taking a heap dump after the program has started to hang, we can see that org.apache.qpid.proton.reactor.impl.ReactorImpl occupies 95% of the heap. The corresponding stack trace (shown below) mentions com.microsoft.azure.eventhubs.impl.EventHubClientImpl, which leads us to believe the problem may originate in this library.

Heap dump stacktrace
at sun.nio.ch.NativeThread.current()J (Native Method)
  at sun.nio.ch.SinkChannelImpl.write(Ljava/nio/ByteBuffer;)I (SinkChannelImpl.java:165)
  at com.microsoft.azure.eventhubs.impl.ReactorDispatcher.signalWorkQueue()V (ReactorDispatcher.java:97)
  at com.microsoft.azure.eventhubs.impl.ReactorDispatcher.invoke(ILcom/microsoft/azure/eventhubs/impl/DispatchHandler;)V (ReactorDispatcher.java:72)
  at com.microsoft.azure.eventhubs.impl.Timer.schedule(Ljava/lang/Runnable;Ljava/time/Duration;)Ljava/util/concurrent/CompletableFuture; (Timer.java:26)
  at com.microsoft.azure.eventhubs.impl.EventHubClientImpl.managementWithRetry(Ljava/util/Map;)Ljava/util/concurrent/CompletableFuture; (EventHubClientImpl.java:384)
  at com.microsoft.azure.eventhubs.impl.EventHubClientImpl.lambda$getPartitionRuntimeInformation$5(Ljava/util/Map;)Ljava/util/concurrent/CompletionStage; (EventHubClientImpl.java:354)
  at com.microsoft.azure.eventhubs.impl.EventHubClientImpl$$Lambda$1814.apply(Ljava/lang/Object;)Ljava/lang/Object; (Unknown Source)
  at java.util.concurrent.CompletableFuture.uniCompose(Ljava/util/concurrent/CompletableFuture;Ljava/util/function/Function;Ljava/util/concurrent/CompletableFuture$UniCompose;)Z (CompletableFuture.java:966)
  at java.util.concurrent.CompletableFuture$UniCompose.tryFire(I)Ljava/util/concurrent/CompletableFuture; (CompletableFuture.java:940)
  at java.util.concurrent.CompletableFuture.postComplete()V (CompletableFuture.java:488)
  at java.util.concurrent.CompletableFuture.postFire(Ljava/util/concurrent/CompletableFuture;I)Ljava/util/concurrent/CompletableFuture; (CompletableFuture.java:575)
  at java.util.concurrent.CompletableFuture$UniApply.tryFire(I)Ljava/util/concurrent/CompletableFuture; (CompletableFuture.java:594)
  at java.util.concurrent.CompletableFuture$Completion.run()V (CompletableFuture.java:456)
  at java.util.concurrent.Executors$RunnableAdapter.call()Ljava/lang/Object; (Executors.java:511)
  at java.util.concurrent.FutureTask.run()V (FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Ljava/util/concurrent/ScheduledThreadPoolExecutor$ScheduledFutureTask;)V (ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run()V (ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V (ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run()V (ThreadPoolExecutor.java:624)
  at java.lang.Thread.run()V (Thread.java:748)

How have we tried to solve the problem?

Since we perform several actions (including several writes) in each batch, we make sure to persist/unpersist the batch data, as per your documentation. Furthermore, we have significantly increased spark.locality.wait, and we can see that all tasks are executed at the PROCESS_LOCAL locality level.
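
Purely for illustration, the locality setting is applied roughly like this (the 30s value is an example, not our exact setting):

import org.apache.spark.sql.SparkSession

// Illustrative only: a larger spark.locality.wait makes the scheduler wait longer
// for a PROCESS_LOCAL slot before launching a task at a worse locality level.
val spark = SparkSession.builder()
  .config("spark.locality.wait", "30s")
  .getOrCreate()

// Or on the command line:
//   spark-submit --conf spark.locality.wait=30s ...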

For reference, here are the versions we use (sbt coordinates are shown after the list):

  • Spark version: 3.2.2
  • Artifact id: azure-eventhubs-spark_2.12
  • Package version: 2.3.22
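
In sbt terms the connector dependency looks roughly like this (the groupId is written from memory and may need correcting):

// Assumed coordinates -- the groupId "com.microsoft.azure" is from memory
libraryDependencies += "com.microsoft.azure" % "azure-eventhubs-spark_2.12" % "2.3.22"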

Can you help with where to go from here? Is this an issue with our code, or a bug in the library?

Please let me know if you have any questions, and thanks in advance for your help 🙂

@mschou1306 mschou1306 changed the title Program hangs after a while Structured streaming job hangs after a while Jan 5, 2023