diff options
Diffstat (limited to 'P2/src')
-rw-r--r-- | P2/src/mapper.c | 221 | ||||
-rw-r--r-- | P2/src/mapreduce.c | 83 | ||||
-rw-r--r-- | P2/src/reducer.c | 140 |
3 files changed, 444 insertions, 0 deletions
diff --git a/P2/src/mapper.c b/P2/src/mapper.c new file mode 100644 index 0000000..469b32e --- /dev/null +++ b/P2/src/mapper.c @@ -0,0 +1,221 @@ +#include "mapper.h" + +// combined value list corresponding to a word <1,1,1,1....> +valueList *createNewValueListNode(char *value){ + valueList *newNode = (valueList *)malloc (sizeof(valueList)); + strcpy(newNode -> value, value); + newNode -> next = NULL; + return newNode; +} + +// insert new count to value list +valueList *insertNewValueToList(valueList *root, char *count){ + valueList *tempNode = root; + if(root == NULL) + return createNewValueListNode(count); + while(tempNode -> next != NULL) + tempNode = tempNode -> next; + tempNode -> next = createNewValueListNode(count); + return root; +} + +// free value list +void freeValueList(valueList *root) { + if(root == NULL) return; + + valueList *tempNode = NULL; + while (root != NULL){ + tempNode = root; + root = root -> next; + free(tempNode); + } +} + +// create <word, value list> +intermediateDS *createNewInterDSNode(char *word, char *count){ + intermediateDS *newNode = (intermediateDS *)malloc (sizeof(intermediateDS)); + strcpy(newNode -> key, word); + newNode -> value = NULL; + newNode -> value = insertNewValueToList(newNode -> value, count); + newNode -> next = NULL; + return newNode; +} + +// insert <word, value> to intermediate DS +intermediateDS *insertPairToInterDS(intermediateDS *root, char *word, char *count){ + intermediateDS *tempNode = root; + if(root == NULL) + return createNewInterDSNode(word, count); + while(tempNode -> next != NULL) { + if(strcmp(tempNode -> key, word) == 0){ + tempNode -> value = insertNewValueToList(tempNode -> value, count); + return root; + } + tempNode = tempNode -> next; + + } + if(strcmp(tempNode -> key, word) == 0){ + tempNode -> value = insertNewValueToList(tempNode -> value, count); + } else { + tempNode -> next = createNewInterDSNode(word, count); + } + return root; +} + +// free intermediate DS +void freeInterDS(intermediateDS *root) { + if(root == NULL) return; + + intermediateDS *tempNode = NULL; + while (root != NULL){ + tempNode = root; + root = root -> next; + freeValueList(tempNode -> value); + free(tempNode); + } +} + +// emit the <key, value> into intermediate DS +void emit(char *key, char *value) { + // printf("To be emitted: %s, %s\n", (char *)key, value); + interDS = insertPairToInterDS(interDS, key, value); +} + +// map function +void map(char *chunkData){ + char *cData = chunkData; + char *buffer; + int i = 0; + + while((buffer = getWord(cData, &i)) != NULL){ + emit(buffer, "1"); + free(buffer); + buffer = NULL; + } +} + +// generate the file name for word.txt +char *generateWordFileName(char *word){ + char fileName[MAXKEYSZ]; + memset(fileName, '\0', MAXKEYSZ); + strcpy(fileName, word); + strcat(fileName, ".txt"); + fileName[strlen(fileName)] = '\0'; + + char *wordFileName = (char*)malloc(sizeof(char) * (2 * MAXKEYSZ)); + memset(wordFileName, '\0', (2 * MAXVALUESZ)); + strcpy(wordFileName, mapOutDir); + strcat(wordFileName, "/"); + strcat(wordFileName, fileName); + wordFileName[strlen(wordFileName)] = '\0'; + + return wordFileName; + +} + +// write intermediate data to separate word.txt files +// Each file will have only one line <word 1 1 1 1 1 ...> +void writeIntermediateDS() { + intermediateDS *travNode = interDS; + + while(travNode != NULL) { + // create file word.txt + // content : word 1 1 1 1 1 1..... + char *wordFileName = generateWordFileName(travNode -> key); + int fd = open(wordFileName, O_CREAT | O_WRONLY, 0777); + if (fd < 0){ + printf("ERROR: Cannot open the file %s\n", wordFileName); + exit(0); + } + + // word write + int ret = write(fd, travNode -> key, strlen(travNode -> key)); + if(ret < 0){ + printf("ERROR: Cannot write to file %s\n", wordFileName); + exit(0); + } + + // append space + ret = write(fd, " ", 1); + if(ret < 0){ + printf("ERROR: Cannot write to file %s\n", wordFileName); + exit(0); + } + + // append value list + valueList *tNode = travNode -> value; + while(tNode -> next != NULL){ + ret = write(fd, tNode -> value, strlen(tNode -> value)); // space after the word + if(ret < 0){ + printf("ERROR: Cannot write to file %s\n", wordFileName); + exit(0); + } + + // append space + ret = write(fd, " ", 1); + if(ret < 0){ + printf("ERROR: Cannot write to file %s\n", wordFileName); + exit(0); + } + tNode = tNode -> next; + } + + // last value + ret = write(fd, tNode -> value, strlen(tNode -> value)); // space after the word + if(ret < 0){ + printf("ERROR: Cannot write to file %s\n", wordFileName); + exit(0); + } + + // append newline + ret = write(fd, "\n", 1); + if(ret < 0){ + printf("ERROR: Cannot write to file %s\n", wordFileName); + exit(0); + } + //close file + close(fd); + + // go to next word in intermediate DS + travNode = travNode -> next; + } +} + +int main(int argc, char *argv[]) { + + if (argc < 2) { + printf("Less number of arguments.\n"); + printf("./mapper mapperID\n"); + exit(0); + } + + // initializing global variables + mapperID = strtol(argv[1], NULL, 10); + interDS = NULL; + + //create folder specifically for this mapper in output/MapOut + mapOutDir = createMapDir(mapperID); + + + int count = 0; + while(1) { + char chunkData[chunkSize + 1]; + memset(chunkData, '\0', chunkSize + 1); + + char *retChunk = getChunkData(mapperID); + if(retChunk == NULL) { + break; + } + count++; + strcpy(chunkData, retChunk); + free(retChunk); + + map(chunkData); + } + + //student code + writeIntermediateDS(); + freeInterDS(interDS); + + return 0; +}
\ No newline at end of file diff --git a/P2/src/mapreduce.c b/P2/src/mapreduce.c new file mode 100644 index 0000000..c44adb6 --- /dev/null +++ b/P2/src/mapreduce.c @@ -0,0 +1,83 @@ +#include "mapreduce.h" + +// execute executables using execvp +void execute(char **argv, int nProcesses){ + pid_t pid; + + int i; + for (i = 0; i < nProcesses; i++){ + pid = fork(); + if (pid < 0) { + printf("ERROR: forking child process failed\n"); + exit(1); + } else if (pid == 0) { + char *processID = (char *) malloc(sizeof(char) * 5); // memory leak + sprintf(processID, "%d", i+1); + argv[1] = processID; + if (execvp(*argv, argv) < 0) { + printf("ERROR: exec failed\n"); + exit(1); + } + } + } +} + +int main(int argc, char *argv[]) { + + if(argc < 4) { + printf("Less number of arguments.\n"); + printf("./mapreduce #mappers #reducers inputFile\n"); + exit(0); + } + + int nMappers = strtol(argv[1], NULL, 10); + int nReducers = strtol(argv[2], NULL, 10); + + if(nMappers < nReducers){ + printf("ERROR: Number of mappers should be greater than or equal to number of reducers...\n"); + exit(0); + } + + if(nMappers == 0 || nReducers == 0){ + printf("ERROR: Mapper and Reducer count should be grater than zero...\n"); + exit(0); + } + + char *inputFile = argv[3]; + + bookeepingCode(); + + int status; + pid_t pid = fork(); + if(pid == 0){ + //send chunks of data to the mappers in RR fashion + sendChunkData(inputFile, nMappers); + exit(0); + } + sleep(1); + + // spawn mappers + char *mapperArgv[] = {"./mapper", NULL, NULL}; + execute(mapperArgv, nMappers); + + // wait for all children to complete execution + while (wait(&status) > 0); + + // shuffle sends the word.txt files generated by mapper + // to reducer based on a hash function + pid = fork(); + if(pid == 0){ + shuffle(nMappers, nReducers); + exit(0); + } + sleep(1); + + // spawn reducers + char *reducerArgv[] = {"./reducer", NULL, NULL}; + execute(reducerArgv, nReducers); + + // wait for all children to complete execution + while (wait(&status) > 0); + + return 0; +}
\ No newline at end of file diff --git a/P2/src/reducer.c b/P2/src/reducer.c new file mode 100644 index 0000000..d62c2c2 --- /dev/null +++ b/P2/src/reducer.c @@ -0,0 +1,140 @@ +#include "reducer.h" + +finalKeyValueDS *createFinalKeyValueNode(char *word, int count){ + finalKeyValueDS *newNode = (finalKeyValueDS *)malloc (sizeof(finalKeyValueDS)); + strcpy(newNode -> key, word); + newNode -> value = count; + newNode -> next = NULL; + return newNode; +} + +finalKeyValueDS *insertNewKeyValue(finalKeyValueDS *root, char *word, int count){ + finalKeyValueDS *tempNode = root; + if(root == NULL) + return createFinalKeyValueNode(word, count); + while(tempNode -> next != NULL){ + if(strcmp(tempNode -> key, word) == 0){ + tempNode -> value += count; + return root; + } + tempNode = tempNode -> next; + } + if(strcmp(tempNode -> key, word) == 0){ + tempNode -> value += count; + } else{ + tempNode -> next = createFinalKeyValueNode(word, count); + } + return root; +} + +void freeFinalDS(finalKeyValueDS *root) { + if(root == NULL) return; + + finalKeyValueDS *tempNode = NULL; + while (root != NULL){ + tempNode = root; + root = root -> next; + free(tempNode); + } +} + +// reduce function +void reduce(char *key) { + + char* wordFileName = key; + int fd = open(wordFileName, O_RDONLY); + if (fd < 0){ + printf("ERROR: Cannot open the file %s\n", wordFileName); + exit(0); + } + + char buffer[MAXKEYSZ]; + char word[MAXKEYSZ]; + memset(word, '\0', sizeof(char) * MAXKEYSZ); + int j = 0; + int foundKey = 0; + while(read(fd, &buffer[j], 1) == 1) { + // read one word at a time + if (buffer[j] == ' ' ||buffer[j] == '\n' || buffer[j] == 0x0) { + buffer[j] = '\0'; + + // just a safety code to avoid empty files or blank lines + if(strlen(buffer) == 0){ + j = 0; + continue; + } + + int count; + if(!foundKey){ + // found the word + foundKey = 1; + strcpy(word, buffer); + count = 0; + } else { + // convert string count to integer count + count = strtol(buffer, NULL, 10); + } + finalDS = insertNewKeyValue(finalDS, word, count); + j = 0; + continue; + } + j++; + } + close(fd); +} + +void writeFinalDS(int reducerID){ + char reduceFileName[MAXKEYSZ]; + memset(reduceFileName, '\0', MAXKEYSZ); + sprintf(reduceFileName, "output/ReduceOut/Reduce_%d.txt", reducerID); + + int fdReduce = open(reduceFileName, O_CREAT | O_WRONLY, 0777); + if (fdReduce < 0){ + printf("ERROR: Cannot open the file %s\n", reduceFileName); + exit(0); + } + + finalKeyValueDS *tempNode = finalDS; + while(tempNode){ + char writeKeyValue[2 * MAXKEYSZ]; + memset(writeKeyValue, '\0', 2 * MAXKEYSZ); + strcpy(writeKeyValue, tempNode -> key); + strcat(writeKeyValue, " "); + char valueStr[MAXKEYSZ]; + memset(valueStr, '\0', MAXKEYSZ); + sprintf(valueStr, "%d", tempNode -> value); + strcat(writeKeyValue, valueStr); + writeKeyValue[strlen(writeKeyValue)] = '\n'; + + // word write + int ret = write(fdReduce, writeKeyValue, strlen(writeKeyValue)); + if(ret < 0){ + printf("ERROR: Cannot write to file %s\n", reduceFileName); + exit(0); + } + tempNode = tempNode -> next; + } + close(fdReduce); +} + +int main(int argc, char *argv[]) { + + if(argc < 2){ + printf("Less number of arguments.\n"); + printf("./reducer reducerID"); + } + + // initialize + int reducerID = strtol(argv[1], NULL, 10); + finalDS = NULL; + + // master will continuously send the word.txt files alloted to the reducer + char key[MAXKEYSZ]; + while(getInterData(key, reducerID)) + reduce(key); + + writeFinalDS(reducerID); + + freeFinalDS(finalDS); + return 0; +}
\ No newline at end of file |