aboutsummaryrefslogtreecommitdiffstats
path: root/P2/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--P2/src/mapper.c221
-rw-r--r--P2/src/mapreduce.c83
-rw-r--r--P2/src/reducer.c140
3 files changed, 444 insertions, 0 deletions
diff --git a/P2/src/mapper.c b/P2/src/mapper.c
new file mode 100644
index 0000000..469b32e
--- /dev/null
+++ b/P2/src/mapper.c
@@ -0,0 +1,221 @@
+#include "mapper.h"
+
+// Allocate one node of a value list (the per-word list <1,1,1,1....>).
+// value: NUL-terminated string; its characters are copied into the node.
+// Returns the new node with next set to NULL. The node is heap-allocated and
+// is released by freeValueList(). Terminates the process if allocation fails.
+valueList *createNewValueListNode(char *value){
+    valueList *newNode = malloc(sizeof *newNode);
+    if(newNode == NULL){
+        printf("ERROR: Cannot allocate memory for value list node\n");
+        exit(1);
+    }
+
+    // Bounded copy: an over-long value is truncated instead of overflowing
+    // the field.
+    // NOTE(review): assumes `value` is a char array member of valueList
+    // (declared in mapper.h, not visible here) - confirm.
+    snprintf(newNode -> value, sizeof(newNode -> value), "%s", value);
+    newNode -> next = NULL;
+    return newNode;
+}
+
+// Append a count string to the end of a value list.
+// root:  head of the list, or NULL for an empty list.
+// count: string handed to createNewValueListNode() for the new tail node.
+// Returns the head of the list (the new node when the list was empty).
+valueList *insertNewValueToList(valueList *root, char *count){
+    valueList **link = &root;
+
+    // advance to the NULL link that terminates the list
+    while(*link != NULL)
+        link = &((*link) -> next);
+
+    *link = createNewValueListNode(count);
+    return root;
+}
+
+// Release every node of a value list; a NULL list is accepted.
+void freeValueList(valueList *root) {
+    valueList *following;
+
+    for(; root != NULL; root = following){
+        following = root -> next;   // read the link before the node is freed
+        free(root);
+    }
+}
+
+// Allocate an intermediate-DS node for <word, value list>.
+// word:  NUL-terminated key; its characters are copied into the node.
+// count: first entry of the node's value list.
+// Returns the new node with next set to NULL. The node is heap-allocated and
+// is released by freeInterDS(). Terminates the process if allocation fails.
+intermediateDS *createNewInterDSNode(char *word, char *count){
+    intermediateDS *newNode = malloc(sizeof *newNode);
+    if(newNode == NULL){
+        printf("ERROR: Cannot allocate memory for intermediate node of %s\n", word);
+        exit(1);
+    }
+
+    // Bounded copy: an over-long word is truncated instead of overflowing
+    // the field.
+    // NOTE(review): assumes `key` is a char array member of intermediateDS
+    // (declared in mapper.h, not visible here) - confirm.
+    snprintf(newNode -> key, sizeof(newNode -> key), "%s", word);
+
+    // the value list starts out with the single entry `count`
+    newNode -> value = insertNewValueToList(NULL, count);
+    newNode -> next = NULL;
+    return newNode;
+}
+
+// Record one <word, count> pair in the intermediate DS.
+// If a node whose key equals word already exists, count is appended to that
+// node's value list; otherwise a new node is appended at the tail.
+// Returns the head of the list (the new node when the list was empty).
+intermediateDS *insertPairToInterDS(intermediateDS *root, char *word, char *count){
+    intermediateDS **link = &root;
+
+    for(; *link != NULL; link = &((*link) -> next)){
+        intermediateDS *node = *link;
+        if(strcmp(node -> key, word) != 0)
+            continue;
+
+        // key already present: extend its value list
+        node -> value = insertNewValueToList(node -> value, count);
+        return root;
+    }
+
+    // reached the terminating link without a match
+    *link = createNewInterDSNode(word, count);
+    return root;
+}
+
+// Release the whole intermediate DS: every node together with its value list.
+// A NULL list is accepted.
+void freeInterDS(intermediateDS *root) {
+    intermediateDS *following;
+
+    for(; root != NULL; root = following){
+        following = root -> next;   // read the link before the node is freed
+        freeValueList(root -> value);
+        free(root);
+    }
+}
+
+// Emit one <key, value> pair: insert it into interDS and store the returned
+// list head back into interDS.
+void emit(char *key, char *value) {
+    intermediateDS *updatedHead = insertPairToInterDS(interDS, key, value);
+    interDS = updatedHead;
+}
+
+// Map function: emit <word, "1"> for every word getWord() returns for a chunk.
+// chunkData: the chunk text handed to getWord().
+// Each buffer returned by getWord() is freed once it has been emitted.
+void map(char *chunkData){
+    int position = 0;   // state handed to getWord() by address on every call
+    char *word;
+
+    for(word = getWord(chunkData, &position);
+        word != NULL;
+        word = getWord(chunkData, &position)){
+        emit(word, "1");
+        free(word);
+    }
+}
+
+// Build the path "<mapOutDir>/<word>.txt" for a word's intermediate file.
+// word: NUL-terminated key.
+// Returns a heap-allocated string sized exactly for the path; the caller
+// must free() it. Terminates the process if allocation fails.
+char *generateWordFileName(char *word){
+    const char *suffix = ".txt";
+
+    // directory + '/' + word + suffix + terminating NUL
+    size_t pathLen = strlen(mapOutDir) + 1 + strlen(word) + strlen(suffix) + 1;
+
+    char *wordFileName = malloc(pathLen);
+    if(wordFileName == NULL){
+        printf("ERROR: Cannot allocate memory for file name of %s\n", word);
+        exit(1);
+    }
+
+    snprintf(wordFileName, pathLen, "%s/%s%s", mapOutDir, word, suffix);
+    return wordFileName;
+}
+
+// Write exactly len bytes of buf to fd, continuing after short writes.
+// Prints an error naming fileName and terminates the process on failure.
+static void writeAllOrExit(int fd, const char *buf, size_t len, const char *fileName){
+    while(len > 0){
+        ssize_t written = write(fd, buf, len);
+        if(written <= 0){
+            printf("ERROR: Cannot write to file %s\n", fileName);
+            exit(0);
+        }
+        buf += written;
+        len -= (size_t)written;
+    }
+}
+
+// write intermediate data to separate word.txt files
+// Each file will have only one line <word 1 1 1 1 1 ...>
+// One file is produced per node of interDS; its name comes from
+// generateWordFileName(). Terminates the process if a file cannot be opened
+// or written.
+void writeIntermediateDS() {
+    intermediateDS *travNode;
+
+    for(travNode = interDS; travNode != NULL; travNode = travNode -> next){
+        char *wordFileName = generateWordFileName(travNode -> key);
+
+        // O_TRUNC drops any previous content, so the file holds exactly one line
+        int fd = open(wordFileName, O_CREAT | O_WRONLY | O_TRUNC, 0777);
+        if (fd < 0){
+            printf("ERROR: Cannot open the file %s\n", wordFileName);
+            exit(0);
+        }
+
+        // the word itself
+        writeAllOrExit(fd, travNode -> key, strlen(travNode -> key), wordFileName);
+
+        // every value, each preceded by a single space
+        valueList *tNode;
+        for(tNode = travNode -> value; tNode != NULL; tNode = tNode -> next){
+            writeAllOrExit(fd, " ", 1, wordFileName);
+            writeAllOrExit(fd, tNode -> value, strlen(tNode -> value), wordFileName);
+        }
+
+        // terminate the line
+        writeAllOrExit(fd, "\n", 1, wordFileName);
+
+        close(fd);
+        free(wordFileName);   // the name was allocated by generateWordFileName()
+    }
+}
+
+// Mapper entry point.
+// Usage: ./mapper mapperID
+// Fetches chunks with getChunkData(mapperID) until it returns NULL and runs
+// map() on each one, then writes the intermediate DS to files and frees it.
+int main(int argc, char *argv[]) {
+
+    if (argc < 2) {
+        printf("Less number of arguments.\n");
+        printf("./mapper mapperID\n");
+        exit(0);
+    }
+
+    // initializing global variables
+    mapperID = strtol(argv[1], NULL, 10);
+    interDS = NULL;
+
+    // directory name for this mapper's output, as returned by createMapDir()
+    mapOutDir = createMapDir(mapperID);
+
+    while(1) {
+        char chunkData[chunkSize + 1];
+        memset(chunkData, '\0', chunkSize + 1);
+
+        char *retChunk = getChunkData(mapperID);
+        if(retChunk == NULL) {
+            break;
+        }
+
+        // Bounded copy: a chunk longer than the local buffer is truncated
+        // instead of overflowing it. The buffer was zeroed above, so the
+        // bytes after the copied text stay '\0'.
+        snprintf(chunkData, sizeof chunkData, "%s", retChunk);
+        free(retChunk);
+
+        map(chunkData);
+    }
+
+    // flush the collected <word, value list> pairs to disk, then release them
+    writeIntermediateDS();
+    freeInterDS(interDS);
+
+    return 0;
+}
\ No newline at end of file
diff --git a/P2/src/mapreduce.c b/P2/src/mapreduce.c
new file mode 100644
index 0000000..c44adb6
--- /dev/null
+++ b/P2/src/mapreduce.c
@@ -0,0 +1,83 @@
+#include "mapreduce.h"
+
+// execute executables using execvp
+// Forks nProcesses children; each child runs the program named by argv[0].
+// argv:       NULL-terminated argument vector. In each child, slot 1 is
+//             overwritten with that child's 1-based index as a decimal string.
+// nProcesses: number of children to start.
+// Does not wait for the children. Exits with status 1 when fork fails (in the
+// parent) or when exec fails (in the child).
+void execute(char **argv, int nProcesses){
+    int i;
+    for (i = 0; i < nProcesses; i++){
+        pid_t pid = fork();
+        if (pid < 0) {
+            printf("ERROR: forking child process failed\n");
+            exit(1);
+        }
+        if (pid > 0)
+            continue;   // parent: go on to start the next child
+
+        // Child. A stack buffer is sufficient because exec copies the
+        // argument strings into the new process image; 16 bytes hold any
+        // 32-bit int in decimal plus the terminating NUL.
+        char processID[16];
+        snprintf(processID, sizeof processID, "%d", i + 1);
+        argv[1] = processID;
+
+        execvp(argv[0], argv);
+
+        // execvp returns only when it failed
+        printf("ERROR: exec failed\n");
+        exit(1);
+    }
+}
+
+// Master entry point.
+// Usage: ./mapreduce #mappers #reducers inputFile
+// Steps, in order: bookeepingCode(); a child running sendChunkData();
+// nMappers "./mapper" processes; wait for all children; a child running
+// shuffle(); nReducers "./reducer" processes; wait for all children.
+int main(int argc, char *argv[]) {
+
+    if(argc < 4) {
+        printf("Less number of arguments.\n");
+        printf("./mapreduce #mappers #reducers inputFile\n");
+        exit(0);
+    }
+
+    int nMappers = strtol(argv[1], NULL, 10);
+    int nReducers = strtol(argv[2], NULL, 10);
+
+    if(nMappers < nReducers){
+        printf("ERROR: Number of mappers should be greater than or equal to number of reducers...\n");
+        exit(0);
+    }
+
+    // <= 0 rather than == 0: negative counts are rejected as well
+    if(nMappers <= 0 || nReducers <= 0){
+        printf("ERROR: Mapper and Reducer count should be grater than zero...\n");
+        exit(0);
+    }
+
+    char *inputFile = argv[3];
+
+    bookeepingCode();
+
+    int status;
+    pid_t pid = fork();
+    if(pid < 0){
+        printf("ERROR: forking child process failed\n");
+        exit(1);
+    }
+    if(pid == 0){
+        //send chunks of data to the mappers in RR fashion
+        sendChunkData(inputFile, nMappers);
+        exit(0);
+    }
+    // NOTE(review): presumably gives the sender a head start before the
+    // mappers are spawned - confirm what the mappers depend on.
+    sleep(1);
+
+    // spawn mappers
+    char *mapperArgv[] = {"./mapper", NULL, NULL};
+    execute(mapperArgv, nMappers);
+
+    // wait for all children to complete execution
+    while (wait(&status) > 0);
+
+    // shuffle sends the word.txt files generated by mapper
+    // to reducer based on a hash function
+    pid = fork();
+    if(pid < 0){
+        printf("ERROR: forking child process failed\n");
+        exit(1);
+    }
+    if(pid == 0){
+        shuffle(nMappers, nReducers);
+        exit(0);
+    }
+    // NOTE(review): same fixed delay as above, here before the reducers start
+    sleep(1);
+
+    // spawn reducers
+    char *reducerArgv[] = {"./reducer", NULL, NULL};
+    execute(reducerArgv, nReducers);
+
+    // wait for all children to complete execution
+    while (wait(&status) > 0);
+
+    return 0;
+}
\ No newline at end of file
diff --git a/P2/src/reducer.c b/P2/src/reducer.c
new file mode 100644
index 0000000..d62c2c2
--- /dev/null
+++ b/P2/src/reducer.c
@@ -0,0 +1,140 @@
+#include "reducer.h"
+
+// Allocate one node of the final <word, count> list.
+// word:  NUL-terminated key; its characters are copied into the node.
+// count: initial value stored in the node.
+// Returns the new node with next set to NULL. The node is heap-allocated and
+// is released by freeFinalDS(). Terminates the process if allocation fails.
+finalKeyValueDS *createFinalKeyValueNode(char *word, int count){
+    finalKeyValueDS *newNode = malloc(sizeof *newNode);
+    if(newNode == NULL){
+        printf("ERROR: Cannot allocate memory for final node of %s\n", word);
+        exit(1);
+    }
+
+    // Bounded copy: an over-long word is truncated instead of overflowing
+    // the field.
+    // NOTE(review): assumes `key` is a char array member of finalKeyValueDS
+    // (declared in reducer.h, not visible here) - confirm.
+    snprintf(newNode -> key, sizeof(newNode -> key), "%s", word);
+    newNode -> value = count;
+    newNode -> next = NULL;
+    return newNode;
+}
+
+// Add count to the entry for word in the final list.
+// If a node whose key equals word exists, count is added to its value;
+// otherwise a new node holding count is appended at the tail.
+// Returns the head of the list (the new node when the list was empty).
+finalKeyValueDS *insertNewKeyValue(finalKeyValueDS *root, char *word, int count){
+    finalKeyValueDS **link = &root;
+
+    for(; *link != NULL; link = &((*link) -> next)){
+        finalKeyValueDS *node = *link;
+        if(strcmp(node -> key, word) != 0)
+            continue;
+
+        // key already present: accumulate
+        node -> value += count;
+        return root;
+    }
+
+    // reached the terminating link without a match
+    *link = createFinalKeyValueNode(word, count);
+    return root;
+}
+
+// Release every node of the final <word, count> list; a NULL list is accepted.
+void freeFinalDS(finalKeyValueDS *root) {
+    finalKeyValueDS *following;
+
+    for(; root != NULL; root = following){
+        following = root -> next;   // read the link before the node is freed
+        free(root);
+    }
+}
+
+// reduce function
+// key: path of a word file, opened read-only.
+// The file is split into tokens at ' ', '\n' and NUL bytes. The first token
+// is taken as the word and inserted into finalDS with a count of 0; every
+// later token is converted with strtol and added to that word's count.
+// Terminates the process if the file cannot be opened.
+void reduce(char *key) {
+
+    char* wordFileName = key;
+    int fd = open(wordFileName, O_RDONLY);
+    if (fd < 0){
+        printf("ERROR: Cannot open the file %s\n", wordFileName);
+        exit(0);
+    }
+
+    char buffer[MAXKEYSZ];   // token currently being collected
+    char word[MAXKEYSZ];     // first token of the file
+    memset(word, '\0', sizeof(char) * MAXKEYSZ);
+    int j = 0;               // number of characters stored in buffer
+    int foundKey = 0;
+    int atEnd = 0;
+
+    while(!atEnd) {
+        char c;
+        if(read(fd, &c, 1) != 1){
+            // end of input also terminates a token that is still pending,
+            // so a last value without a trailing newline is not lost
+            atEnd = 1;
+            c = '\n';
+        }
+
+        if (c != ' ' && c != '\n' && c != 0x0) {
+            // keep one byte for the terminator; the excess of an over-long
+            // token is dropped instead of overflowing buffer
+            if(j < MAXKEYSZ - 1)
+                buffer[j++] = c;
+            continue;
+        }
+
+        // just a safety code to avoid empty files or blank lines
+        if(j == 0)
+            continue;
+
+        buffer[j] = '\0';
+        j = 0;
+
+        int count = 0;
+        if(!foundKey){
+            // found the word
+            foundKey = 1;
+            strcpy(word, buffer);
+        } else {
+            // convert string count to integer count
+            count = strtol(buffer, NULL, 10);
+        }
+        finalDS = insertNewKeyValue(finalDS, word, count);
+    }
+    close(fd);
+}
+
+// Write every node of finalDS to output/ReduceOut/Reduce_<reducerID>.txt,
+// one "<key> <value>" line per node.
+// reducerID: number used in the output file name.
+// Terminates the process if the file cannot be opened or written.
+void writeFinalDS(int reducerID){
+    // fixed prefix and suffix plus any int in decimal fit well within 64 bytes
+    char reduceFileName[64];
+    snprintf(reduceFileName, sizeof reduceFileName, "output/ReduceOut/Reduce_%d.txt", reducerID);
+
+    // O_TRUNC drops any previous content of an existing output file
+    int fdReduce = open(reduceFileName, O_CREAT | O_WRONLY | O_TRUNC, 0777);
+    if (fdReduce < 0){
+        printf("ERROR: Cannot open the file %s\n", reduceFileName);
+        exit(0);
+    }
+
+    finalKeyValueDS *tempNode;
+    for(tempNode = finalDS; tempNode != NULL; tempNode = tempNode -> next){
+        char writeKeyValue[2 * MAXKEYSZ];
+        int lineLen = snprintf(writeKeyValue, sizeof writeKeyValue, "%s %d\n",
+                               tempNode -> key, tempNode -> value);
+        if(lineLen < 0){
+            printf("ERROR: Cannot write to file %s\n", reduceFileName);
+            exit(0);
+        }
+        // snprintf reports the untruncated length; never write past the buffer
+        if((size_t)lineLen >= sizeof writeKeyValue)
+            lineLen = (int)(sizeof writeKeyValue - 1);
+
+        // write the whole line, continuing after short writes
+        const char *cursor = writeKeyValue;
+        size_t remaining = (size_t)lineLen;
+        while(remaining > 0){
+            ssize_t ret = write(fdReduce, cursor, remaining);
+            if(ret <= 0){
+                printf("ERROR: Cannot write to file %s\n", reduceFileName);
+                exit(0);
+            }
+            cursor += ret;
+            remaining -= (size_t)ret;
+        }
+    }
+    close(fdReduce);
+}
+
+// Reducer entry point.
+// Usage: ./reducer reducerID
+// Calls getInterData() repeatedly; each time it returns non-zero, the file
+// name it stored in `key` is passed to reduce(). Afterwards the totals are
+// written with writeFinalDS() and the list is freed.
+int main(int argc, char *argv[]) {
+
+    if(argc < 2){
+        printf("Less number of arguments.\n");
+        printf("./reducer reducerID");
+        // stop here: without an argument argv[1] is NULL and must not reach strtol
+        exit(0);
+    }
+
+    // initialize
+    int reducerID = strtol(argv[1], NULL, 10);
+    finalDS = NULL;
+
+    // master will continuously send the word.txt files alloted to the reducer
+    char key[MAXKEYSZ];
+    while(getInterData(key, reducerID))
+        reduce(key);
+
+    writeFinalDS(reducerID);
+
+    freeFinalDS(finalDS);
+    return 0;
+}
\ No newline at end of file