From 741eb8467ca0b8464261f37a6ab3ec130eb8078c Mon Sep 17 00:00:00 2001 From: Matt Strapp Date: Sat, 5 Dec 2020 11:09:22 -0600 Subject: Start P4 --- P4/server.c | 554 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 554 insertions(+) create mode 100644 P4/server.c (limited to 'P4/server.c') diff --git a/P4/server.c b/P4/server.c new file mode 100644 index 0000000..b306def --- /dev/null +++ b/P4/server.c @@ -0,0 +1,554 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "util.h" +#include +#include +#include + +#define MAX_THREADS 100 +#define MAX_queue_len 100 +#define MAX_CE 100 +#define INVALID -1 +#define BUFF_SIZE 1024 + +// structs: +typedef struct request_queue { + int fd; + char *request; +} request_t; + +typedef struct cache_entry { + int len; + char *request; + char *content; +} cache_entry_t; + + +// globals: +FILE *logfile; + +// Dispatcher Threads +pthread_t dispatch_threads[MAX_THREADS]; +// Worker Threads +pthread_t worker_threads[MAX_THREADS]; +// Thread ID array +int wid_arr[MAX_THREADS]; +int did_arr[MAX_THREADS]; + + +pthread_mutex_t cache_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t req_queue_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t req_queue_notfull = PTHREAD_COND_INITIALIZER; // wait until queue isn't full (dispatch) +pthread_cond_t req_queue_notempty = PTHREAD_COND_INITIALIZER; // wait until queue isn't empty (worker) +request_t requests[MAX_queue_len]; + +int req_remove_index = 0; +int req_insert_index = 0; +int curr_queue_len = 0; +int queue_len; +int cache_size = INVALID; +int cache_evict_index = 0; +int num_worker = INVALID; + +// Cache +cache_entry_t *cache; + +void *dispatch(void *arg); + +void *worker(void *arg); + +// For grace termination +static volatile sig_atomic_t doneflag = 0; +static void setdoneflag(int signo) { + doneflag = 1; +} + +/* ************************ Dynamic Pool Code ***********************************/ +bool alive_pool[MAX_THREADS]; + +// Extra Credit: This function implements the policy to change the worker thread pool dynamically +// depending on the number of requests +void *dynamic_pool_size_update(void *arg) { + while (1) { + usleep(500000); + // Lock the mutex to dynamically change thread pool + if (pthread_mutex_lock(&req_queue_mutex) < 0) { + printf("Failed to lock queue! %s\n", strerror(errno)); + continue; + } + + int i; + // Cancel some threads + // printf("curr_queue_len: %d --- num_worker: %d\n", curr_queue_len, num_worker); + if (curr_queue_len < num_worker) { + if (num_worker > 1) { + printf("Deleted %d worker threads because server load decreased\n", num_worker - curr_queue_len - 1); + } + for (i = 0; (i < MAX_THREADS) && (curr_queue_len < num_worker) && (num_worker > 1); i++) { + if (alive_pool[i] == true) { + alive_pool[i] = false; + num_worker--; + } + } + pthread_cond_broadcast(&req_queue_notempty); + } + + // Create new threads + else if (curr_queue_len > num_worker) { + pthread_attr_t attr; + int s; + s = pthread_attr_init(&attr); + if (s != 0) { + fprintf(stderr, "error - attribute init failed.\n"); + exit(EXIT_FAILURE); + } + s = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if (s != 0) { + fprintf(stderr, "error - setting detached state failed.\n"); + exit(EXIT_FAILURE); + } + printf("Created %d worker threads because server load increased\n", curr_queue_len - num_worker); + for (i = 0; i < MAX_THREADS && curr_queue_len > num_worker; i++) { + if (alive_pool[i] == false) { + alive_pool[i] = true; + s = pthread_create(worker_threads + i, &attr, worker, (void *) &wid_arr[i]); + if (s != 0) { + fprintf(stderr, "error - pthread_create for worker threads dynamically.\n"); + } + num_worker++; + } + } + } + + if (pthread_mutex_unlock(&req_queue_mutex) < 0) { + printf("Failed to unlock queue! %s\n", strerror(errno)); + } + } +} +/**********************************************************************************/ + +/* ************************************ Cache Code ********************************/ + +// Function to check whether the given request is present in cache +int getCacheIndex(char *request) { + //printf("Checking cache\n"); + int i; + for (i = 0; i < cache_size; i++) { + if (cache[i].request != NULL && strcmp(cache[i].request, request) == 0) { + //printf("Found in cache %d\n",i); + return i; + } + } + //printf("Not in cache\n"); + return INVALID; +} + +// Function to add the request and its file content into the cache +// It should add the request at an index according to the cache replacement policy you are using +void addIntoCache(char *mybuf, char *memory, int memory_size) { + //printf("Adding to cache\n"); + if (cache[cache_evict_index].request != NULL) { + free(cache[cache_evict_index].request); + } + if (cache[cache_evict_index].content != NULL) { + free(cache[cache_evict_index].content); + } + + //printf("Allocating cache entry\n"); + cache[cache_evict_index].request = (char *) malloc(BUFF_SIZE * sizeof(char)); + cache[cache_evict_index].content = (char *) malloc(memory_size); + + //printf("Filling cache entry\n"); + strcpy(cache[cache_evict_index].request, mybuf); + memcpy(cache[cache_evict_index].content, memory, memory_size); + cache[cache_evict_index].len = memory_size; + + cache_evict_index = (cache_evict_index + 1) % cache_size; + //printf("Cache add done\n"); +} + +// clear the memory allocated to the cache +void deleteCache() { + int i; + for (i = 0; i < cache_size; i++) { + free(cache[i].request); + free(cache[i].content); + } + free(cache); +} + +// Function to initialize the cache +void initCache() { + cache = malloc(sizeof(cache_entry_t) * cache_size); + int i; + for (i = 0; i < cache_size; i++) { + cache[i].len = 0; + cache[i].request = NULL; + cache[i].content = NULL; + } +} + + +/**********************************************************************************/ + +// Function to get the content type from the request +char *getContentType(char *mybuf) { + int requeue_len = strlen(mybuf); + char *contenttype; + if (strcmp(&mybuf[requeue_len - 5], ".html") == 0) { + contenttype = "text/html"; + } else if (strcmp(&mybuf[requeue_len - 4], ".jpg") == 0) { + contenttype = "image/jpeg"; + } else if (strcmp(&mybuf[requeue_len - 4], ".gif") == 0) { + contenttype = "image/gif"; + } else { + contenttype = "text/plain"; + } + return contenttype; +} + + +// Function to open and read the file from the disk into the memory +// You can return the file size if it is useful +int readFromDisk(int fd, char *mybuf, void **memory, int id, int myreqnum) { + int filefd; + if ((filefd = open(mybuf + 1, O_RDONLY)) == -1) { + // it is mybuf + 1 because get request will return the path with a '/' in the front + return_error(fd, "Requested file not found."); + // Stop recording the time + pthread_mutex_lock(&log_mutex); + printf("[%d][%d][%d][%s][%s][%s]\n", id, myreqnum, fd, mybuf, "Requested file not found.", "MISS"); + fprintf(logfile, "[%d][%d][%d][%s][%s][%s]\n", id, myreqnum, fd, mybuf, "Requested file not found.", "MISS"); + fflush(logfile); + pthread_mutex_unlock(&log_mutex); + return -1; + } + struct stat st; + fstat(filefd, &st); + int filesize = st.st_size; + *memory = malloc(filesize); + read(filefd, *memory, filesize); + close(filefd); + return filesize; +} + + +/**********************************************************************************/ +// Function to receive the request from the client and put into the queue +void *dispatch(void *arg) { + int fd; + char mybuf[BUFF_SIZE]; + char *reqptr; + + while (1) { + fd = accept_connection(); + if (fd < 0) { return NULL; } + if (get_request(fd, mybuf) != 0) { continue; /*back to top of loop!*/ } + + reqptr = (char *) malloc(strlen(mybuf) + 1); + if (reqptr == NULL) { + printf("Failed to allocate memory: %s", strerror(errno)); + } + strcpy(reqptr, mybuf); + + // put request in queue + if (pthread_mutex_lock(&req_queue_mutex) < 0) { + printf("Failed to lock queue! %s\n", strerror(errno)); + } + while (curr_queue_len == queue_len) { // the queue is full! + pthread_cond_wait(&req_queue_notfull, &req_queue_mutex); + } + + requests[req_insert_index].fd = fd; + requests[req_insert_index].request = reqptr; + + curr_queue_len++; + req_insert_index = (req_insert_index + 1) % queue_len; // ring buffer + + pthread_cond_signal(&req_queue_notempty); + if (pthread_mutex_unlock(&req_queue_mutex) < 0) { + printf("Failed to unlock queue! %s\n", strerror(errno)); + } + } + return NULL; +} + +/**********************************************************************************/ + +// Function to take the request from the queue, process it and then return a result to the client +void *worker(void *arg) { + int id = *(int *) arg; + int fd, filesize, myreqnum; + void *memory; + char mybuf[BUFF_SIZE]; + myreqnum = 0; + + while (1) { + // get request from queue + if (pthread_mutex_lock(&req_queue_mutex) < 0) { + printf("Failed to lock! %s\n", strerror(errno)); + continue; + } + + while (curr_queue_len == 0) { // the queue is empty! + if (pthread_cond_wait(&req_queue_notempty, &req_queue_mutex) < 0) { + printf("Failed to wait! %s\n", strerror(errno)); + } + if (alive_pool[id] == false) { + if (pthread_mutex_unlock(&req_queue_mutex) < 0) { + printf("Failed to unlock queue! %s\n", strerror(errno)); + } + return NULL; + } + } + + if (alive_pool[id] == false) { + if (pthread_mutex_unlock(&req_queue_mutex) < 0) { + printf("Failed to unlock queue! %s\n", strerror(errno)); + } + return NULL; + } + + // Code to get the request from the queue + fd = requests[req_remove_index].fd; + strcpy(mybuf, requests[req_remove_index].request); + free(requests[req_remove_index].request); + + curr_queue_len--; + req_remove_index = (req_remove_index + 1) % queue_len; // ring buffer + + if (pthread_cond_signal(&req_queue_notfull) < 0) { + printf("Failed to signal! %s\n", strerror(errno)); + } + + if (pthread_mutex_unlock(&req_queue_mutex) < 0) { + printf("Failed to unlock! %s\n", strerror(errno)); + } + + myreqnum++; + + if (strcmp(mybuf, "/") == 0) { + strcpy(mybuf, "/index.html"); + } + + // Code to get the data from cache or from the disk + bool hit = false; + if (pthread_mutex_lock(&cache_mutex) < 0) { + printf("Failed to lock cache mutex! %s\n", strerror(errno)); + } + + int index = getCacheIndex(mybuf); + if (index != INVALID) { + filesize = cache[index].len; + memory = malloc(filesize); + memcpy(memory, cache[index].content, filesize); + hit = true; + } else { + filesize = readFromDisk(fd, mybuf, &memory, id, myreqnum); + if (filesize == -1) { + if (pthread_mutex_unlock(&cache_mutex) < 0) { + printf("Failed to unlock cache mutex! %s\n", strerror(errno)); + } + continue; + } + addIntoCache(mybuf, memory, filesize); + } + + if (pthread_mutex_unlock(&cache_mutex) < 0) { + printf("Failed to unlock cache mutex! %s\n", strerror(errno)); + } + + // Code to log the request into the file and terminal + if (pthread_mutex_lock(&log_mutex) < 0) { + printf("Failed to lock log mutex! %s\n", strerror(errno)); + } + + printf("[%d][%d][%d][%s][%d][%s]\n", id, myreqnum, fd, mybuf, filesize, (hit == true) ? "HIT" : "MISS"); + if (fprintf(logfile, "[%d][%d][%d][%s][%d][%s]\n", id, myreqnum, fd, mybuf, filesize, + (hit == true) ? "HIT" : "MISS") < 0) { + printf("Failed to print to log file! %s\n", strerror(errno)); + } + + fflush(logfile); + + if (pthread_mutex_unlock(&log_mutex) < 0) { + printf("Failed to unlock log mutex! %s\n", strerror(errno)); + } + + char *contenttype = getContentType(mybuf); + if (return_result(fd, contenttype, memory, filesize) != 0) { + printf("Couldn't return result, thread id=%d\n", id); + } + + free(memory); + } + return NULL; +} + +/**********************************************************************************/ + +int main(int argc, char **argv) { + + // Error check on number of arguments + // Decided to check if caching is enabled [argc == 8 -> Caching enabled] + if (argc != 8) { + printf("usage: %s port path num_dispatcher num_workers dynamic_flag queue_length cache_size\n", argv[0]); + return -1; + } + + // Init variables + int port = atoi(argv[1]); + int num_dispatch = atoi(argv[3]); + num_worker = atoi(argv[4]); + int dynamic_flag = atoi(argv[5]); + queue_len = atoi(argv[6]); + if (atoi(argv[7]) > 0) { + cache_size = atoi(argv[7]); + } + + // Sanity error checks + if (num_dispatch > MAX_THREADS || num_dispatch < 1) { + fprintf(stderr, "Invalid number of dispatch threads.\n"); + return -2; + } + if (num_worker > MAX_THREADS || num_worker < 1) { + fprintf(stderr, "Invalid number of worker threads.\n"); + return -2; + } + if (queue_len > MAX_queue_len || queue_len < 1) { + fprintf(stderr, "Invalid queue_len size.\n"); + return -2; + } + if (cache_size > MAX_CE || cache_size < 1) { + fprintf(stderr, "Invalid cache_size.\n"); + return -2; + } + + // Change SIGINT action for grace termination + struct sigaction act; + act.sa_handler = setdoneflag; + act.sa_flags = 0; + if ((sigemptyset(&act.sa_mask) == -1) || (sigaction(SIGINT, &act, NULL) == -1)) { + perror("Failed to set SIGINT handler"); + return 1; + } + + // Open log file + logfile = fopen("webserver_log", "w"); + + // Change the current working directory to server root directory + if (chdir(argv[2]) != 0) { + perror("Couldn't change directory to server root."); + return -1; + } + + // Init cache if required + initCache(); + + // Start the server + init(port); + printf("Starting server on port %d: %d disp, %d work\n", port, num_dispatch, num_worker); + + // Create dispatcher and worker threads + int i; + for (i = 0; i < MAX_THREADS; i++) { + alive_pool[i] = false; + } + for (i = 0; i < num_worker; i++) { + wid_arr[i] = i; + } + for (i = 0; i < num_dispatch; i++) { + did_arr[i] = i; + } + + pthread_attr_t attr; + int s; + + s = pthread_attr_init(&attr); + if (s != 0) { + fprintf(stderr, "error - attribute init failed.\n"); + exit(EXIT_FAILURE); + } + s = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if (s != 0) { + fprintf(stderr, "error - setting detached state failed.\n"); + exit(EXIT_FAILURE); + } + + for (i = 0; i < num_worker; i++) { + alive_pool[i] = true; + s = pthread_create(worker_threads + i, &attr, worker, (void *) &wid_arr[i]); + /* Alternative */ + // s = pthread_create(worker_threads + i, NULL, worker, (void *) &wid_arr[i]); + // s = pthread_detach(*(worker_threads + i)); + if (s != 0) { + fprintf(stderr, "error - pthread_create for worker threads.\n"); + exit(EXIT_FAILURE); + } + } + for (i = 0; i < num_dispatch; i++) { + s = pthread_create(dispatch_threads + i, &attr, dispatch, (void *) &did_arr[i]); + if (s != 0) { + fprintf(stderr, "error - pthread_create for dispatcher threads.\n"); + exit(EXIT_FAILURE); + } + } + + pthread_t dynamic_pool_deamon; + if (dynamic_flag == 1) { + int dynamic_pool_deamon_id = MAX_THREADS + 1; + s = pthread_create(&dynamic_pool_deamon, &attr, dynamic_pool_size_update, (void *) &dynamic_pool_deamon_id); + if (s != 0) { + fprintf(stderr, "error - pthread_create for dynamic pool management.\n"); + exit(EXIT_FAILURE); + } + } + + // Terminate server gracefully + while (!doneflag) { + sleep(1); + } + + printf("Server terminating ...\n"); + + // Print the number of pending requests in the request queue + printf("The number of pending requests in the request queue: %d\n", curr_queue_len); + + + if (pthread_mutex_lock(&log_mutex) < 0) { + printf("Failed to lock log mutex! %s\n", strerror(errno)); + } + + // close log file + fclose(logfile); + + if (pthread_mutex_unlock(&log_mutex) < 0) { + printf("Failed to unlock log mutex! %s\n", strerror(errno)); + } + + + + if (pthread_mutex_lock(&cache_mutex) < 0) { + printf("Failed to lock cache mutex! %s\n", strerror(errno)); + } + + // Remove cache + deleteCache(); + + if (pthread_mutex_unlock(&cache_mutex) < 0) { + printf("Failed to unlock cache mutex! %s\n", strerror(errno)); + } + + printf("Main thread exiting.\n"); + return 0; +} -- cgit v1.2.3