aboutsummaryrefslogtreecommitdiffstats
path: root/P2
diff options
context:
space:
mode:
Diffstat (limited to 'P2')
-rw-r--r--P2/include/utils.h4
-rw-r--r--P2/lib/utils.c26
-rw-r--r--P2/src/mapreduce.c3
3 files changed, 12 insertions, 21 deletions
diff --git a/P2/include/utils.h b/P2/include/utils.h
index 6fa4fa8..4f82435 100644
--- a/P2/include/utils.h
+++ b/P2/include/utils.h
@@ -24,8 +24,8 @@ struct msgBuffer {
//Open Queue as a function because writing this once is probably better than four times.
//Hopefully it works properly.
-int openQueue();
-void closeQueue();
+int openQueue(char* path);
+int closeQueue(int id);
// mapper side
int validChar(char c);
diff --git a/P2/lib/utils.c b/P2/lib/utils.c
index 56bd3cc..a565717 100644
--- a/P2/lib/utils.c
+++ b/P2/lib/utils.c
@@ -1,20 +1,10 @@
#include "utils.h"
-int openQueue() {
- int id = msgget(ftok("4061 Project 2 SS", 'S'), 0666 | IPC_CREAT);
- if (id < 0) {
- perror("Cannot open queue.\n");
- return -1;
- }
- return id;
+int openQueue(char* path) {
+ return msgget(ftok(path, 253), 0666 | IPC_CREAT);
}
-void closeQueue() {
- int msgid = msgget(ftok("4061 Project 2 SS", 'S'), 0666);
- if (msgid < 0) {
- perror("Cannot open queue. It may already exist.\n");
- exit(-1);
- }
- msgctl(msgid, IPC_RMID, NULL);
+int closeQueue(int id) {
+ return msgctl(id, IPC_RMID, NULL);
}
char *getChunkData(int mapperID) {
@@ -22,7 +12,7 @@ char *getChunkData(int mapperID) {
//Message
struct msgBuffer message;
//Queue ID
- int mid = openQueue();
+ int mid = openQueue("map");
msgrcv(mid, &message, sizeof(message.msgText), mapperID, 0);
if (strncmp("END", message.msgText, 3)) {
return NULL;
@@ -36,7 +26,7 @@ void sendChunkData(char *inputFile, int nMappers) {
printf("SENDING CHUNK DATA\n");
struct msgBuffer message;
// open message queue
- int msgid = openQueue();
+ int msgid = openQueue("map");
int map = 1;
// message.msgText = 1;
int fd = open(inputFile, O_RDONLY);
@@ -88,7 +78,7 @@ int hashFunction(char* Qkey, int reducers){
int getInterData(char *Qkey, int reducerID) {
struct msgBuffer message;
//make sure it work.
- int id = openQueue();
+ int id = openQueue("reduce");
msgrcv(id, &message, chunkSize, reducerID, 0);
Qkey = message.msgText;
return strncmp("END", message.msgText, 3);
@@ -103,7 +93,7 @@ int getInterData(char *Qkey, int reducerID) {
void shuffle(int nMappers, int nReducers) {
struct msgBuffer message;
//Once again, MAKE SURE THIS WORKS PROPERLY!
- int id = openQueue();
+ int id = openQueue("reduce");
for (int i = 1; i <= nMappers; i++) {
//Extra for loop traversing directory
//TODO: Actually traverse directory
diff --git a/P2/src/mapreduce.c b/P2/src/mapreduce.c
index 290c1c5..7f5e8c4 100644
--- a/P2/src/mapreduce.c
+++ b/P2/src/mapreduce.c
@@ -77,6 +77,7 @@ int main(int argc, char *argv[]) {
// wait for all children to complete execution
while (wait(&status) > 0);
- closeQueue();
+ closeQueue(openQueue("map"));
+ closeQueue(openQueue("reduce"));
return 0;
} \ No newline at end of file