diff options
author | Matt Strapp <strap012@umn.edu> | 2020-09-24 07:24:19 -0500 |
---|---|---|
committer | Matt Strapp <strap012@umn.edu> | 2020-09-24 07:24:19 -0500 |
commit | 7f2f7f7c3e5d4c626d7e8d4d45fee2b11b6d5221 (patch) | |
tree | 73142dd8d25f71467fc0fa602e3caaf0f6b8b34f /P1/src | |
parent | Add project 1 files (diff) | |
download | csci4061-7f2f7f7c3e5d4c626d7e8d4d45fee2b11b6d5221.tar csci4061-7f2f7f7c3e5d4c626d7e8d4d45fee2b11b6d5221.tar.gz csci4061-7f2f7f7c3e5d4c626d7e8d4d45fee2b11b6d5221.tar.bz2 csci4061-7f2f7f7c3e5d4c626d7e8d4d45fee2b11b6d5221.tar.lz csci4061-7f2f7f7c3e5d4c626d7e8d4d45fee2b11b6d5221.tar.xz csci4061-7f2f7f7c3e5d4c626d7e8d4d45fee2b11b6d5221.tar.zst csci4061-7f2f7f7c3e5d4c626d7e8d4d45fee2b11b6d5221.zip |
rearrange
Diffstat (limited to 'P1/src')
-rw-r--r-- | P1/src/mapper.c | 134 | ||||
-rw-r--r-- | P1/src/mapreduce.c | 54 | ||||
-rw-r--r-- | P1/src/reducer.c | 79 |
3 files changed, 267 insertions, 0 deletions
diff --git a/P1/src/mapper.c b/P1/src/mapper.c new file mode 100644 index 0000000..66ac2ef --- /dev/null +++ b/P1/src/mapper.c @@ -0,0 +1,134 @@ +#include "mapper.h" + +// combined value list corresponding to a word <1,1,1,1....> +valueList *createNewValueListNode(char *value){ + valueList *newNode = (valueList *)malloc (sizeof(valueList)); + strcpy(newNode -> value, value); + newNode -> next = NULL; + return newNode; +} + +// insert new count to value list +valueList *insertNewValueToList(valueList *root, char *count){ + valueList *tempNode = root; + if(root == NULL) + return createNewValueListNode(count); + while(tempNode -> next != NULL) + tempNode = tempNode -> next; + tempNode -> next = createNewValueListNode(count); + return root; +} + +// free value list +void freeValueList(valueList *root) { + if(root == NULL) return; + + valueList *tempNode = root -> next;; + while (tempNode != NULL){ + free(root); + root = tempNode; + tempNode = tempNode -> next; + } +} + +// create <word, value list> +intermediateDS *createNewInterDSNode(char *word, char *count){ + intermediateDS *newNode = (intermediateDS *)malloc (sizeof(intermediateDS)); + strcpy(newNode -> key, word); + newNode -> value = NULL; + newNode -> value = insertNewValueToList(newNode -> value, count); + newNode -> next = NULL; + return newNode; +} + +// insert or update a <word, value> to intermediate DS +intermediateDS *insertPairToInterDS(intermediateDS *root, char *word, char *count){ + intermediateDS *tempNode = root; + if(root == NULL) + return createNewInterDSNode(word, count); + while(tempNode -> next != NULL) { + if(strcmp(tempNode -> key, word) == 0){ + tempNode -> value = insertNewValueToList(tempNode -> value, count); + return root; + } + tempNode = tempNode -> next; + + } + if(strcmp(tempNode -> key, word) == 0){ + tempNode -> value = insertNewValueToList(tempNode -> value, count); + } else { + tempNode -> next = createNewInterDSNode(word, count); + } + return root; +} + +// free the DS after usage. Call this once you are done with the writing of DS into file +void freeInterDS(intermediateDS *root) { + if(root == NULL) return; + + intermediateDS *tempNode = root -> next;; + while (tempNode != NULL){ + freeValueList(root -> value); + free(root); + root = tempNode; + tempNode = tempNode -> next; + } +} + +// emit the <key, value> into intermediate DS +void emit(char *key, char *value) { + +} + +// map function +void map(char *chunkData){ + + // you can use getWord to retrieve words from the + // chunkData one by one. Example usage in utils.h +} + +// write intermediate data to separate word.txt files +// Each file will have only one line : word 1 1 1 1 1 ... +void writeIntermediateDS() { + +} + +int main(int argc, char *argv[]) { + + if (argc < 2) { + printf("Less number of arguments.\n"); + printf("./mapper mapperID\n"); + exit(0); + } + // ###### DO NOT REMOVE ###### + mapperID = strtol(argv[1], NULL, 10); + + // ###### DO NOT REMOVE ###### + // create folder specifically for this mapper in output/MapOut + // mapOutDir has the path to the folder where the outputs of + // this mapper should be stored + mapOutDir = createMapDir(mapperID); + + // ###### DO NOT REMOVE ###### + while(1) { + // create an array of chunkSize=1024B and intialize all + // elements with '\0' + char chunkData[chunkSize + 1]; // +1 for '\0' + memset(chunkData, '\0', chunkSize + 1); + + char *retChunk = getChunkData(mapperID); + if(retChunk == NULL) { + break; + } + + strcpy(chunkData, retChunk); + free(retChunk); + + map(chunkData); + } + + // ###### DO NOT REMOVE ###### + writeIntermediateDS(); + + return 0; +}
\ No newline at end of file diff --git a/P1/src/mapreduce.c b/P1/src/mapreduce.c new file mode 100644 index 0000000..5b63f3f --- /dev/null +++ b/P1/src/mapreduce.c @@ -0,0 +1,54 @@ +#include "mapreduce.h" + +int main(int argc, char *argv[]) { + + if(argc < 4) { + printf("Less number of arguments.\n"); + printf("./mapreduce #mappers #reducers inputFile\n"); + exit(0); + } + + // ###### DO NOT REMOVE ###### + int nMappers = strtol(argv[1], NULL, 10); + int nReducers = strtol(argv[2], NULL, 10); + char *inputFile = argv[3]; + + // ###### DO NOT REMOVE ###### + bookeepingCode(); + + // ###### DO NOT REMOVE ###### + pid_t pid = fork(); + if(pid == 0){ + //send chunks of data to the mappers in RR fashion + sendChunkData(inputFile, nMappers); + exit(0); + } + sleep(1); + + + // To do + // spawn mappers processes and run 'mapper' executable using exec + + // To do + // wait for all children to complete execution + + + // ###### DO NOT REMOVE ###### + // shuffle sends the word.txt files generated by mapper + // to reducer based on a hash function + pid = fork(); + if(pid == 0){ + shuffle(nMappers, nReducers); + exit(0); + } + sleep(1); + + + // To do + // spawn reducer processes and run 'reducer' executable using exec + + // To do + // wait for all children to complete execution + + return 0; +}
\ No newline at end of file diff --git a/P1/src/reducer.c b/P1/src/reducer.c new file mode 100644 index 0000000..bdf093b --- /dev/null +++ b/P1/src/reducer.c @@ -0,0 +1,79 @@ +#include "reducer.h" + +// create a key value node +finalKeyValueDS *createFinalKeyValueNode(char *word, int count){ + finalKeyValueDS *newNode = (finalKeyValueDS *)malloc (sizeof(finalKeyValueDS)); + strcpy(newNode -> key, word); + newNode -> value = count; + newNode -> next = NULL; + return newNode; +} + +// insert or update an key value +finalKeyValueDS *insertNewKeyValue(finalKeyValueDS *root, char *word, int count){ + finalKeyValueDS *tempNode = root; + if(root == NULL) + return createFinalKeyValueNode(word, count); + while(tempNode -> next != NULL){ + if(strcmp(tempNode -> key, word) == 0){ + tempNode -> value += count; + return root; + } + tempNode = tempNode -> next; + } + if(strcmp(tempNode -> key, word) == 0){ + tempNode -> value += count; + } else{ + tempNode -> next = createFinalKeyValueNode(word, count); + } + return root; +} + +// free the DS after usage. Call this once you are done with the writing of DS into file +void freeFinalDS(finalKeyValueDS *root) { + if(root == NULL) return; + + finalKeyValueDS *tempNode = root -> next;; + while (tempNode != NULL){ + free(root); + root = tempNode; + tempNode = tempNode -> next; + } +} + +// reduce function +void reduce(char *key) { + +} + +// write the contents of the final intermediate structure +// to output/ReduceOut/Reduce_reducerID.txt +void writeFinalDS(int reducerID){ + +} + +int main(int argc, char *argv[]) { + + if(argc < 2){ + printf("Less number of arguments.\n"); + printf("./reducer reducerID"); + } + + // ###### DO NOT REMOVE ###### + // initialize + int reducerID = strtol(argv[1], NULL, 10); + + // ###### DO NOT REMOVE ###### + // master will continuously send the word.txt files + // alloted to the reducer + char key[MAXKEYSZ]; + while(getInterData(key, reducerID)) + reduce(key); + + // You may write this logic. You can somehow store the + // <key, value> count and write to Reduce_reducerID.txt file + // So you may delete this function and add your logic + writeFinalDS(reducerID); + + return 0; +}
\ No newline at end of file |