aboutsummaryrefslogtreecommitdiffstats
path: root/P1/src
diff options
context:
space:
mode:
Diffstat (limited to 'P1/src')
-rw-r--r--P1/src/mapper.c134
-rw-r--r--P1/src/mapreduce.c54
-rw-r--r--P1/src/reducer.c79
3 files changed, 267 insertions, 0 deletions
diff --git a/P1/src/mapper.c b/P1/src/mapper.c
new file mode 100644
index 0000000..66ac2ef
--- /dev/null
+++ b/P1/src/mapper.c
@@ -0,0 +1,134 @@
+#include "mapper.h"
+
+// combined value list corresponding to a word <1,1,1,1....>
+valueList *createNewValueListNode(char *value){
+ valueList *newNode = (valueList *)malloc (sizeof(valueList));
+ strcpy(newNode -> value, value);
+ newNode -> next = NULL;
+ return newNode;
+}
+
+// insert new count to value list
+valueList *insertNewValueToList(valueList *root, char *count){
+ valueList *tempNode = root;
+ if(root == NULL)
+ return createNewValueListNode(count);
+ while(tempNode -> next != NULL)
+ tempNode = tempNode -> next;
+ tempNode -> next = createNewValueListNode(count);
+ return root;
+}
+
+// free value list
+void freeValueList(valueList *root) {
+ if(root == NULL) return;
+
+ valueList *tempNode = root -> next;;
+ while (tempNode != NULL){
+ free(root);
+ root = tempNode;
+ tempNode = tempNode -> next;
+ }
+}
+
+// create <word, value list>
+intermediateDS *createNewInterDSNode(char *word, char *count){
+ intermediateDS *newNode = (intermediateDS *)malloc (sizeof(intermediateDS));
+ strcpy(newNode -> key, word);
+ newNode -> value = NULL;
+ newNode -> value = insertNewValueToList(newNode -> value, count);
+ newNode -> next = NULL;
+ return newNode;
+}
+
+// insert or update a <word, value> to intermediate DS
+intermediateDS *insertPairToInterDS(intermediateDS *root, char *word, char *count){
+ intermediateDS *tempNode = root;
+ if(root == NULL)
+ return createNewInterDSNode(word, count);
+ while(tempNode -> next != NULL) {
+ if(strcmp(tempNode -> key, word) == 0){
+ tempNode -> value = insertNewValueToList(tempNode -> value, count);
+ return root;
+ }
+ tempNode = tempNode -> next;
+
+ }
+ if(strcmp(tempNode -> key, word) == 0){
+ tempNode -> value = insertNewValueToList(tempNode -> value, count);
+ } else {
+ tempNode -> next = createNewInterDSNode(word, count);
+ }
+ return root;
+}
+
+// free the DS after usage. Call this once you are done with the writing of DS into file
+void freeInterDS(intermediateDS *root) {
+ if(root == NULL) return;
+
+ intermediateDS *tempNode = root -> next;;
+ while (tempNode != NULL){
+ freeValueList(root -> value);
+ free(root);
+ root = tempNode;
+ tempNode = tempNode -> next;
+ }
+}
+
+// emit the <key, value> into intermediate DS
+void emit(char *key, char *value) {
+
+}
+
+// map function
+void map(char *chunkData){
+
+ // you can use getWord to retrieve words from the
+ // chunkData one by one. Example usage in utils.h
+}
+
+// write intermediate data to separate word.txt files
+// Each file will have only one line : word 1 1 1 1 1 ...
+void writeIntermediateDS() {
+
+}
+
+int main(int argc, char *argv[]) {
+
+ if (argc < 2) {
+ printf("Less number of arguments.\n");
+ printf("./mapper mapperID\n");
+ exit(0);
+ }
+ // ###### DO NOT REMOVE ######
+ mapperID = strtol(argv[1], NULL, 10);
+
+ // ###### DO NOT REMOVE ######
+ // create folder specifically for this mapper in output/MapOut
+ // mapOutDir has the path to the folder where the outputs of
+ // this mapper should be stored
+ mapOutDir = createMapDir(mapperID);
+
+ // ###### DO NOT REMOVE ######
+ while(1) {
+ // create an array of chunkSize=1024B and intialize all
+ // elements with '\0'
+ char chunkData[chunkSize + 1]; // +1 for '\0'
+ memset(chunkData, '\0', chunkSize + 1);
+
+ char *retChunk = getChunkData(mapperID);
+ if(retChunk == NULL) {
+ break;
+ }
+
+ strcpy(chunkData, retChunk);
+ free(retChunk);
+
+ map(chunkData);
+ }
+
+ // ###### DO NOT REMOVE ######
+ writeIntermediateDS();
+
+ return 0;
+} \ No newline at end of file
diff --git a/P1/src/mapreduce.c b/P1/src/mapreduce.c
new file mode 100644
index 0000000..5b63f3f
--- /dev/null
+++ b/P1/src/mapreduce.c
@@ -0,0 +1,54 @@
+#include "mapreduce.h"
+
+int main(int argc, char *argv[]) {
+
+ if(argc < 4) {
+ printf("Less number of arguments.\n");
+ printf("./mapreduce #mappers #reducers inputFile\n");
+ exit(0);
+ }
+
+ // ###### DO NOT REMOVE ######
+ int nMappers = strtol(argv[1], NULL, 10);
+ int nReducers = strtol(argv[2], NULL, 10);
+ char *inputFile = argv[3];
+
+ // ###### DO NOT REMOVE ######
+ bookeepingCode();
+
+ // ###### DO NOT REMOVE ######
+ pid_t pid = fork();
+ if(pid == 0){
+ //send chunks of data to the mappers in RR fashion
+ sendChunkData(inputFile, nMappers);
+ exit(0);
+ }
+ sleep(1);
+
+
+ // To do
+ // spawn mappers processes and run 'mapper' executable using exec
+
+ // To do
+ // wait for all children to complete execution
+
+
+ // ###### DO NOT REMOVE ######
+ // shuffle sends the word.txt files generated by mapper
+ // to reducer based on a hash function
+ pid = fork();
+ if(pid == 0){
+ shuffle(nMappers, nReducers);
+ exit(0);
+ }
+ sleep(1);
+
+
+ // To do
+ // spawn reducer processes and run 'reducer' executable using exec
+
+ // To do
+ // wait for all children to complete execution
+
+ return 0;
+} \ No newline at end of file
diff --git a/P1/src/reducer.c b/P1/src/reducer.c
new file mode 100644
index 0000000..bdf093b
--- /dev/null
+++ b/P1/src/reducer.c
@@ -0,0 +1,79 @@
+#include "reducer.h"
+
+// create a key value node
+finalKeyValueDS *createFinalKeyValueNode(char *word, int count){
+ finalKeyValueDS *newNode = (finalKeyValueDS *)malloc (sizeof(finalKeyValueDS));
+ strcpy(newNode -> key, word);
+ newNode -> value = count;
+ newNode -> next = NULL;
+ return newNode;
+}
+
+// insert or update an key value
+finalKeyValueDS *insertNewKeyValue(finalKeyValueDS *root, char *word, int count){
+ finalKeyValueDS *tempNode = root;
+ if(root == NULL)
+ return createFinalKeyValueNode(word, count);
+ while(tempNode -> next != NULL){
+ if(strcmp(tempNode -> key, word) == 0){
+ tempNode -> value += count;
+ return root;
+ }
+ tempNode = tempNode -> next;
+ }
+ if(strcmp(tempNode -> key, word) == 0){
+ tempNode -> value += count;
+ } else{
+ tempNode -> next = createFinalKeyValueNode(word, count);
+ }
+ return root;
+}
+
+// free the DS after usage. Call this once you are done with the writing of DS into file
+void freeFinalDS(finalKeyValueDS *root) {
+ if(root == NULL) return;
+
+ finalKeyValueDS *tempNode = root -> next;;
+ while (tempNode != NULL){
+ free(root);
+ root = tempNode;
+ tempNode = tempNode -> next;
+ }
+}
+
+// reduce function
+void reduce(char *key) {
+
+}
+
+// write the contents of the final intermediate structure
+// to output/ReduceOut/Reduce_reducerID.txt
+void writeFinalDS(int reducerID){
+
+}
+
+int main(int argc, char *argv[]) {
+
+ if(argc < 2){
+ printf("Less number of arguments.\n");
+ printf("./reducer reducerID");
+ }
+
+ // ###### DO NOT REMOVE ######
+ // initialize
+ int reducerID = strtol(argv[1], NULL, 10);
+
+ // ###### DO NOT REMOVE ######
+ // master will continuously send the word.txt files
+ // alloted to the reducer
+ char key[MAXKEYSZ];
+ while(getInterData(key, reducerID))
+ reduce(key);
+
+ // You may write this logic. You can somehow store the
+ // <key, value> count and write to Reduce_reducerID.txt file
+ // So you may delete this function and add your logic
+ writeFinalDS(reducerID);
+
+ return 0;
+} \ No newline at end of file