Starting with 8e20e1ee, after broker goes down and back up, rd_kafka_destroy of groupconsumer hangs #4674

Open · Quuxplusone opened this issue Apr 5, 2024 · 9 comments

@Quuxplusone (Contributor)

Quuxplusone commented Apr 5, 2024

I'm pulling this out into a new issue because we keep discussing it across various PRs and issues, and it will help to have a single place for the discussion.

On #4667 I wrote:

We observed that destroying a groupconsumer would often hang waiting for the broker thread to exit. We tediously bisected the problem to the specific commit 8e20e1e (the last commit before the v2.0.0rc1 tag). Only then did we find that a lot of people on GitHub were already complaining about that commit as introducing a resource leak: the commit adds a call to rd_kafka_toppar_keep that bumps the refcount of the toppar, and I don't immediately see a corresponding rd_kafka_toppar_destroy anywhere.

Reverting 8e20e1e (as in this commit) does fix the hang in groupconsumer destruction which we were observing, so we've applied this patch to our downstream library.

Fixes #4486.

Then in #4669 (comment) I wrote:

@emasab wrote:

@Quuxplusone in case it causes a hang, please try this fix [i.e. #4669] and in case it's still happening tell me how to reproduce it.

Okay, I just tried it. My employer's test case is red with librdkafka v2.3.0, green with librdkafka v2.3.0-plus-#4667, but unfortunately remains red with librdkafka v2.3.0-plus-#4669.

That's not to say that #4669 is useless or fails to fix a real bug; it just doesn't fix our bug.

Then @emasab asked:

Is your failing test doing something specific, like deleting a topic or using the cooperative-sticky assignor?

I can't get super detailed (the layers of abstraction between librdkafka and the test itself are confusing, internal, and proprietary all at once), but I'm pretty sure we don't know anything about "assignors", so we're using whatever the default is there. The structure of our test is more or less as follows (a rough C sketch of this lifecycle appears after the stack traces below):

  • Create two distinct rd_kafka_ts in the same process: one a groupconsumer on topic T1 and broker B1, and the other a producer on topic T2 on broker B2. (Actually, make that four: we have another (non-group) consumer running, and another producer, too, for other purposes. I think it's just four total.)
  • When making the groupconsumer: enable.auto.commit=false, enable.auto.offset.store=false, auto.offset.reset=smallest. Also, rd_kafka_conf_set_rebalance_cb to a callback that looks for RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS and remembers how many partitions it saw.
  • Set the groupconsumer consuming, i.e. loop on rd_kafka_consumer_poll until either it returns NULL or until it has returned RD_KAFKA_RESP_ERR__PARTITION_EOF on T1 as many times as there are partitions. (I suspect this second condition, and thus the rebalance callback, is irrelevant; but I don't know.)
  • Stop Kafka broker B1, so that the groupconsumer can't talk to Kafka anymore. Wait 3 minutes.
  • Restart Kafka broker B1. Wait 3 minutes.
  • (During all this, we're also pushing data to topic T1 on broker B1, so that the groupconsumer has stuff to consume. Also, each time the groupconsumer sees a message, we produce a message to the producer talking to B2 and then commit the offset to the groupconsumer via rd_kafka_commit(rk, toppar, true) where toppar is freshly created with rd_kafka_topic_partition_list_new and destroyed with rd_kafka_topic_partition_list_destroy immediately afterward.)
  • Shut down the groupconsumer: rd_kafka_consumer_close(rk). This seems to return RD_KAFKA_RESP_ERR_NO_ERROR.
  • Destroy the groupconsumer: rd_kafka_destroy(rk). This blocks here...
Thread 4 (Thread 0x7fded5ffb700 (LWP 30469)):
#0  0x00007fdeee498017 in pthread_join () from /lib64/libpthread.so.0
#1  0x00000000007ca5a2 in thrd_join ()
#2  0x0000000000734b56 in rd_kafka_destroy_app ()
[...]
  • which I believe is waiting for rd_kafka_thread_main to exit, but it's blocked...
Thread 3 (Thread 0x7fde5f7fe700 (LWP 30478)):
#0  0x00007fdeee498017 in pthread_join () from /lib64/libpthread.so.0
#1  0x00000000007ca5a2 in thrd_join ()
#2  0x0000000000732fef in rd_kafka_destroy_internal ()
#3  0x00000000007389a5 in rd_kafka_thread_main ()
#4  0x00000000007ca2d7 in _thrd_wrapper_function ()
#5  0x00007fdeee496ea5 in start_thread () from /lib64/libpthread.so.0
#6  0x00007fdeee1bfb0d in clone () from /lib64/libc.so.6
  • which I believe is waiting for one of these threads in rd_kafka_broker_thread_main but I'm not sure which thread. We have a total of 35 threads still extant at this point. Remember that the nongroup consumer and the two producers are all still running.
Thread 7 (Thread 0x7fde5d7fa700 (LWP 20537)):
#0  0x00007fdeee49ade2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00000000007ca469 in cnd_timedwait ()
#2  0x0000000000763771 in rd_kafka_q_pop_serve.localalias ()
#3  0x0000000000747198 in rd_kafka_broker_ops_io_serve ()
#4  0x00000000007481cf in rd_kafka_broker_serve ()
#5  0x0000000000748833 in rd_kafka_broker_thread_main ()
#6  0x00000000007ca2d7 in _thrd_wrapper_function ()
#7  0x00007fdeee496ea5 in start_thread () from /lib64/libpthread.so.0
#8  0x00007fdeee1bfb0d in clone () from /lib64/libc.so.6

Thread 6 (Thread 0x7fde5cff9700 (LWP 20538)):
#0  0x00007fdeee1b4ddd in poll () from /lib64/libc.so.6
#1  0x000000000075d25e in rd_kafka_transport_poll ()
#2  0x000000000075f10e in rd_kafka_transport_io_serve ()
#3  0x0000000000747119 in rd_kafka_broker_ops_io_serve ()
#4  0x00000000007477b9 in rd_kafka_broker_consumer_serve ()
#5  0x0000000000747ef1 in rd_kafka_broker_serve ()
#6  0x0000000000748585 in rd_kafka_broker_thread_main ()
#7  0x00000000007ca2d7 in _thrd_wrapper_function ()
#8  0x00007fdeee496ea5 in start_thread () from /lib64/libpthread.so.0
#9  0x00007fdeee1bfb0d in clone () from /lib64/libc.so.6

Thread 2 (Thread 0x7fdeb8ff9700 (LWP 30480)):
#0  0x00007fdeee49ade2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00000000007ca469 in cnd_timedwait ()
#2  0x0000000000763771 in rd_kafka_q_pop_serve.localalias ()
#3  0x0000000000747198 in rd_kafka_broker_ops_io_serve ()
#4  0x00000000007477b9 in rd_kafka_broker_consumer_serve ()
#5  0x0000000000747ef1 in rd_kafka_broker_serve ()
#6  0x0000000000748833 in rd_kafka_broker_thread_main ()
#7  0x00000000007ca2d7 in _thrd_wrapper_function ()
#8  0x00007fdeee496ea5 in start_thread () from /lib64/libpthread.so.0
#9  0x00007fdeee1bfb0d in clone () from /lib64/libc.so.6
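To make the structure concrete, here is a rough C sketch of the groupconsumer lifecycle described in the list above. The broker address, group id, the enable.partition.eof setting, and the exact loop shape are my illustrative assumptions rather than our actual (proprietary) test code, and error handling is omitted:

#include <librdkafka/rdkafka.h>

/* Rebalance callback: remember how many partitions were assigned. */
static void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *parts, void *opaque) {
        int *npartitions = opaque;
        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                *npartitions = parts->cnt;
                rd_kafka_assign(rk, parts);
        } else {
                rd_kafka_assign(rk, NULL);
        }
}

int main(void) {
        char errstr[512];
        int npartitions = 0;

        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_conf_set(conf, "bootstrap.servers", "B1:9092", errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "group.id", "test-group", errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "enable.auto.commit", "false", errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "enable.auto.offset.store", "false", errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "auto.offset.reset", "smallest", errstr, sizeof(errstr));
        /* Assumption: needed so consumer_poll ever reports _PARTITION_EOF. */
        rd_kafka_conf_set(conf, "enable.partition.eof", "true", errstr, sizeof(errstr));
        rd_kafka_conf_set_opaque(conf, &npartitions);
        rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);

        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
        rd_kafka_poll_set_consumer(rk);

        rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(topics, "T1", RD_KAFKA_PARTITION_UA);
        rd_kafka_subscribe(rk, topics);
        rd_kafka_topic_partition_list_destroy(topics);

        /* Consume until poll returns NULL or we have seen one _PARTITION_EOF
         * per assigned partition.  Broker B1 is stopped and restarted by the
         * test harness while this loop runs. */
        int eofs = 0, done = 0;
        while (!done) {
                rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 1000);
                if (!msg)
                        break;
                if (msg->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        if (npartitions > 0 && ++eofs >= npartitions)
                                done = 1;
                } else if (!msg->err) {
                        /* (Produce a message to B2 here, then commit this offset
                         * with a freshly created, immediately destroyed list.) */
                        rd_kafka_topic_partition_list_t *offs =
                            rd_kafka_topic_partition_list_new(1);
                        rd_kafka_topic_partition_list_add(offs, "T1", msg->partition)
                            ->offset = msg->offset + 1;
                        rd_kafka_commit(rk, offs, 1 /* async */);
                        rd_kafka_topic_partition_list_destroy(offs);
                }
                rd_kafka_message_destroy(msg);
        }

        rd_kafka_consumer_close(rk);  /* returns RD_KAFKA_RESP_ERR_NO_ERROR */
        rd_kafka_destroy(rk);         /* this is the call that hangs */
        return 0;
}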

Anyway, I don't think this gives you enough to go on, but at least we have a central place to talk about the hang now, instead of scattering it over different PRs.

@emasab (Collaborator)

emasab commented Apr 8, 2024

@Quuxplusone thanks for the description of the problem. I'll try to reproduce it; it may be linked to stopping and restarting the broker. Could you try it without rd_kafka_conf_set_rebalance_cb to see if it happens too?

@emasab (Collaborator)

emasab commented Apr 8, 2024

Some issues that could be related:
#4519
#4131
#3954

@Quuxplusone (Contributor, Author)

@Quuxplusone thanks for the description of the problem. I'll try to reproduce it; it may be linked to stopping and restarting the broker. Could you try it without rd_kafka_conf_set_rebalance_cb to see if it happens too?

With vanilla librdkafka v2.3.0, but with our custom rd_kafka_conf_set_rebalance_cb removed from the groupconsumer, our test is still red. With v2.3.0 plus removing our rd_kafka_conf_set_rebalance_cb plus adding #4667, our test remains green. (IOW, whatever our weird rebalance callback is doing, and regardless of whether it's necessary for us in general, it seems irrelevant to this specific test.)

@Mrigank11

@Quuxplusone we're also facing similar issues in our use cases, but it's very sporadic. We're working on a stable reproducer so we can investigate further. Do you have a public reproducer that you could share?

Currently I'm trying with the following test setup, but the issue reproduces only rarely:

  • start a cluster with 3 brokers, 1 topic, 1000 partitions
  • push 10 messages to the topic
  • start consumer, read 5 messages
  • stop kafka cluster, wait for 4 minutes, and restart the cluster
  • read 5 more messages from the consumer
  • close the consumer

Any ideas for improving the reproducer would be very helpful.
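For what it's worth, the consumer side of that sequence might look roughly like the sketch below; the helper name, timeout, and function boundaries are made up, and the cluster stop/restart steps are driven externally.

#include <librdkafka/rdkafka.h>

/* Hypothetical helper: poll until `n` real messages have been consumed. */
static void consume_n(rd_kafka_t *rk, int n) {
        while (n > 0) {
                rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 1000);
                if (!msg)
                        continue;          /* poll timed out, keep waiting */
                if (!msg->err)
                        n--;               /* count only real messages */
                rd_kafka_message_destroy(msg);
        }
}

/* Consumer side of the reproducer; `rk` is an already-subscribed group
 * consumer.  Stopping/restarting the cluster happens between the two calls. */
static void reproducer_consumer_side(rd_kafka_t *rk) {
        consume_n(rk, 5);              /* read 5 messages */
        /* external: stop cluster, wait ~4 minutes, restart cluster */
        consume_n(rk, 5);              /* read 5 more messages */
        rd_kafka_consumer_close(rk);   /* close the consumer */
        rd_kafka_destroy(rk);          /* the call that sometimes hangs */
}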

@Quuxplusone (Contributor, Author)

@Quuxplusone we're also facing similar issues in our use cases, but it's very sporadic. We're working on a stable reproducer so we can investigate further. Do you have a public reproducer that you could share?

Nope, I never came up with a reproducer. But we haven't seen the problem at all since we applied #4667.
I can add that our test topic definitely didn't have 1000 partitions, and off the top of my head I'm pretty sure it just had 1. Likewise I'm pretty sure we have only 1 broker.

@Mrigank11

Thanks for the inputs @Quuxplusone. Using the steps mentioned in my previous comment, I could sporadically reproduce the issue. I collected valgrind traces and noticed a definite memory leak in the runs where the issue reproduced:

==3234364== 6,836 (1,000 direct, 5,836 indirect) bytes in 1 blocks are definitely lost in loss record 216 of 223
==3234364==    at 0x4C3CE4B: calloc (vg_replace_malloc.c:1328)
==3234364==    by 0xA02869A: rd_calloc (rd.h:161)
==3234364==    by 0xA02869A: rd_kafka_topic_new0 (rdkafka_topic.c:333)
==3234364==    by 0xA07C92A: rd_kafka_toppar_get2 (rdkafka_partition.c:443)
==3234364==    by 0xA07FC16: rd_kafka_topic_partition_ensure_toppar (rdkafka_partition.c:2964)
==3234364==    by 0xA088476: rd_kafka_assignment_add (rdkafka_assignment.c:732)
==3234364==    by 0xA0664EA: rd_kafka_cgrp_assign (rdkafka_cgrp.c:3659)
==3234364==    by 0xA07494B: rd_kafka_cgrp_handle_assign_op (rdkafka_cgrp.c:4858)
==3234364==    by 0xA07494B: rd_kafka_cgrp_op_serve (rdkafka_cgrp.c:5023)
==3234364==    by 0xA03DD0E: rd_kafka_q_serve (rdkafka_queue.c:553)
==3234364==    by 0xA00D6F3: rd_kafka_thread_main (rdkafka.c:2117)
==3234364==    by 0x55BF2D1: start_thread (pthread_create.c:476)
==3234364==    by 0x5F9AE72: clone (clone.S:95)

Is this of any help in debugging the root cause? The memory leak isn't present when the test program exits cleanly. Note that I also used @Mekk's patch to surface the leak; otherwise it would be very difficult to identify among the dozens of objects in the heap.

Cc: @emasab @edenhill

@emasab (Collaborator)

emasab commented May 15, 2024

@Quuxplusone could you verify these things?

  • whether it still happens when calling only rd_kafka_destroy, without closing the consumer first. (rd_kafka_destroy closes the consumer too, but only after setting a flag.)
  • that all these objects are destroyed before calling rd_kafka_destroy(), even in corner cases

For C, make sure the following objects are destroyed prior to calling
rd_kafka_consumer_close() and rd_kafka_destroy():

  • rd_kafka_message_t
  • rd_kafka_topic_t
  • rd_kafka_topic_partition_t
  • rd_kafka_topic_partition_list_t
  • rd_kafka_event_t
  • rd_kafka_queue_t
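A minimal sketch of that teardown order, assuming placeholder handles for whatever objects the application still holds (the parameter names are illustrative, not a prescribed API):

#include <librdkafka/rdkafka.h>

/* Placeholder handles standing in for whatever the application still owns;
 * the point is only the ordering: application-owned objects first, then
 * rd_kafka_consumer_close(), then rd_kafka_destroy(). */
static void shutdown_consumer(rd_kafka_t *rk,
                              rd_kafka_message_t *last_msg,
                              rd_kafka_topic_t *rkt,
                              rd_kafka_topic_partition_list_t *parts,
                              rd_kafka_event_t *ev,
                              rd_kafka_queue_t *rkqu) {
        if (last_msg)
                rd_kafka_message_destroy(last_msg);
        if (rkt)
                rd_kafka_topic_destroy(rkt);
        if (parts)
                rd_kafka_topic_partition_list_destroy(parts);
        if (ev)
                rd_kafka_event_destroy(ev);
        if (rkqu)
                rd_kafka_queue_destroy(rkqu);

        rd_kafka_consumer_close(rk);   /* leave the group */
        rd_kafka_destroy(rk);          /* only now tear down the handle */
}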

@Quuxplusone (Contributor, Author)

  • that all these objects are destroyed before calling rd_kafka_destroy(), even in corner cases

For C, make sure the following objects are destroyed prior to calling
rd_kafka_consumer_close() and rd_kafka_destroy():

  • rd_kafka_message_t
  • rd_kafka_topic_t
  • rd_kafka_topic_partition_t
  • rd_kafka_topic_partition_list_t
  • rd_kafka_event_t
  • rd_kafka_queue_t

Could you explain how to verify that? (Especially since in this case we have multiple independent rd_kafka_t objects alive at once.)
Our working hypothesis is still that #4667 fixes a leak of one rd_kafka_toppar_keep. All our observations are consistent with that hypothesis, so after applying #4667 downstream and seeing no more hangs, I haven't looked at this in quite a while. If it's possible to detect leaked toppars, such that I could follow your advice above to make sure I wasn't leaking anything... could you apply the same approach to determine whether librdkafka is leaking?

@emasab (Collaborator)

emasab commented May 21, 2024

@Quuxplusone given that 8e20e1e adds a reference from an rd_kafka_op_t of type BARRIER to the topic partition, if you're forwarding a partition's queue to another queue you should verify that the queues are destroyed before calling rd_kafka_destroy on those rd_kafka_t objects.

I've created PR #4724 to fix the case I mentioned in #4667. You can try it to see if it fixes your case.
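To illustrate the scenario described above, here is a generic, hedged sketch of partition-queue forwarding in which both queue handles are destroyed before the consumer handle; the names and structure are illustrative only, not our test code:

#include <librdkafka/rdkafka.h>

/* Generic sketch: if a partition's queue has been forwarded to an
 * application-owned queue, both queue handles should be destroyed before
 * rd_kafka_destroy(), otherwise queued ops (such as the BARRIER op mentioned
 * above) may still hold a reference to the topic partition. */
static void forward_then_teardown(rd_kafka_t *rk, const char *topic, int32_t partition) {
        rd_kafka_queue_t *my_q = rd_kafka_queue_new(rk);
        rd_kafka_queue_t *part_q = rd_kafka_queue_get_partition(rk, topic, partition);

        if (part_q)
                rd_kafka_queue_forward(part_q, my_q);   /* route the partition's ops to my_q */

        /* ... serve my_q with rd_kafka_consume_queue() / rd_kafka_queue_poll() ... */

        if (part_q) {
                rd_kafka_queue_forward(part_q, NULL);   /* stop forwarding */
                rd_kafka_queue_destroy(part_q);
        }
        rd_kafka_queue_destroy(my_q);

        rd_kafka_consumer_close(rk);
        rd_kafka_destroy(rk);   /* the queues are gone, so no forwarded ops pin the toppar */
}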
