aboutsummaryrefslogtreecommitdiffstats
path: root/P2/src/reducer.c
diff options
context:
space:
mode:
Diffstat (limited to 'P2/src/reducer.c')
-rw-r--r--P2/src/reducer.c140
1 files changed, 140 insertions, 0 deletions
diff --git a/P2/src/reducer.c b/P2/src/reducer.c
new file mode 100644
index 0000000..d62c2c2
--- /dev/null
+++ b/P2/src/reducer.c
@@ -0,0 +1,140 @@
+#include "reducer.h"
+
+finalKeyValueDS *createFinalKeyValueNode(char *word, int count){
+ finalKeyValueDS *newNode = (finalKeyValueDS *)malloc (sizeof(finalKeyValueDS));
+ strcpy(newNode -> key, word);
+ newNode -> value = count;
+ newNode -> next = NULL;
+ return newNode;
+}
+
+finalKeyValueDS *insertNewKeyValue(finalKeyValueDS *root, char *word, int count){
+ finalKeyValueDS *tempNode = root;
+ if(root == NULL)
+ return createFinalKeyValueNode(word, count);
+ while(tempNode -> next != NULL){
+ if(strcmp(tempNode -> key, word) == 0){
+ tempNode -> value += count;
+ return root;
+ }
+ tempNode = tempNode -> next;
+ }
+ if(strcmp(tempNode -> key, word) == 0){
+ tempNode -> value += count;
+ } else{
+ tempNode -> next = createFinalKeyValueNode(word, count);
+ }
+ return root;
+}
+
+void freeFinalDS(finalKeyValueDS *root) {
+ if(root == NULL) return;
+
+ finalKeyValueDS *tempNode = NULL;
+ while (root != NULL){
+ tempNode = root;
+ root = root -> next;
+ free(tempNode);
+ }
+}
+
+// reduce function
+void reduce(char *key) {
+
+ char* wordFileName = key;
+ int fd = open(wordFileName, O_RDONLY);
+ if (fd < 0){
+ printf("ERROR: Cannot open the file %s\n", wordFileName);
+ exit(0);
+ }
+
+ char buffer[MAXKEYSZ];
+ char word[MAXKEYSZ];
+ memset(word, '\0', sizeof(char) * MAXKEYSZ);
+ int j = 0;
+ int foundKey = 0;
+ while(read(fd, &buffer[j], 1) == 1) {
+ // read one word at a time
+ if (buffer[j] == ' ' ||buffer[j] == '\n' || buffer[j] == 0x0) {
+ buffer[j] = '\0';
+
+ // just a safety code to avoid empty files or blank lines
+ if(strlen(buffer) == 0){
+ j = 0;
+ continue;
+ }
+
+ int count;
+ if(!foundKey){
+ // found the word
+ foundKey = 1;
+ strcpy(word, buffer);
+ count = 0;
+ } else {
+ // convert string count to integer count
+ count = strtol(buffer, NULL, 10);
+ }
+ finalDS = insertNewKeyValue(finalDS, word, count);
+ j = 0;
+ continue;
+ }
+ j++;
+ }
+ close(fd);
+}
+
+void writeFinalDS(int reducerID){
+ char reduceFileName[MAXKEYSZ];
+ memset(reduceFileName, '\0', MAXKEYSZ);
+ sprintf(reduceFileName, "output/ReduceOut/Reduce_%d.txt", reducerID);
+
+ int fdReduce = open(reduceFileName, O_CREAT | O_WRONLY, 0777);
+ if (fdReduce < 0){
+ printf("ERROR: Cannot open the file %s\n", reduceFileName);
+ exit(0);
+ }
+
+ finalKeyValueDS *tempNode = finalDS;
+ while(tempNode){
+ char writeKeyValue[2 * MAXKEYSZ];
+ memset(writeKeyValue, '\0', 2 * MAXKEYSZ);
+ strcpy(writeKeyValue, tempNode -> key);
+ strcat(writeKeyValue, " ");
+ char valueStr[MAXKEYSZ];
+ memset(valueStr, '\0', MAXKEYSZ);
+ sprintf(valueStr, "%d", tempNode -> value);
+ strcat(writeKeyValue, valueStr);
+ writeKeyValue[strlen(writeKeyValue)] = '\n';
+
+ // word write
+ int ret = write(fdReduce, writeKeyValue, strlen(writeKeyValue));
+ if(ret < 0){
+ printf("ERROR: Cannot write to file %s\n", reduceFileName);
+ exit(0);
+ }
+ tempNode = tempNode -> next;
+ }
+ close(fdReduce);
+}
+
+int main(int argc, char *argv[]) {
+
+ if(argc < 2){
+ printf("Less number of arguments.\n");
+ printf("./reducer reducerID");
+ }
+
+ // initialize
+ int reducerID = strtol(argv[1], NULL, 10);
+ finalDS = NULL;
+
+ // master will continuously send the word.txt files alloted to the reducer
+ char key[MAXKEYSZ];
+ while(getInterData(key, reducerID))
+ reduce(key);
+
+ writeFinalDS(reducerID);
+
+ freeFinalDS(finalDS);
+ return 0;
+} \ No newline at end of file