aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--P2/lib/utils.c21
1 files changed, 16 insertions, 5 deletions
diff --git a/P2/lib/utils.c b/P2/lib/utils.c
index d1da70c..7bdf941 100644
--- a/P2/lib/utils.c
+++ b/P2/lib/utils.c
@@ -45,14 +45,17 @@ void sendChunkData(char *inputFile, int nMappers) {
while(validChar(message.msgText[i])) {
message.msgText[i--] = '\0';
}
- fseek(file, (i - 1023), SEEK_CUR);
+ if (fseek(file, (i - 1023), SEEK_CUR) == -1) {
+ break;
message.msgType = (map++ % nMappers) + 1;
- msgsnd(msgid, &message, MSGSIZE, 0);
+ if (msgsnd(msgid, &message, MSGSIZE, 0) == -1)
+ break;
}
for (int i = 1; i <= nMappers; i++) {
struct msgBuffer END = {i, "END"};
- msgsnd(msgid, &END, MSGSIZE, 0);
+ if (msgsnd(msgid, &END, MSGSIZE, 0) == -1)
+ break;
}
fclose(file);
}
@@ -73,7 +76,10 @@ int getInterData(char *Qkey, int reducerID) {
struct msgBuffer message= makeMessage();
//DEBUG! make sure it work.
int id = openQueue();
- msgrcv(id, &message, MSGSIZE, reducerID, 0);
+ if (id == -1)
+ exit(-1);
+ if (msgrcv(id, &message, MSGSIZE, reducerID, 0) == -1)
+ exit(-1);
strcpy(Qkey, message.msgText);
return (strncmp("END", message.msgText, 3) != 0);
}
@@ -85,19 +91,24 @@ void shuffle(int nMappers, int nReducers) {
char newpath[100];
sprintf(newpath, "output/MapOut/Map_%d", i); // Removed /, add to current dir
DIR *dir = opendir(newpath);
+ if (dir == NULL)
+ break;
struct dirent* entry;
while ((entry = readdir(dir)) != NULL) {
if (!strcmp(".", entry->d_name) || !strcmp("..", entry->d_name))
continue;
sprintf(message.msgText, "%s/%s", newpath, entry -> d_name);
message.msgType = (hashFunction(entry -> d_name, nReducers)+1);
- msgsnd(id, &message, MSGSIZE, 0);
+ if (msgsnd(id, &message, MSGSIZE, 0) == -1)
+ break;
}
closedir(dir);
}
for (int i = 1; i <= nReducers; i++) {
struct msgBuffer END = {i, "END"};
msgsnd(id, &END, MSGSIZE, 0);
+ if (msgsnd(id, &END, MSGSIZE, 0))
+ break;
}
}