Today, we're going to build a threadpool in C. I remember the first time I implemented one, for a project at my first job. It was quite an adventure: threading can be a bit tricky, but I assure you it's worth the journey. So, let's dive in!
What's a Threadpool?
In concurrent programming, a threadpool is a collection of worker threads that execute tasks asynchronously, reusing a fixed set of threads instead of creating a new one for every task. Threadpools are especially useful for tasks that involve blocking operations such as I/O: handing that work to the workers keeps the main thread from blocking and the application from becoming unresponsive.
Where are Threadpools Used?
Threadpools are widely used in scenarios where you have to deal with many short-lived tasks, especially when their execution is I/O-bound or involves waiting, as in web servers, GUI applications, or background task processors.
Building a Threadpool in C: The Step-by-Step Guide
Here's a simplified program that illustrates how to create and use a threadpool. We will build it incrementally, explaining each step along the way. We'll be using the POSIX threads library (pthread.h).
1. The Task Structure
To kick things off, let's talk about a fundamental concept that we'll use throughout this tutorial, and that's the notion of maintaining a 'state' while we're processing data. In our context, this 'state' is going to be represented by a struct in C. Let's call it State.
typedef struct
{
    // Add any data which your multithreading program will hold.
    // ISO C does not allow an empty struct, so keep at least one member.
    int placeholder;
} State;
Next, we define a structure to hold the data needed for each task. Let's call it JobData.
typedef struct
{
    const unsigned char *content;
    size_t length;
    State *state;
    pthread_mutex_t *mutex;
} JobData;
This structure holds the data that we'll process (content and length), a state that will be shared among tasks (state), and a mutex for thread-safe printing to the standard output (mutex).
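To make the two structures more concrete, here is a minimal sketch of how they might be filled in. The State fields (total_bytes, checksum) and the make_job helper are assumptions for illustration only; the tutorial itself leaves the contents of State up to you.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical State: running totals shared by all jobs. */
typedef struct
{
    size_t total_bytes;
    unsigned long checksum;
} State;

typedef struct
{
    const unsigned char *content;
    size_t length;
    State *state;
    pthread_mutex_t *mutex;
} JobData;

/* Hypothetical helper: describe one slice of a buffer as a job. */
JobData make_job(const unsigned char *content, size_t length, State *state, pthread_mutex_t *mutex)
{
    assert(state != NULL && mutex != NULL);
    JobData job = {content, length, state, mutex};
    return job;
}
```

Every job points at the same State and the same mutex, while content and length differ from job to job. That split is what lets the workers share results safely while reading different parts of the input.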
2. The Worker Function
Next, we define the functions that the worker threads will execute. The thread entry point, load_job_in_pool, receives a pointer to a JobData structure, unpacks it, and calls execute_job, which processes the data and updates the shared state.
void execute_job(const unsigned char *content, size_t length, State *state, pthread_mutex_t *mutex)
{
    pthread_mutex_lock(mutex);
    // Execute the job using the state here
    pthread_mutex_unlock(mutex);
}
void *load_job_in_pool(void *arg)
{
    JobData *job_data = (JobData *)arg;
    execute_job(job_data->content, job_data->length, job_data->state, job_data->mutex);
    return NULL;
}
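In execute_job, we lock the mutex before executing the job to ensure that only one thread at a time can update the state and print to the standard output. Keep in mind that while the lock is held the other workers have to wait, so in real code it pays to hold it only around the parts that touch shared data.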
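One way to keep the workers running in parallel is to do the heavy computation on local variables and take the lock only for the final update. The sketch below assumes a hypothetical State with total_bytes and checksum fields and uses a simple byte sum as a stand-in for real work; neither comes from the original program.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical State: the tutorial leaves its fields up to you. */
typedef struct
{
    size_t total_bytes;
    unsigned long checksum;
} State;

typedef struct
{
    const unsigned char *content;
    size_t length;
    State *state;
    pthread_mutex_t *mutex;
} JobData;

void execute_job(const unsigned char *content, size_t length, State *state, pthread_mutex_t *mutex)
{
    assert(content != NULL || length == 0);
    /* Expensive part: touches only local data, so no lock is needed. */
    unsigned long local_sum = 0;
    for (size_t i = 0; i < length; i++)
    {
        local_sum += content[i];
    }
    /* Short critical section: only the shared-state update is locked. */
    pthread_mutex_lock(mutex);
    state->total_bytes += length;
    state->checksum += local_sum;
    pthread_mutex_unlock(mutex);
}

void *load_job_in_pool(void *arg)
{
    JobData *job_data = (JobData *)arg;
    execute_job(job_data->content, job_data->length, job_data->state, job_data->mutex);
    return NULL;
}
```

Because addition is order-independent, the totals come out the same no matter which worker finishes first. If your jobs must emit output in input order, you would need extra coordination beyond a single mutex.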
3. Creating the Threadpool
We're now ready to create the threadpool. In the main function, we start by parsing command-line options and initializing our shared state and mutex.
int main(int argc, char *argv[])
{
    // Parse command-line options (this is where num_workers gets set)
    // ...
    // Initialize shared state and mutex
    State state = {0}; // Zero-initialize, then set the fields to your needs
    pthread_t *workers = malloc(num_workers * sizeof(pthread_t));
    pthread_mutex_t print_mutex;
    pthread_mutex_init(&print_mutex, NULL);
    // ...
}
4. Dispatching Jobs to the Threadpool
Next, we open each input file, map it into memory, divide it into chunks, and dispatch these chunks to our threadpool. It helps to define the chunk size up front:
#define CHUNK_SIZE 1024
With that in place, we can write the code that dispatches jobs to the threadpool.
for (int i = optind; i < argc; i++)
{
    // Open the file and map it into memory
    // ...
    size_t num_sections = (file_size + CHUNK_SIZE - 1) / CHUNK_SIZE;
    // If it's the first file, initialize the state
    if (first_file)
    {
        // Initialize your state here
        first_file = false;
    }
    for (size_t section = 0; section < num_sections; section++)
    {
        size_t remaining_workers = num_workers;
        size_t remaining_sections = num_sections - section;
        size_t workers_to_use;
        if (remaining_workers < remaining_sections)
        {
            workers_to_use = remaining_workers;
        }
        else
        {
            workers_to_use = remaining_sections;
        }
        JobData *jobs = malloc(workers_to_use * sizeof(JobData));
        for (size_t j = 0; j < workers_to_use; j++)
        {
            jobs[j].content = mapped_file + (section + j) * CHUNK_SIZE;
            if ((section + j + 1) * CHUNK_SIZE <= (size_t)file_size)
            {
                jobs[j].length = CHUNK_SIZE;
            }
            else
            {
                jobs[j].length = file_size % CHUNK_SIZE;
            }
            jobs[j].state = &state;
            jobs[j].mutex = &print_mutex;
            pthread_create(&workers[j], NULL, load_job_in_pool, &jobs[j]);
        }
        for (size_t j = 0; j < workers_to_use; j++)
        {
            pthread_join(workers[j], NULL);
        }
        free(jobs);
        section += workers_to_use - 1;
    }
    // Unmap the file and close it
    // ...
}
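Here, we first calculate the number of chunks (sections) that the file will be divided into. We then walk through the sections in batches of up to num_workers: for each batch, we allocate the jobs, create one worker thread per job, and wait for all of them to finish before moving on to the next batch. Note that this simplified version creates fresh threads for every batch; a full threadpool keeps its workers alive and feeds them from a task queue.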
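The chunk arithmetic in the loop is easy to get wrong by one, so it can help to isolate it in small functions. The helpers below (num_chunks and chunk_length are hypothetical names) are a sketch of the same calculation, written so the last, shorter chunk falls out of a subtraction instead of a modulo.

```c
#include <assert.h>
#include <stddef.h>

/* Number of chunks needed to cover file_size bytes (ceiling division). */
size_t num_chunks(size_t file_size, size_t chunk_size)
{
    assert(chunk_size > 0);
    return (file_size + chunk_size - 1) / chunk_size;
}

/* Length of chunk `index`: chunk_size, except possibly for the last one. */
size_t chunk_length(size_t file_size, size_t chunk_size, size_t index)
{
    assert(chunk_size > 0);
    size_t offset = index * chunk_size;
    assert(offset < file_size);
    size_t remaining = file_size - offset;
    return remaining < chunk_size ? remaining : chunk_size;
}
```

For example, a 2500-byte file with a 1024-byte chunk size splits into three chunks of 1024, 1024, and 452 bytes, while a 2048-byte file splits into exactly two full chunks.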
5. Cleaning Up
Finally, we print the final state, free the memory allocated for the worker threads, and destroy the mutex.
    pthread_mutex_lock(&print_mutex);
    fwrite(&state, sizeof(state), 1, stdout); // Write the raw bytes of the final state
    pthread_mutex_unlock(&print_mutex);
    free(workers);
    pthread_mutex_destroy(&print_mutex);
    return 0;
Real-World Applications of Threadpools
Threadpools are used in a wide range of applications. For instance, web servers use threadpools to handle multiple concurrent client connections. Database servers use them to process multiple simultaneous queries. In the scientific computing realm, they're used to parallelize computations and make better use of multicore CPUs.
In this tutorial, we've implemented a simple threadpool for processing file data in parallel. However, the same principles can be applied to create more complex threadpool-based applications.
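As a pointer toward those more complex designs, here is a minimal sketch of a pool whose workers stay alive and pull tasks from a shared queue, guarded by a mutex and a condition variable. It is not the program built in this tutorial: every name in it (Task, Pool, pool_create, pool_submit, pool_destroy, count_task) is hypothetical, and error handling is kept to the essentials.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical names throughout: Task, Pool, pool_*, count_task. */
typedef struct Task
{
    void (*run)(void *arg);
    void *arg;
    struct Task *next;
} Task;

typedef struct
{
    pthread_mutex_t lock;
    pthread_cond_t has_work;
    Task *head, *tail; /* FIFO queue of pending tasks */
    bool shutting_down;
    size_t num_threads;
    pthread_t threads[]; /* flexible array, sized in pool_create */
} Pool;

/* Each worker sleeps until a task arrives, runs it, and repeats. */
static void *worker_loop(void *arg)
{
    Pool *pool = arg;
    for (;;)
    {
        pthread_mutex_lock(&pool->lock);
        while (pool->head == NULL && !pool->shutting_down)
            pthread_cond_wait(&pool->has_work, &pool->lock);
        Task *task = pool->head;
        if (task != NULL)
            pool->head = task->next;
        if (pool->head == NULL)
            pool->tail = NULL;
        pthread_mutex_unlock(&pool->lock);
        if (task == NULL)
            return NULL; /* shutting down and the queue is drained */
        task->run(task->arg); /* run without holding the lock */
        free(task);
    }
}

/* Let queued tasks finish, stop the workers, and free the pool. */
void pool_destroy(Pool *pool)
{
    pthread_mutex_lock(&pool->lock);
    pool->shutting_down = true;
    pthread_cond_broadcast(&pool->has_work);
    pthread_mutex_unlock(&pool->lock);
    for (size_t i = 0; i < pool->num_threads; i++)
        pthread_join(pool->threads[i], NULL);
    pthread_cond_destroy(&pool->has_work);
    pthread_mutex_destroy(&pool->lock);
    free(pool);
}

Pool *pool_create(size_t num_threads)
{
    assert(num_threads > 0);
    Pool *pool = calloc(1, sizeof *pool + num_threads * sizeof(pthread_t));
    if (pool == NULL)
        return NULL;
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->has_work, NULL);
    while (pool->num_threads < num_threads &&
           pthread_create(&pool->threads[pool->num_threads], NULL, worker_loop, pool) == 0)
        pool->num_threads++;
    if (pool->num_threads == 0) /* could not start any worker */
    {
        pool_destroy(pool);
        return NULL;
    }
    return pool;
}

/* Queue run(arg) for some worker to execute; false if out of memory. */
bool pool_submit(Pool *pool, void (*run)(void *), void *arg)
{
    Task *task = malloc(sizeof *task);
    if (task == NULL)
        return false;
    *task = (Task){run, arg, NULL};
    pthread_mutex_lock(&pool->lock);
    if (pool->tail != NULL)
        pool->tail->next = task;
    else
        pool->head = task;
    pool->tail = task;
    pthread_cond_signal(&pool->has_work);
    pthread_mutex_unlock(&pool->lock);
    return true;
}

/* Example task: atomically increment the counter that arg points to. */
void count_task(void *arg) { atomic_fetch_add((atomic_int *)arg, 1); }
```

A caller would create the pool once with pool_create, hand it work with pool_submit (for instance count_task with a pointer to an atomic_int), and call pool_destroy at the end, which waits for the queued tasks to finish. Compared with creating and joining threads per batch, the threads are started only once and a slow task no longer holds up the rest of its batch.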
That's it for this tutorial! I hope this helps you understand threadpools better and enables you to utilize them in your own projects. As always, happy coding!