aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--P2/lib/utils.c65
1 files changed, 33 insertions, 32 deletions
diff --git a/P2/lib/utils.c b/P2/lib/utils.c
index 3b4f829..3ec7526 100644
--- a/P2/lib/utils.c
+++ b/P2/lib/utils.c
@@ -2,7 +2,8 @@
int openQueue() {
char cwd [50];
- getcwd(cwd, 50);
+ if(getcwd(cwd, 50) == NULL)
+ return NULL;
return msgget(ftok(cwd, 4061), 0666 | IPC_CREAT);
}
int closeQueue(int id) {
@@ -21,56 +22,51 @@ char *getChunkData(int mapperID) {
struct msgBuffer message = makeMessage();
//Queue ID
int mid = openQueue();
- //printf("MAPPER ID:%d\n", mapperID);
- msgrcv(mid, &message, MSGSIZE, mapperID, 0);
- // printf("\n%s\n", message.msgText);
- // printf("%d\n", strncmp("END", message.msgText, 3));
+ if (msgrcv(mid, &message, MSGSIZE, mapperID, 0) == -1)
+ exit(-1);
if (strncmp("END", message.msgText, 3) == 0)
return NULL;
- // char* value = message.msgText;
- // return value;
-
- // DEBUG! malloc a buffer/return
char* value = malloc(1024); // chunkSize or MSGSIZE?
strcpy(value, message.msgText);
return value;
- // Free memory outside of getChunkData?
-
- // printf("%s\n", message.msgText);
- //printf("RECEIVED CHUNK:%s\nRECEIVED VALUE:%ld\n", value, message.msgType);
-
- //return &(message.msgText);
}
// sends chunks of size 1024 to the mappers in RR fashion
void sendChunkData(char *inputFile, int nMappers) {
struct msgBuffer message = makeMessage();
// open message queue
- int msgid = openQueue();
- closeQueue(msgid);
+ int msgid;
+ if (msgid == NULL)
+ exit(-1);
+ if (closeQueue(msgid) == -1)
+ exit(-1);
msgid = openQueue();
- // DEBUG! Remove if already exists when opening queue for the first time
+ if (msgid == NULL)
+ exit(-1);
int map = 0;
FILE* file = fopen(inputFile, "r");
+ if (file == NULL)
+ exit(-1);
// construct chunks of 1024 bytes
while(fgets(message.msgText, chunkSize + 1, file) != NULL) {
int i = 1023;
while(validChar(message.msgText[i])) {
- message.msgText[i] = '\0';
- i--;
+ message.msgText[i--] = '\0';
+ }
+ if (fseek(file, (i - 1023), SEEK_CUR) == -1) {
+ break;
}
- // DEBUG!
-
- fseek(file, (i - 1023), SEEK_CUR);
message.msgType = (map++ % nMappers) + 1;
- //printf("SENT CHUNK: %s\nSENT CHUNK MAPPER: %ld\n",message.msgText, message.msgType);
- msgsnd(msgid, &message, MSGSIZE, 0);
+
+ if (msgsnd(msgid, &message, MSGSIZE, 0) == -1)
+ exit(-1);
}
for (int i = 1; i <= nMappers; i++) {
struct msgBuffer END = {i, "END"};
- msgsnd(msgid, &END, MSGSIZE, 0);
- }
+ if (msgsnd(msgid, &END, MSGSIZE, 0) == -1)
+ exit(-1);
+ }
fclose(file);
}
@@ -90,9 +86,11 @@ int getInterData(char *Qkey, int reducerID) {
struct msgBuffer message= makeMessage();
//DEBUG! make sure it work.
int id = openQueue();
- msgrcv(id, &message, MSGSIZE, reducerID, 0);
+ if (id == NULL)
+ exit(-1);
+ if (msgrcv(id, &message, MSGSIZE, reducerID, 0) == -1)
+ exit(-1);
strcpy(Qkey, message.msgText);
- printf("INTER DATA: %s\nREDUCER ID:%ld\n", Qkey, message.msgType);
return (strncmp("END", message.msgText, 3) != 0);
}
@@ -105,20 +103,23 @@ void shuffle(int nMappers, int nReducers) {
char newpath[100];
sprintf(newpath, "output/MapOut/Map_%d", i); // Removed /, add to current dir
DIR *dir = opendir(newpath);
+ if (dir == NULL)
+ break;
struct dirent* entry;
while ((entry = readdir(dir)) != NULL) {
if (!strcmp(".", entry->d_name) || !strcmp("..", entry->d_name))
continue;
sprintf(message.msgText, "%s/%s", newpath, entry -> d_name);
- printf("%s\n%d\n", entry->d_name, hashFunction(entry->d_name, nReducers)+1);
message.msgType = (hashFunction(entry -> d_name, nReducers)+1);
- msgsnd(id, &message, MSGSIZE, 0);
+ if (msgsnd(id, &message, MSGSIZE, 0) == NULL)
+ exit(-1);
}
closedir(dir);
}
for (int i = 1; i <= nReducers; i++) {
struct msgBuffer END = {i, "END"};
- msgsnd(id, &END, MSGSIZE, 0);
+ if (msgsnd(id, &END, MSGSIZE, 0))
+ break;
}
}