From 8dbe34494cef11aa1a0f7dc819084a0567a83bfd Mon Sep 17 00:00:00 2001 From: Matt Strapp Date: Wed, 25 Nov 2020 12:22:17 -0600 Subject: add finishing "touches" --- P3/README.md | 27 +++++++++--- P3/server.c | 141 +++++++++++++++++++++-------------------------------------- 2 files changed, 71 insertions(+), 97 deletions(-) diff --git a/P3/README.md b/P3/README.md index 3bd3501..7dd3e29 100644 --- a/P3/README.md +++ b/P3/README.md @@ -2,14 +2,18 @@ The third project in CSCI 4061: Intro to Operating Systems. -## Fall 2020 +## Fall 2020 - **Test machine:** atlas.cselabs.umn.edu - **Date:** 11/15/2020 - **Name:** Andrea Smith, Matt Strapp - **x500:** smit9523, strap012 -The purpose of this program is to create a multi-threaded web server by using POSIX threads. In this project, we wrote the mapper and reducer threads, a logging system +The purpose of this program is to create a multi-threaded web server by using POSIX threads. In this project, we wrote the worker and dispatcher threads, a logging system, and helper functions that implement caching to improve runtime performance. + +In order to run this program, compile with the included makefile (run ```make``` in the directory with no additional arguments). After that, run ```./web_server``` with the following additional arguments seperated by only a space (with no quotes or brackets): +[Port used (between 1025 and 65535)] [Web server directory] [Number of dispatch threads] [Number of worker threads] [Maximum worker queue length] [Unimplemented Flag (Use 0)] [Maximum cache length] + #### Program structure @@ -19,24 +23,33 @@ Repeatedly receives the client request and adds the requests to the queue. ```worker():``` Monitors the queue, retrieve new requests as they come in, and send the result back to the client. -## Extra Credit A +```readFromDisk()``` +Opens and reads the request's file from the disk into memory. -```dynamic_pool_size_update():``` -Changes the worker thread pool dynamically depending on the number of requests +```getContentType()``` +Gets the content type of the file in the request. -## Extra Credit B +## Extra Credit B (Helper functions for worker() to implement a cache) ```initCache():``` +Allocates memory and intializes the cache array. ```isInCache():``` +Checks whether the given request is present in the cache and on success, returns the index of the request. ```readFromCache():``` +Traverses the cache queue to find the needed cache. ```addIntoCache():``` +Adds the request and its file content into the cache. ```deleteCache():``` +Clears/frees up the memory allocated to the cache. + +```getFileSize()``` +Gets the size of the file in the queue in order to be able to allocate buffers for the cache. #### Team Contributions: -For the first draft, Andrea primarily contributed to sendChunkData() and getChunkData() and Matt wrote shuffle() and getInterData(), but the debugging process (the majority of the work) was entirely a joint effort. \ No newline at end of file +We work on projects together using LiveShare on VSCode. Because of this, it is difficult to say exactly how the work is distributed-- there are only two of us, so we typically are working at the same by going back and forth over a chunk of code together. The workload for worker() and dispatch() was evenly distributed. Matt also primarily wrote the Cache Code for Extra Credit B. Andrea handled the interim report, README, and other code documentation. \ No newline at end of file diff --git a/P3/server.c b/P3/server.c index b28a772..531fc89 100644 --- a/P3/server.c +++ b/P3/server.c @@ -22,8 +22,6 @@ // Global variables int port, workers, dispatchers, dynFlag, maxQlen, maxCSiz, curQSiz, cacheLength, wIndex; -int tempNodeCounter = 0, counter2 = 0; -pthread_t *wID; char *path; pthread_mutex_t Qlock, logLock, cacheLock; pthread_cond_t QisEmpty, QisFull, cacheIsEmpty; @@ -34,7 +32,7 @@ void *worker(void *arg); THE CODE STRUCTURE GIVEN BELOW IS JUST A SUGGESTION. FEEL FREE TO MODIFY AS NEEDED */ -// structs: +// Structs: typedef struct request_queue { int fd; char *request; @@ -54,45 +52,15 @@ cache_entry_t *dynQ = NULL; //The cache queue // Extra Credit: This function implements the policy to change the worker thread pool dynamically // depending on the number of requests void *dynamic_pool_size_update(void *arg) { - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - while (1) { - // # of active worker thrids / curQSiz = the server load - // If the server load is over 90% usage, spawn MAX_THREADS - # of active worker thrids (wIndex?). This will create the max number of threads possible. - // If server load is between 10% and 90%, spawn (50 threads - # of active workers) - // If server load is below 10%, kill all but 2 thrids - if (curQSiz == 0) - continue; - float serverLoad = 100*((float) curQSiz / (float)wIndex); - int actualLoad = serverLoad; - printf("\n\nCurrent # of worker thrids: %d\n", wIndex); - printf("Current Q size: %d\n", curQSiz); - printf("Server load: %d\n\n", actualLoad); - int *Wargs = (int *)malloc(sizeof(int) * MAX_THREADS); - if (actualLoad > 50) { - wID = (pthread_t *) realloc(wID, MAX_THREADS); - for (int i = wIndex; i < MAX_THREADS; i++) { - wIndex++; - Wargs[i] = i; - pthread_create(&wID[i], &attr, worker, (void *)&Wargs[i]); //TODO: Worker arguments - char threadName[16]; - sprintf(threadName, "Worker %d", i); - pthread_setname_np(wID[i], threadName); - } - } - free(Wargs); - - //This doesn't work ._. - - } + return NULL; } + /**********************************************************************************/ /* ************************ Cache Code [Extra Credit B] **************************/ // Function to check whether the given request is present in cache -// return the index if the request is present in the cache +// return the index of the request is present in the cache int isInCache(char *request) { cache_entry_t *traverse = dynQ; if (dynQ == NULL) @@ -146,8 +114,6 @@ void addIntoCache(char *mybuf, char *memory, int memory_size) { free(temp->content); free(temp); free(silence); - // pthread_mutex_lock(&cacheLock); - // pthread_mutex_unlock(&cacheLock); addIntoCache(mybuf, memory, memory_size); } else { //Cache is not full cache_entry_t *temp = (cache_entry_t *)calloc(1, sizeof(cache_entry_t)); @@ -165,7 +131,7 @@ void addIntoCache(char *mybuf, char *memory, int memory_size) { } } -// clear the memory allocated to the cache +// Clear the memory allocated to the cache void deleteCache() { request_t *tempReq = NULL; @@ -175,7 +141,6 @@ void deleteCache() Q = Q->next; free(tempReq->request); free(tempReq); - tempNodeCounter--; } pthread_mutex_unlock(&Qlock); // De-allocate/free the cache memory @@ -189,11 +154,9 @@ void deleteCache() free(tempCache); } pthread_mutex_unlock(&cacheLock); - free(wID); } // Function to initialize the cache - void initCache(){ // Allocating memory and initializing the cache array dynQ = (cache_entry_t *)calloc(1, sizeof(cache_entry_t)); @@ -228,32 +191,39 @@ char *getContentType(char *mybuf) { } } -// Function to get the size of the file in the queue +// Function to get the size of the file in the queue. Will be used to allocate memory for the cache // (Thanks Matt from stackOverflow) long getFileSize(char *file){ char *temp = (char *)malloc(BUFF_SIZE); sprintf(temp, ".%s", file); FILE *f = fopen(temp, "r"); free(temp); - if (f == NULL) - return 0; + if (f == NULL) { + perror("File size read error"); + return 0; + } fseek(f, 0, SEEK_END); long len = ftell(f); fclose(f); return len; } -// Function to open and read the file from the disk into the memory -// Add necessary arguments as needed +// Function to open and read the file from the disk into memory int readFromDisk(char *fileName, char *buffer, long fileSize) { char *temp = (char *)malloc(BUFF_SIZE + 1); sprintf(temp, ".%s", fileName); // Open and read the contents of file given the request int requestFile = open(temp, O_RDONLY); free(temp); - if (requestFile == -1) + if (requestFile == -1) { + perror("Disk read error"); return -1; // Error handle - read(requestFile, buffer, fileSize); + } + if (read(requestFile, buffer, fileSize) == -1) { + perror("File read error"); + close(requestFile); + return -1; + } close(requestFile); return 0; } @@ -275,11 +245,9 @@ void *dispatch(void *arg){ if (traverse == NULL || traverse->next == NULL) { //Add things to queue. Lock & unlock to prevent a deadlock request_t *tempNode = (request_t *)malloc(sizeof(request_t) + 1); - tempNodeCounter++; char *dispatchBuf = (char *)malloc(BUFF_SIZE); // Buffer to store the requested filename if (get_request(newReq, dispatchBuf) != 0) { free(tempNode); - tempNodeCounter--; free(dispatchBuf); pthread_mutex_unlock(&Qlock); break; // If get_request fails, ignore the request @@ -317,11 +285,6 @@ void *worker(void *arg) { pthread_mutex_lock(&Qlock); while (curQSiz == 0) pthread_cond_wait(&QisEmpty, &Qlock); - // if (Q == NULL) - // { - // pthread_mutex_unlock(&Qlock); - // continue; - // } start_t = clock(); request_t *request = (request_t *)malloc(sizeof(request_t)); request->fd = Q->fd; @@ -344,9 +307,9 @@ void *worker(void *arg) { fail = true; sprintf(bytesError, "%s", strerror(errno)); } - char *cacheTest; //HIT/MISS only + char *cacheTest; // HIT/MISS only if (!fail) { - //Make sure nothing is fiddling with cache before fiddling with cache + // Make sure nothing is fiddling with cache before fiddling with cache pthread_mutex_lock(&cacheLock); int test = isInCache(request->request); if (test != -1) { @@ -358,7 +321,7 @@ void *worker(void *arg) { pthread_mutex_unlock(&cacheLock); cacheTest = "MISS"; if (readFromDisk(request->request, workerBuf, numbytes) == -1) { - //Not in cache, disk read failed + // If request not in cache, disk read failed, set flag fail = true; } else { @@ -368,6 +331,9 @@ void *worker(void *arg) { pthread_mutex_unlock(&cacheLock); } } + } else { + //Failsafe to add cache being missed + cacheTest = "MISS"; } // Log the request into the file and terminal @@ -381,28 +347,23 @@ void *worker(void *arg) { // Return the result if (fail) { - //ERR + // ERROR return_error(request->fd, request->request); } else { - //SUCC + // SUCCESS return_result(request->fd, getContentType(request->request), workerBuf, numbytes); } - //Free things no longer needed - // free(request->request); + // Free things no longer needed free(request); - tempNodeCounter--; free(workerBuf); - end_t = clock(); - total_t = (double)(end_t - start_t); - printf("Total time taken by CPU: %ld\n", total_t); } return NULL; } /**********************************************************************************/ -//Flag for when server needs to die nicely +// Flag for when server needs to die nicely static volatile sig_atomic_t exitFlag = 0; //Sets e flag so process can die happily and not sad. @@ -434,21 +395,26 @@ int main(int argc, char **argv) { /* -- ERROR CHECKING -- */ if (port < 1025 || port > 65535) { - printf("Invalid port. Port must be greater than 1025 or less than 65535 (inclusive).\n"); + printf("Invalid port. Port must be between 1025 and 65535 (inclusive).\n"); return -1; } if (dispatchers > MAX_THREADS || dispatchers < 1) { - printf("Number of dispatchers is invalid. It must be greater than 1 or less than %d (inclusive).\n", MAX_THREADS); + printf("Number of dispatchers is invalid. It must be between 1 and %d (inclusive).\n", MAX_THREADS); return -1; } if (workers > MAX_THREADS || workers < 1) { - printf("Number of dispatchers is invalid. It must be greater than 1 or less than %d (inclusive).\n", MAX_THREADS); + printf("Number of dispatchers is invalid. It must be between 1 and %d (inclusive).\n", MAX_THREADS); return -1; } if (maxQlen > MAX_queue_len || maxQlen < 1) { - printf("Queue length is invalid It must be greater than 1 or less than %d (inclusive).\n", MAX_queue_len); + printf("Queue length is invalid. It must be between 1 and %d (inclusive).\n", MAX_queue_len); return -1; } + if (maxCSiz > MAX_queue_len || maxCSiz < 1) { + printf("Cache size is invalid. It must be between 1 and %d (inclusive).\n", MAX_queue_len); + return -1; + } + /* -- END ERROR CHECKING -- */ // Change SIGINT action for graceful termination @@ -459,8 +425,12 @@ int main(int argc, char **argv) { perror("SIGINT Handler Error"); return -2; } - // Create instance of logfil - FILE *logfile = fopen("webserver_log", "w") + // Create instance of logfile + FILE *logfile = fopen("webserver_log", "w"); + if (logfile == NULL) { + perror("Logfile creation error"); + return -2; + } fclose(logfile); // Change the current working directory to server root directory @@ -471,42 +441,34 @@ int main(int argc, char **argv) { // Initialize cache (extra credit B) initCache(); - //Make sure threads are all detached + // Make sure threads are all detached pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // Start the server init(port); char threadName[16]; - // Create dispatcher threads (make detachable????) + // Create dispatcher threads pthread_t dThreads[dispatchers]; for (int i = 0; i < dispatchers; i++) { - pthread_create(&dThreads[i], &attr, dispatch, NULL); // DEBUG! figure out last arg + pthread_create(&dThreads[i], &attr, dispatch, NULL); sprintf(threadName, "Dispatch %d", i); pthread_setname_np(dThreads[i], threadName); } - //Create workers (make detachable?????) + // Create workers int *Wargs = (int *)malloc(sizeof(int) * workers); - pthread_t *throwaway = (pthread_t *)calloc(workers, sizeof(pid_t)); - wID = throwaway; + pthread_t wID[workers]; for (int i = 0; i < workers; i++) { wIndex++; Wargs[i] = i; - pthread_create(&wID[i], &attr, worker, (void *)&Wargs[i]); //TODO: Worker arguments + pthread_create(&wID[i], &attr, worker, (void *)&Wargs[i]); sprintf(threadName, "Worker %d", i); pthread_setname_np(wID[i], threadName); } free(Wargs); - // Create dynamic pool manager thread (extra credit A) - if (dynFlag) { - pthread_t pThread; - pthread_create(&pThread, &attr, dynamic_pool_size_update, NULL); //TODO: possible arguments - pthread_setname_np(pThread, "Pool Manager"); - } - //Server loop (RUNS FOREVER) + // Server loop (RUNS FOREVER) while (1) { - //TODO: Add something else? // Terminate server gracefully if (exitFlag) { printf("\nSIGINT caught, exiting now.\n"); @@ -520,7 +482,6 @@ int main(int argc, char **argv) { // Remove cache (extra credit B) deleteCache(); printf("Cache has been deleted.\n"); - printf("%d\n", tempNodeCounter); return 0; } } -- cgit v1.2.3