diff options
Diffstat (limited to '')
-rw-r--r-- | P2/src/reducer.c | 140 |
1 files changed, 140 insertions, 0 deletions
diff --git a/P2/src/reducer.c b/P2/src/reducer.c new file mode 100644 index 0000000..d62c2c2 --- /dev/null +++ b/P2/src/reducer.c @@ -0,0 +1,140 @@ +#include "reducer.h" + +finalKeyValueDS *createFinalKeyValueNode(char *word, int count){ + finalKeyValueDS *newNode = (finalKeyValueDS *)malloc (sizeof(finalKeyValueDS)); + strcpy(newNode -> key, word); + newNode -> value = count; + newNode -> next = NULL; + return newNode; +} + +finalKeyValueDS *insertNewKeyValue(finalKeyValueDS *root, char *word, int count){ + finalKeyValueDS *tempNode = root; + if(root == NULL) + return createFinalKeyValueNode(word, count); + while(tempNode -> next != NULL){ + if(strcmp(tempNode -> key, word) == 0){ + tempNode -> value += count; + return root; + } + tempNode = tempNode -> next; + } + if(strcmp(tempNode -> key, word) == 0){ + tempNode -> value += count; + } else{ + tempNode -> next = createFinalKeyValueNode(word, count); + } + return root; +} + +void freeFinalDS(finalKeyValueDS *root) { + if(root == NULL) return; + + finalKeyValueDS *tempNode = NULL; + while (root != NULL){ + tempNode = root; + root = root -> next; + free(tempNode); + } +} + +// reduce function +void reduce(char *key) { + + char* wordFileName = key; + int fd = open(wordFileName, O_RDONLY); + if (fd < 0){ + printf("ERROR: Cannot open the file %s\n", wordFileName); + exit(0); + } + + char buffer[MAXKEYSZ]; + char word[MAXKEYSZ]; + memset(word, '\0', sizeof(char) * MAXKEYSZ); + int j = 0; + int foundKey = 0; + while(read(fd, &buffer[j], 1) == 1) { + // read one word at a time + if (buffer[j] == ' ' ||buffer[j] == '\n' || buffer[j] == 0x0) { + buffer[j] = '\0'; + + // just a safety code to avoid empty files or blank lines + if(strlen(buffer) == 0){ + j = 0; + continue; + } + + int count; + if(!foundKey){ + // found the word + foundKey = 1; + strcpy(word, buffer); + count = 0; + } else { + // convert string count to integer count + count = strtol(buffer, NULL, 10); + } + finalDS = insertNewKeyValue(finalDS, word, count); + j = 0; + continue; + } + j++; + } + close(fd); +} + +void writeFinalDS(int reducerID){ + char reduceFileName[MAXKEYSZ]; + memset(reduceFileName, '\0', MAXKEYSZ); + sprintf(reduceFileName, "output/ReduceOut/Reduce_%d.txt", reducerID); + + int fdReduce = open(reduceFileName, O_CREAT | O_WRONLY, 0777); + if (fdReduce < 0){ + printf("ERROR: Cannot open the file %s\n", reduceFileName); + exit(0); + } + + finalKeyValueDS *tempNode = finalDS; + while(tempNode){ + char writeKeyValue[2 * MAXKEYSZ]; + memset(writeKeyValue, '\0', 2 * MAXKEYSZ); + strcpy(writeKeyValue, tempNode -> key); + strcat(writeKeyValue, " "); + char valueStr[MAXKEYSZ]; + memset(valueStr, '\0', MAXKEYSZ); + sprintf(valueStr, "%d", tempNode -> value); + strcat(writeKeyValue, valueStr); + writeKeyValue[strlen(writeKeyValue)] = '\n'; + + // word write + int ret = write(fdReduce, writeKeyValue, strlen(writeKeyValue)); + if(ret < 0){ + printf("ERROR: Cannot write to file %s\n", reduceFileName); + exit(0); + } + tempNode = tempNode -> next; + } + close(fdReduce); +} + +int main(int argc, char *argv[]) { + + if(argc < 2){ + printf("Less number of arguments.\n"); + printf("./reducer reducerID"); + } + + // initialize + int reducerID = strtol(argv[1], NULL, 10); + finalDS = NULL; + + // master will continuously send the word.txt files alloted to the reducer + char key[MAXKEYSZ]; + while(getInterData(key, reducerID)) + reduce(key); + + writeFinalDS(reducerID); + + freeFinalDS(finalDS); + return 0; +}
\ No newline at end of file |