aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--P2/lib/utils.c53
1 files changed, 52 insertions, 1 deletions
diff --git a/P2/lib/utils.c b/P2/lib/utils.c
index 17b3104..4d815a0 100644
--- a/P2/lib/utils.c
+++ b/P2/lib/utils.c
@@ -1,10 +1,61 @@
#include "utils.h"
char *getChunkData(int mapperID) {
+ //Message
+ struct msgBuffer message;
+ //Queue ID, not sure what it actually does
+ int mid;
+ //Queue Key
+ key_t key = 10;
+ mid = msgget(key, 0666 | IPC_CREAT);
+ if (mid < 0) {
+ perror("Cannot open queue.\n");
+ return NULL;
+ }
+ msgrcv(mid, &message, MSGSIZE, mapperID, 0);
+ if (strcmp("END", message.msgText)) {
+ struct msgBuffer ACK = {mapperID, "ACK"};
+ msgsnd(mid, &ACK, MSGSIZE, 0);
+ }
+ // msgctl(mid, IPC_RMID, 0);
+ return message.msgText;
}
// sends chunks of size 1024 to the mappers in RR fashion
void sendChunkData(char *inputFile, int nMappers) {
+ struct msgBuffer message;
+ key_t key = 10;
+ int msgid;
+
+ // open message queue
+ msgid = msgget(key, 0666 | IPC_CREAT);
+ if (msgid < 0) {
+ perror("Cannot open queue.\n");
+ exit(-1);
+ }
+ // message.msgText = 1;
+ FILE *fptr = fopen(inputFile, "r");
+
+ // construct chunks of 1024 bytes
+ while(fgets(&message, chunkSize, fptr) != EOF) {
+
+ /* Go to the end of the chunk, check if final character
+ is a space if character is a space, do nothing
+ else cut off before that word and put back file */
+ // TODO! help
+
+ msgsnd(msgid, &message, mapperID);
+ }
+
+ for (long i = 1; i < nMappers; i++) {
+ struct msgBuffer END = {i, "END"};
+ msgsnd(msgid, &END, MSGSIZE, 0);
+
+ // TODO! does this need to be in another loop or is blocking good enough?
+ msgrcv(msgid, &message, MSGSIZE, i, 0);
+ }
+
+ // msgctl(msgid, IPC_RMID, 0); // close that bih
}
// hash function to divide the list of word.txt files across reducers
@@ -89,4 +140,4 @@ void bookeepingCode(){
removeOutputDir();
sleep(1);
createOutputDir();
-} \ No newline at end of file
+}