Skip to content

Commit

Permalink
bjm: Background Job Manager
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Brunner <brunnerj@amazon.com>
  • Loading branch information
JimB123 committed Apr 23, 2024
1 parent 928459d commit b0d6261
Show file tree
Hide file tree
Showing 3 changed files with 340 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/Makefile
Expand Up @@ -383,7 +383,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o fifo.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o fifo.o bjm.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
276 changes: 276 additions & 0 deletions src/bjm.c
@@ -0,0 +1,276 @@
/*
* Background Job Manager - submit jobs to a background thread.
*/
#include "fmacros.h"
#include <pthread.h>
#include <string.h>
#include <stdio.h>
#include <signal.h>

#include "bjm.h"
#include "fifo.h"
#include "zmalloc.h"
#include "atomicvar.h"
#include "serverassert.h"
#include "server.h" // Gah! Hate to pull this in. CPU affinity & BIO config.

static const unsigned BJM_THREAD_STACK_SIZE = 4 * 1024 * 1024;
static const int INITIAL_FUNCTION_CAPACITY = 8;

static int functionsCount;
static int functionsCapacity;
static bjmJobFunc *functions; // An array of function pointers. Index matches job lists.

// A FIFO queue with a mutex to protect access
typedef struct {
pthread_mutex_t mutex;
Fifo *fifo;
} MutexFifo;

// A Joblist contains a specific function to be executed with a list of privdata
typedef struct {
bjmJobFunc func; // The callback function for the jobs
MutexFifo jobs; // The contained list of jobs (privdata values)
serverAtomic long job_count; // Might be greater than length(jobs). It includes in-progress.
} Joblist;

// This arrays hold a Joblist* for each known callback function.
static Joblist **jobsByFunc; // Array indexed by index in functions[]

// This FIFO queue hold Joblists from the array above. Each time one of those
// Joblists becomes non-empty, it gets added to the active queue.
static MutexFifo activeJoblists;

static serverAtomic long queuedJobCount;
static serverAtomic long processedJobCount;

static pthread_cond_t wakeup_cond; // Triggered when jobs are submitted

static int threadCount = 0;
static pthread_t *threads; // Array of threads


static void mutexFifoInit(MutexFifo *q) {
pthread_mutex_init(&q->mutex, NULL);
q->fifo = fifoCreate();
}


static void mutexFifoLock(MutexFifo *q) {
pthread_mutex_lock(&q->mutex);
}


static void mutexFifoUnlock(MutexFifo *q) {
pthread_mutex_unlock(&q->mutex);
}


static void increaseFunctionCapacity(void) {
functionsCapacity *= 2;
functions = zrealloc(functions, functionsCapacity * sizeof(bjmJobFunc));
jobsByFunc = zrealloc(jobsByFunc, functionsCapacity * sizeof(Joblist*));
}


// Find the function's index. Adds the function if it's a new one.
static int getFuncIdx(bjmJobFunc func) {
// It's expected that the function count is small, probably spanning only 1 or 2 cache lines.
// A simple linear search will be faster than a complex structure like hash.
for (int i = 0; i < functionsCount; i++) {
if (functions[i] == func) return i;
}

// At this point, we know that the function isn't in the list. Insert at end.
if (functionsCount == functionsCapacity) increaseFunctionCapacity();
int idx = functionsCount++;
functions[idx] = func;
jobsByFunc[idx] = zmalloc(sizeof(Joblist));
mutexFifoInit(&jobsByFunc[idx]->jobs);
jobsByFunc[idx]->func = func;
atomicSet(jobsByFunc[idx]->job_count, 0);
return idx;
}


/* Pull one job from the active joblists. Synchronously waits for a job if none available.
* privdata_ptr - returns the caller supplied privdata.
* joblist_ptr - returns the joblist that the job was taken from. This is needed by the caller
* in order to (later) decrement the job_count.
* Returns:
* Returns the bjmJobFunc to be called.
*/
static bjmJobFunc waitForJob(void **privdata_ptr, Joblist **joblist_ptr) {
bjmJobFunc func = NULL;

mutexFifoLock(&activeJoblists);
while (fifoLength(activeJoblists.fifo) == 0) {
pthread_cond_wait(&wakeup_cond, &activeJoblists.mutex);
}

Joblist *joblist = fifoPeek(activeJoblists.fifo);
func = joblist->func;
*joblist_ptr = joblist;

mutexFifoLock(&joblist->jobs);
*privdata_ptr = fifoPop(joblist->jobs.fifo);

if (fifoLength(joblist->jobs.fifo) == 0) {
// No jobs left for this function
fifoPop(activeJoblists.fifo);
} else if (fifoLength(activeJoblists.fifo) > 1) {
// Rotate the joblist for this function to the end
fifoPop(activeJoblists.fifo);
fifoPush(activeJoblists.fifo, joblist);
}
// Keep the lock on the individual joblist until it is properly handled in
// the activeJobLists. Can't have the size changing.
mutexFifoUnlock(&joblist->jobs);
mutexFifoUnlock(&activeJoblists);

return func;
}


static void *pthreadFunction(void *arg) {
int threadNum = (intptr_t)arg;

const int MAX_THREAD_NAME = 16;
char thread_name[MAX_THREAD_NAME];
snprintf(thread_name, MAX_THREAD_NAME, "bjm thread %d", threadNum);
valkey_set_thread_title(thread_name);

serverSetCpuAffinity(server.bio_cpulist);

makeThreadKillable();

/* Block SIGALRM so only the main thread will receive the watchdog signal. */
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) {
serverLog(LL_WARNING, "Warning: can't mask SIGALRM in BJM thread: %s", strerror(errno));
}

while (1) {
void *privdata;
Joblist *joblist;
bjmJobFunc func = waitForJob(&privdata, &joblist);

func(privdata); // Execute the callback
atomicDecr(joblist->job_count, 1); // Decrement count AFTER callback finishes
atomicDecr(queuedJobCount, 1);
atomicIncr(processedJobCount, 1);
}

return NULL;
}


// API
void bjmInit(int numThreads) {
if (threadCount == numThreads) return; // Silently skip to support testing
assert(threadCount == 0); // But don't allow changing the number of threads

functionsCount = 0;
functionsCapacity = INITIAL_FUNCTION_CAPACITY;

functions = zmalloc(functionsCapacity * sizeof(bjmJobFunc));
jobsByFunc = zmalloc(functionsCapacity * sizeof(Joblist*));

atomicSet(queuedJobCount, 0);
atomicSet(processedJobCount, 0);

mutexFifoInit(&activeJoblists);

pthread_cond_init(&wakeup_cond, NULL);

threadCount = numThreads;
threads = zmalloc(threadCount * sizeof(pthread_t));

pthread_attr_t attr;
size_t stacksize;
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr, &stacksize);
if (stacksize < BJM_THREAD_STACK_SIZE) stacksize = BJM_THREAD_STACK_SIZE;
pthread_attr_setstacksize(&attr, stacksize);

for (int i = 0; i < threadCount; i++) {
void *arg = (void*)(intptr_t)i;
if (pthread_create(&threads[i], &attr, pthreadFunction, arg) != 0) {
serverLog(LL_WARNING, "Fatal: Can't initialize background jobs.");
exit(1);
}
}
}


// API
bjmJobFuncHandle bjmRegisterJobFunc(bjmJobFunc func) {
return getFuncIdx(func) + 1; // +1 to avoid 0 (uninitialized static) being a valid value
}


// API
void bjmSubmitJob(bjmJobFuncHandle funcHandle, void *privdata) {
Joblist *joblist = jobsByFunc[funcHandle - 1];

mutexFifoLock(&joblist->jobs);
fifoPush(joblist->jobs.fifo, privdata);
atomicIncr(joblist->job_count, 1);
atomicIncr(queuedJobCount, 1);
if (fifoLength(joblist->jobs.fifo) == 1) {
// Reader threads take the activeJobists lock before the joblist lock. But this can't
// cause deadlock because this joblist isn't in the active joblist yet.
mutexFifoLock(&activeJoblists);
fifoPush(activeJoblists.fifo, joblist);
mutexFifoUnlock(&activeJoblists);
}
pthread_cond_signal(&wakeup_cond);
mutexFifoUnlock(&joblist->jobs);
}


// API
void bjmKillThreads(void) {
for (int i = 0; i < threadCount; i++) {
if (threads[i] == pthread_self()) continue;
if (pthread_cancel(threads[i]) == 0) {
int err = pthread_join(threads[i], NULL);
if (err == 0) {
serverLog(LL_WARNING, "BJM thread #%d terminated", i);
} else {
serverLog(LL_WARNING, "BJM thread #%d can not be joined: %s", i, strerror(err));
}
}
}
}


// API
long bjmPendingJobsOfType(bjmJobFuncHandle funcHandle) {
long jobCount;
atomicGet(jobsByFunc[funcHandle - 1]->job_count, jobCount);
return jobCount;
}


// API
sds bjmCatInfo(sds info) {
long queuedJobs, processedJobs;
atomicGet(queuedJobCount, queuedJobs);
atomicGet(processedJobCount, processedJobs);

info = sdscatprintf(info,
"# BackgroundJobManager\r\n"
"bjm_num_threads:%d\r\n"
"bjm_num_callbacks:%d\r\n"
"bjm_jobs_in_queue:%ld\r\n"
"bjm_processed_jobs:%ld\r\n",
threadCount,
functionsCount,
queuedJobs,
processedJobs
);
return info;
}
63 changes: 63 additions & 0 deletions src/bjm.h
@@ -0,0 +1,63 @@
/*
* Background Job Manager - submit jobs to a background thread.
*/

#ifndef __BJM_H__
#define __BJM_H__

#include "sds.h"


/* Initialize BJM with the requested number of background threads.
*/
void bjmInit(int numThreads);


/* Provided job functions will be executed on a different thread and passed the provided privdata.
*/
typedef void (*bjmJobFunc)(void *privdata);

/* After registering a function, the returned function handle can be used to submit jobs.
*/
typedef int bjmJobFuncHandle;


/* Register a job function which can process background jobs. A handle is returned for submitting
* jobs & gathering metrics. This function is idempotent - submitting the same function again will
* return the same handle. Handle values will be > 0, so this pattern can be used:
*
* static bjmJobFuncHandle myHandle;
* if (!myHandle) myHandle = bjmRegisterJobFunc(myFunc);
* bjmSubmitJob(myHandle, ...);
*
* This co-locates a static variable at the point of job submission, and avoids repeated
* registration calls.
*/
bjmJobFuncHandle bjmRegisterJobFunc(bjmJobFunc func);


/* Submit a job to BJM. The provided function will be executed on a background thread. privdata
* will be provided as a parameter to the provided function. For fairness, jobs with different
* callback functions will be executed in round-robin fashion. Since jobs are executed across
* multiple threads, there is no guarantee as to ordering or exclusion between jobs.
*/
void bjmSubmitJob(bjmJobFuncHandle funcHandle, void *privdata);


/* Kill all threads in an unclean way. Non-recoverable.
* Only used during collection of debug information.
*/
void bjmKillThreads(void);


/* Count the number of pending/active jobs for the given job function.
* Note that this value is highly volatile as background threads are processing the jobs.
*/
long bjmPendingJobsOfType(bjmJobFuncHandle funcHandle);


/* Provide metrics data for INFO
*/
sds bjmCatInfo(sds info);

#endif

0 comments on commit b0d6261

Please sign in to comment.