-
Notifications
You must be signed in to change notification settings - Fork 508
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MOD-6572 replace semaphore with wait in jobq #4446
base: master
Are you sure you want to change the base?
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #4446 +/- ##
=======================================
Coverage 86.25% 86.25%
=======================================
Files 190 190
Lines 34822 34822
=======================================
+ Hits 30035 30037 +2
+ Misses 4787 4785 -2 ☔ View full report in Codecov by Sentry. |
wait and drain are modified accordingly.
sembm om dim - 768
rename priority_queue->num_threads_working to num_threads_not_idle uncomment LOG remove script
* add admin priority queue *add threads manager to send admin jobs to threads *not lazy gc init to avoid bg intialization
remove threads array from thpool struct
change thpool state to THPOOL_UNINITIALIZED in redisearch_thpool_terminate_when_empty wait for num working threads to be 0 in redisearch_thpool_terminate_pause_threads thpool cpp tests: TestTerminateWhenEmpty: test recreating the threads after temination new test: TestPauseResume
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
@@ -19,7 +19,7 @@ int ConcurrentSearch_CreatePool(int numThreads) { | |||
} | |||
int poolId = array_len(threadpools_g); | |||
threadpools_g = array_append(threadpools_g, redisearch_thpool_create(numThreads, DEFAULT_PRIVILEGED_THREADS_NUM, | |||
LogCallback)); | |||
LogCallback, "coord")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider passing the pool name as a parameter. I would at least add a comment about why we chose this name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2 rows above the array explictly initialized to size 1 with a comment "Only used by the coordinator, so 1 is enough"
deps/thpool/thpool.c
Outdated
|
||
/* Initialize threads if needed */ | ||
redisearch_thpool_verify_init(thpool_p); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider moving into priority_queue_push_chain
to avoid duplications
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's the priority queue's responsibility to initialize the threads.
- I added a wrapper function
priority_queue_push_chain_init_threads
to the threadpool section priority_queue_push_chain_unsafe
: replacedredisearch_thpool_t *
argument that was used only to get the priority queue, with apriority_queue *
…lback, triggred by redis from the main thread remove thpool_init. not in use renaming of thpool structs' members remove priority_push_chain and introduce redisearch_thpool_push_chain_init_threads that encapsulate both safe push to the queue and threads verify init pthread_varrier: add volatile to cycle test_multithread: replace algo, datatype loop with an automatic generation of a function for each permutation (algo, datatype) as Test_burst_threads_sanity class attribute Thanks @GuyAv46
Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com>
Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com>
Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very well done
This PR significantly modifies the threadpool's pulling mechanism, setting the stage for more flexible runtime configurations.
Changing pulling synchronization machanism
Previously, the threadpool was synchronized using a semaphore-based waiting mechanism for job retrieval. This PR replaces that mechanism with a condition variable.
The revised job pulling mechanism operates as follows:
2. Threads enter a waiting state on a condition variable when the job queue is both empty and running.
3. Upon adding a new job to the queue:
Configure threadpool at runtime
Threadpool state
can be either
initialized
- hasn_threads
valid and ready to pull threads,or
uninitialized
- some or all of the threads may have exited.Threadpool state is set to
uninitialized
whenterminate_when_empty
ordestroy
are called.Removed
threads_all_idle
condition variabledrain
andwait
are internally implemented with a busy wait.calling
wait()
is equivalent to callingdrain(threshold = 0, yieldCB = nullptr)
Thread state
A thread can be in one of three states:
running
,terminate_when_empty
, ordead
.The thread state can be configured by pushing a
change_state_job
to the admin priority queue.Pause and resume
When pausing the threadpool, the jobq state is changed to
paused
. pause function will return when there are no more jobs in progress (i.enum_working_threads
== 0)Intialization
The new design assumes that the threadpool is initialized by the main thread.
All threadpool initialization is lazy and occurs upon the first push to the queue.
Since the GC pushes to the jobq from the bg, it breaks this assumption. Hence, the GC thpool initialized upon module startup.
Auestethic changes
Add name to the threadpool
The threadpool name is used to set the thepool's threads names.
a thread name is
<thpool_name>-<rand_id>.
As the maximum number of threads per thpool is 10,000,
rand_id
is a random number between 0 to 9,999.Remove
thpool->threads
arrayThis array was unnecessary and has been removed.