
Azure Event Hub Consumer Group and Partition Id issue with spark streaming #655

Open
milinddhotarkar opened this issue Oct 10, 2022 · 2 comments


@milinddhotarkar

I have a use case where I need to consume 1 million events per second from Event Hub using Spark streaming. I created an Event Hub with 10 partitions and 10 consumer groups so that 10 Spark streaming jobs can read the events in parallel, with one consumer group per partition. The problem is that each consumer reads all the events from all the partitions, which results in duplicate data. Ideally, each consumer should read events only from its specified partition. I think this is a bug. I am using Databricks with PySpark streaming to consume the events. Please help me resolve this issue.
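
To make the duplication concrete, here is a small back-of-the-envelope model of the setup above in plain Python. It assumes events are spread evenly across the 10 partitions, and that each of the 10 jobs reads either only its own partition (the intended layout) or every partition (the reported behavior). The function names are hypothetical and nothing here calls the connector.

```python
from collections import Counter

NUM_PARTITIONS = 10
EVENTS_PER_SECOND = 1_000_000  # total ingress rate from the use case above


def intended_assignment(num_partitions):
    """Job i (using consumer group i) reads only partition i."""
    return {job: [job] for job in range(num_partitions)}


def reported_assignment(num_partitions):
    """Every job reads every partition, as described in this issue."""
    return {job: list(range(num_partitions)) for job in range(num_partitions)}


def copies_per_partition(assignment, num_partitions):
    """How many jobs read each partition, i.e. how often each event is processed."""
    reads = Counter(p for partitions in assignment.values() for p in partitions)
    return {p: reads[p] for p in range(num_partitions)}


def processed_per_second(assignment, num_partitions, events_per_second):
    """Rows per second processed across all jobs, assuming an even partition spread."""
    per_partition = events_per_second // num_partitions
    copies = copies_per_partition(assignment, num_partitions)
    return sum(per_partition * c for c in copies.values())


if __name__ == "__main__":
    for name, fn in [("intended", intended_assignment), ("reported", reported_assignment)]:
        layout = fn(NUM_PARTITIONS)
        print(name, processed_per_second(layout, NUM_PARTITIONS, EVENTS_PER_SECOND))
```

With these numbers the intended layout processes 1,000,000 rows per second in total, while the reported behavior processes 10,000,000, i.e. every event ten times.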

Feature Requests:

  • What issue are you trying to solve?
  • How do you want to solve it?
  • What is your use case for this feature?

Bug Report:

  • Actual behavior
  • Expected behavior
  • Spark version: Azure Databricks Runtime 10.4 LTS
  • spark-eventhubs artifactId and version
@rachelxj-ms

rachelxj-ms commented Oct 11, 2023

I ran into the same issue while using this library in Azure Databricks.
import json

positionKey1 = {
    "ehName": ehName,
    "partitionId": 2
}
positionMap = {
    json.dumps(positionKey1): eventPosition1
}
ehConf["eventhubs.startingPositions"] = json.dumps(positionMap)
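
For reference, the same positionMap pattern can be generalized to several partitions. The sketch below is plain Python using only the json module. build_starting_positions is a hypothetical helper, and the START_OF_STREAM dictionary is modeled on the event-position layout shown in the connector's PySpark guide, so treat its exact fields as an assumption. As far as the connector's documentation describes it, this setting only controls where each listed partition starts reading; partitions missing from the map fall back to eventhubs.startingPosition rather than being skipped.

```python
import json

# Start-of-stream event position. The field layout follows the connector's
# PySpark guide; treat it as an assumption and check it against your version.
START_OF_STREAM = {
    "offset": "-1",
    "seqNo": -1,           # marked "not in use" in that guide's example
    "enqueuedTime": None,  # marked "not in use" in that guide's example
    "isInclusive": True,
}


def build_starting_positions(eh_name, positions_by_partition):
    """Hypothetical helper: build the JSON value for eventhubs.startingPositions.

    Each key is itself a JSON-encoded {"ehName": ..., "partitionId": ...}
    object, mirroring the positionKey1/positionMap code above.
    """
    position_map = {}
    for partition_id, position in sorted(positions_by_partition.items()):
        key = json.dumps({"ehName": eh_name, "partitionId": partition_id})
        position_map[key] = position
    return json.dumps(position_map)


if __name__ == "__main__":
    # Only partition 2 gets an explicit starting position here.
    print(build_starting_positions("test", {2: START_OF_STREAM}))
```

In the notebook this would be used as ehConf["eventhubs.startingPositions"] = build_starting_positions(ehName, {2: START_OF_STREAM}).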

23/10/10 09:22:18 INFO EventHubsRDD: (TID 578) Computing EventHubs test, partition 2 sequence numbers 5 => 6
23/10/10 09:22:18 INFO EventHubsRDD: (TID 577) Beginning sequence number 66 is equal to the ending sequence number 66. Returning empty partition for EH: test on partition: 1
23/10/10 09:22:18 INFO EventHubsRDD: (TID 579) Beginning sequence number 3 is equal to the ending sequence number 3. Returning empty partition for EH: test on partition: 3
23/10/10 09:22:18 INFO CachedEventHubsReceiver: (TID 578) EventHubsCachedReceiver look up. For namespaceUri sb://eventhub0929.servicebus.windows.net/ EventHubNameAndPartition {"ehName":"test","partitionId":2} consumer group $Default. requestSeqNo: 5, batchSize: 1
23/10/10 09:22:18 INFO CachedEventHubsReceiver: (TID 578) Finished receiving for namespaceUri: sb://eventhub0929.servicebus.windows.net/ EventHubNameAndPartition: {"ehName":"test","partitionId":2} consumer group: $Default, batchSize: 1, elapsed time: 0 ms
.........................
23/10/10 09:22:21 INFO EventHubsRDD: (TID 581) Computing EventHubs test, partition 0 sequence numbers 64 => 65
23/10/10 09:22:21 INFO EventHubsRDD: (TID 583) Beginning sequence number 6 is equal to the ending sequence number 6. Returning empty partition for EH: test on partition: 2
23/10/10 09:22:21 INFO EventHubsRDD: (TID 584) Beginning sequence number 3 is equal to the ending sequence number 3. Returning empty partition for EH: test on partition: 3
23/10/10 09:22:21 INFO CachedEventHubsReceiver: (TID 581) EventHubsCachedReceiver look up. For namespaceUri sb://eventhub0929.servicebus.windows.net/ EventHubNameAndPartition {"ehName":"test","partitionId":0} consumer group $Default. requestSeqNo: 64, batchSize: 1
23/10/10 09:22:21 INFO CachedEventHubsReceiver: (TID 581) Finished receiving for namespaceUri: sb://eventhub0929.servicebus.windows.net/ EventHubNameAndPartition: {"ehName":"test","partitionId":0} consumer group: $Default, batchSize: 1, elapsed time: 0 ms
23/10/10 09:22:21 INFO CodeGenerator: Code generated in 16.534578 ms

The compute method in EventHubsRDD doesn't filter partitions based on the ehConf configuration; it still receives events from all partitions. Please help fix it.
https://github.com/Azure/azure-event-hubs-spark/blob/master/core/src/main/scala/org/apache/spark/eventhubs/rdd/EventHubsRDD.scala#L101
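
One way to check which Event Hubs partitions a micro-batch actually touched is to scan the EventHubsRDD INFO lines in the Spark logs. The sketch below is plain Python; the two regular expressions are written against the message format quoted in this comment and are assumptions, not a stable log contract. Run on the excerpt above, it reports tasks for partitions 0, 1, 2 and 3, even though startingPositions only named partition 2.

```python
import re

# Regexes for the two EventHubsRDD messages quoted in this issue (assumed format).
COMPUTING = re.compile(
    r"Computing EventHubs ([^,\s]+), partition (\d+) sequence numbers (\d+) => (\d+)"
)
EMPTY = re.compile(r"Returning empty partition for EH: (\S+) on partition: (\d+)")

# EventHubsRDD lines copied from the log excerpt above.
SAMPLE_LOG = """\
23/10/10 09:22:18 INFO EventHubsRDD: (TID 578) Computing EventHubs test, partition 2 sequence numbers 5 => 6
23/10/10 09:22:18 INFO EventHubsRDD: (TID 577) Beginning sequence number 66 is equal to the ending sequence number 66. Returning empty partition for EH: test on partition: 1
23/10/10 09:22:18 INFO EventHubsRDD: (TID 579) Beginning sequence number 3 is equal to the ending sequence number 3. Returning empty partition for EH: test on partition: 3
23/10/10 09:22:21 INFO EventHubsRDD: (TID 581) Computing EventHubs test, partition 0 sequence numbers 64 => 65
23/10/10 09:22:21 INFO EventHubsRDD: (TID 583) Beginning sequence number 6 is equal to the ending sequence number 6. Returning empty partition for EH: test on partition: 2
23/10/10 09:22:21 INFO EventHubsRDD: (TID 584) Beginning sequence number 3 is equal to the ending sequence number 3. Returning empty partition for EH: test on partition: 3
"""


def partitions_in_log(lines):
    """Split partition ids into non-empty sequence ranges and empty (but scheduled) ones."""
    with_range, empty = set(), set()
    for line in lines:
        match = COMPUTING.search(line)
        if match:
            with_range.add(int(match.group(2)))
            continue
        match = EMPTY.search(line)
        if match:
            empty.add(int(match.group(2)))
    return with_range, empty


if __name__ == "__main__":
    with_range, empty = partitions_in_log(SAMPLE_LOG.splitlines())
    print("partitions with a non-empty range:", sorted(with_range))
    print("partitions scheduled:", sorted(with_range | empty))
```

Partition 2 appears as empty in the second batch only because it had no new events at that point; what matters is that partitions 0, 1 and 3 are scheduled at all.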

@rachelxj-ms

Hi @yamin-msft, can you please help with this ticket?
