diff --git a/src/Makefile b/src/Makefile index a66af12df..692f2be7d 100644 --- a/src/Makefile +++ b/src/Makefile @@ -383,7 +383,7 @@ endif ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o fifo.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o fifo.o bjm.o ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX) diff --git a/src/bjm.c b/src/bjm.c new file mode 100644 index 000000000..4c61509a2 --- /dev/null +++ b/src/bjm.c @@ -0,0 +1,276 @@ +/* + * Background Job Manager - submit jobs to a background thread. + */ +#include "fmacros.h" +#include +#include +#include +#include + +#include "bjm.h" +#include "fifo.h" +#include "zmalloc.h" +#include "atomicvar.h" +#include "serverassert.h" +#include "server.h" // Gah! Hate to pull this in. CPU affinity & BIO config. + +static const unsigned BJM_THREAD_STACK_SIZE = 4 * 1024 * 1024; +static const int INITIAL_FUNCTION_CAPACITY = 8; + +static int functionsCount; +static int functionsCapacity; +static bjmJobFunc *functions; // An array of function pointers. Index matches job lists. + +// A FIFO queue with a mutex to protect access +typedef struct { + pthread_mutex_t mutex; + Fifo *fifo; +} MutexFifo; + +// A Joblist contains a specific function to be executed with a list of privdata +typedef struct { + bjmJobFunc func; // The callback function for the jobs + MutexFifo jobs; // The contained list of jobs (privdata values) + serverAtomic long job_count; // Might be greater than length(jobs). It includes in-progress. +} Joblist; + +// This arrays hold a Joblist* for each known callback function. +static Joblist **jobsByFunc; // Array indexed by index in functions[] + +// This FIFO queue hold Joblists from the array above. Each time one of those +// Joblists becomes non-empty, it gets added to the active queue. +static MutexFifo activeJoblists; + +static serverAtomic long queuedJobCount; +static serverAtomic long processedJobCount; + +static pthread_cond_t wakeup_cond; // Triggered when jobs are submitted + +static int threadCount = 0; +static pthread_t *threads; // Array of threads + + +static void mutexFifoInit(MutexFifo *q) { + pthread_mutex_init(&q->mutex, NULL); + q->fifo = fifoCreate(); +} + + +static void mutexFifoLock(MutexFifo *q) { + pthread_mutex_lock(&q->mutex); +} + + +static void mutexFifoUnlock(MutexFifo *q) { + pthread_mutex_unlock(&q->mutex); +} + + +static void increaseFunctionCapacity(void) { + functionsCapacity *= 2; + functions = zrealloc(functions, functionsCapacity * sizeof(bjmJobFunc)); + jobsByFunc = zrealloc(jobsByFunc, functionsCapacity * sizeof(Joblist*)); +} + + +// Find the function's index. Adds the function if it's a new one. +static int getFuncIdx(bjmJobFunc func) { + // It's expected that the function count is small, probably spanning only 1 or 2 cache lines. + // A simple linear search will be faster than a complex structure like hash. + for (int i = 0; i < functionsCount; i++) { + if (functions[i] == func) return i; + } + + // At this point, we know that the function isn't in the list. Insert at end. + if (functionsCount == functionsCapacity) increaseFunctionCapacity(); + int idx = functionsCount++; + functions[idx] = func; + jobsByFunc[idx] = zmalloc(sizeof(Joblist)); + mutexFifoInit(&jobsByFunc[idx]->jobs); + jobsByFunc[idx]->func = func; + atomicSet(jobsByFunc[idx]->job_count, 0); + return idx; +} + + +/* Pull one job from the active joblists. Synchronously waits for a job if none available. + * privdata_ptr - returns the caller supplied privdata. + * joblist_ptr - returns the joblist that the job was taken from. This is needed by the caller + * in order to (later) decrement the job_count. + * Returns: + * Returns the bjmJobFunc to be called. + */ +static bjmJobFunc waitForJob(void **privdata_ptr, Joblist **joblist_ptr) { + bjmJobFunc func = NULL; + + mutexFifoLock(&activeJoblists); + while (fifoLength(activeJoblists.fifo) == 0) { + pthread_cond_wait(&wakeup_cond, &activeJoblists.mutex); + } + + Joblist *joblist = fifoPeek(activeJoblists.fifo); + func = joblist->func; + *joblist_ptr = joblist; + + mutexFifoLock(&joblist->jobs); + *privdata_ptr = fifoPop(joblist->jobs.fifo); + + if (fifoLength(joblist->jobs.fifo) == 0) { + // No jobs left for this function + fifoPop(activeJoblists.fifo); + } else if (fifoLength(activeJoblists.fifo) > 1) { + // Rotate the joblist for this function to the end + fifoPop(activeJoblists.fifo); + fifoPush(activeJoblists.fifo, joblist); + } + // Keep the lock on the individual joblist until it is properly handled in + // the activeJobLists. Can't have the size changing. + mutexFifoUnlock(&joblist->jobs); + mutexFifoUnlock(&activeJoblists); + + return func; +} + + +static void *pthreadFunction(void *arg) { + int threadNum = (intptr_t)arg; + + const int MAX_THREAD_NAME = 16; + char thread_name[MAX_THREAD_NAME]; + snprintf(thread_name, MAX_THREAD_NAME, "bjm thread %d", threadNum); + valkey_set_thread_title(thread_name); + + serverSetCpuAffinity(server.bio_cpulist); + + makeThreadKillable(); + + /* Block SIGALRM so only the main thread will receive the watchdog signal. */ + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, SIGALRM); + if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) { + serverLog(LL_WARNING, "Warning: can't mask SIGALRM in BJM thread: %s", strerror(errno)); + } + + while (1) { + void *privdata; + Joblist *joblist; + bjmJobFunc func = waitForJob(&privdata, &joblist); + + func(privdata); // Execute the callback + atomicDecr(joblist->job_count, 1); // Decrement count AFTER callback finishes + atomicDecr(queuedJobCount, 1); + atomicIncr(processedJobCount, 1); + } + + return NULL; +} + + +// API +void bjmInit(int numThreads) { + if (threadCount == numThreads) return; // Silently skip to support testing + assert(threadCount == 0); // But don't allow changing the number of threads + + functionsCount = 0; + functionsCapacity = INITIAL_FUNCTION_CAPACITY; + + functions = zmalloc(functionsCapacity * sizeof(bjmJobFunc)); + jobsByFunc = zmalloc(functionsCapacity * sizeof(Joblist*)); + + atomicSet(queuedJobCount, 0); + atomicSet(processedJobCount, 0); + + mutexFifoInit(&activeJoblists); + + pthread_cond_init(&wakeup_cond, NULL); + + threadCount = numThreads; + threads = zmalloc(threadCount * sizeof(pthread_t)); + + pthread_attr_t attr; + size_t stacksize; + pthread_attr_init(&attr); + pthread_attr_getstacksize(&attr, &stacksize); + if (stacksize < BJM_THREAD_STACK_SIZE) stacksize = BJM_THREAD_STACK_SIZE; + pthread_attr_setstacksize(&attr, stacksize); + + for (int i = 0; i < threadCount; i++) { + void *arg = (void*)(intptr_t)i; + if (pthread_create(&threads[i], &attr, pthreadFunction, arg) != 0) { + serverLog(LL_WARNING, "Fatal: Can't initialize background jobs."); + exit(1); + } + } +} + + +// API +bjmJobFuncHandle bjmRegisterJobFunc(bjmJobFunc func) { + return getFuncIdx(func) + 1; // +1 to avoid 0 (uninitialized static) being a valid value +} + + +// API +void bjmSubmitJob(bjmJobFuncHandle funcHandle, void *privdata) { + Joblist *joblist = jobsByFunc[funcHandle - 1]; + + mutexFifoLock(&joblist->jobs); + fifoPush(joblist->jobs.fifo, privdata); + atomicIncr(joblist->job_count, 1); + atomicIncr(queuedJobCount, 1); + if (fifoLength(joblist->jobs.fifo) == 1) { + // Reader threads take the activeJobists lock before the joblist lock. But this can't + // cause deadlock because this joblist isn't in the active joblist yet. + mutexFifoLock(&activeJoblists); + fifoPush(activeJoblists.fifo, joblist); + mutexFifoUnlock(&activeJoblists); + } + pthread_cond_signal(&wakeup_cond); + mutexFifoUnlock(&joblist->jobs); +} + + +// API +void bjmKillThreads(void) { + for (int i = 0; i < threadCount; i++) { + if (threads[i] == pthread_self()) continue; + if (pthread_cancel(threads[i]) == 0) { + int err = pthread_join(threads[i], NULL); + if (err == 0) { + serverLog(LL_WARNING, "BJM thread #%d terminated", i); + } else { + serverLog(LL_WARNING, "BJM thread #%d can not be joined: %s", i, strerror(err)); + } + } + } +} + + +// API +long bjmPendingJobsOfType(bjmJobFuncHandle funcHandle) { + long jobCount; + atomicGet(jobsByFunc[funcHandle - 1]->job_count, jobCount); + return jobCount; +} + + +// API +sds bjmCatInfo(sds info) { + long queuedJobs, processedJobs; + atomicGet(queuedJobCount, queuedJobs); + atomicGet(processedJobCount, processedJobs); + + info = sdscatprintf(info, + "# BackgroundJobManager\r\n" + "bjm_num_threads:%d\r\n" + "bjm_num_callbacks:%d\r\n" + "bjm_jobs_in_queue:%ld\r\n" + "bjm_processed_jobs:%ld\r\n", + threadCount, + functionsCount, + queuedJobs, + processedJobs + ); + return info; +} diff --git a/src/bjm.h b/src/bjm.h new file mode 100644 index 000000000..cda2a5999 --- /dev/null +++ b/src/bjm.h @@ -0,0 +1,63 @@ +/* + * Background Job Manager - submit jobs to a background thread. + */ + +#ifndef __BJM_H__ +#define __BJM_H__ + +#include "sds.h" + + +/* Initialize BJM with the requested number of background threads. + */ +void bjmInit(int numThreads); + + +/* Provided job functions will be executed on a different thread and passed the provided privdata. + */ +typedef void (*bjmJobFunc)(void *privdata); + +/* After registering a function, the returned function handle can be used to submit jobs. + */ +typedef int bjmJobFuncHandle; + + +/* Register a job function which can process background jobs. A handle is returned for submitting + * jobs & gathering metrics. This function is idempotent - submitting the same function again will + * return the same handle. Handle values will be > 0, so this pattern can be used: + * + * static bjmJobFuncHandle myHandle; + * if (!myHandle) myHandle = bjmRegisterJobFunc(myFunc); + * bjmSubmitJob(myHandle, ...); + * + * This co-locates a static variable at the point of job submission, and avoids repeated + * registration calls. + */ +bjmJobFuncHandle bjmRegisterJobFunc(bjmJobFunc func); + + +/* Submit a job to BJM. The provided function will be executed on a background thread. privdata + * will be provided as a parameter to the provided function. For fairness, jobs with different + * callback functions will be executed in round-robin fashion. Since jobs are executed across + * multiple threads, there is no guarantee as to ordering or exclusion between jobs. + */ +void bjmSubmitJob(bjmJobFuncHandle funcHandle, void *privdata); + + +/* Kill all threads in an unclean way. Non-recoverable. + * Only used during collection of debug information. + */ +void bjmKillThreads(void); + + +/* Count the number of pending/active jobs for the given job function. + * Note that this value is highly volatile as background threads are processing the jobs. + */ +long bjmPendingJobsOfType(bjmJobFuncHandle funcHandle); + + +/* Provide metrics data for INFO + */ +sds bjmCatInfo(sds info); + +#endif