aboutsummaryrefslogtreecommitdiffstats
path: root/src/client.cpp
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/client.cpp591
1 files changed, 591 insertions, 0 deletions
diff --git a/src/client.cpp b/src/client.cpp
new file mode 100644
index 0000000..baf2215
--- /dev/null
+++ b/src/client.cpp
@@ -0,0 +1,591 @@
+#include "client.hpp"
+
+// Connection 0: Client to Server data communication
+#define C2S_DATA 0
+// Connection 1: Server to Client data communication
+#define S2C_DATA 1
+// Other: Temporary file transer
+
+#define C2S_SERVICE_FREQ 1.0 // seconds
+#define MAX_CONCURRENCY_LIMIT 1000
+#define POLL_TIMEOUT 333 // poll timeout in ms
+
+int nConns; // number of active connections
+struct pollfd
+ peers[MAX_CONCURRENCY_LIMIT + 1]; // sockets to be monitored by poll()
+struct CONN_STAT
+ connStat[MAX_CONCURRENCY_LIMIT + 1]; // app-layer stats of the sockets
+BYTE *buf[MAX_CONCURRENCY_LIMIT + 1];
+
+struct sockaddr_in serverAddr;
+int id;
+
+// Duplicate exists in server and client
+void RemoveConnection(int i) {
+ close(peers[i].fd);
+ if (i < nConns) {
+ memmove(peers + i, peers + i + 1, (nConns - i) * sizeof(struct pollfd));
+ memmove(connStat + i, connStat + i + 1,
+ (nConns - i) * sizeof(struct CONN_STAT));
+ free(buf[i]);
+ memmove(buf + i, buf + i + 1, (nConns - i) * sizeof(BYTE *));
+ }
+ nConns--;
+ // Log("[%s] %d active connections after removal", SERVER_NAME, nConns);
+}
+
+int Send_Blocking(int sockFD, const BYTE *data, int len) {
+ int nSent = 0;
+ while (nSent < len) {
+ int n = send(sockFD, data + nSent, len - nSent, 0);
+ if (n >= 0) {
+ nSent += n;
+ } else if (n < 0 && (errno == ECONNRESET || errno == EPIPE)) {
+ Log("Connection closed.");
+ close(sockFD);
+ return -1;
+ } else {
+ Error("Unexpected error %d: %s.", errno, strerror(errno));
+ }
+ }
+ return 0;
+}
+
+void DoClient(const char *svrIP, int svrPort, const char *fileName) {
+ memset(&serverAddr, 0, sizeof(serverAddr));
+ serverAddr.sin_family = AF_INET;
+ serverAddr.sin_port = htons((unsigned short)svrPort);
+ inet_pton(AF_INET, svrIP, &serverAddr.sin_addr);
+
+ // ignore the SIGPIPE signal that may crash the program in some corner cases
+ signal(SIGPIPE, SIG_IGN);
+
+ // Creating unique identifier for this instance
+ // id = (int)(seconds since 1970) ^ (((int)(process id) & 0x0000FFFF) << 16)
+ struct timeb currentTime;
+ ftime(&currentTime);
+ id = currentTime.time ^ (((int)getpid() & 0x0000FFFF) << 16);
+ // Log("My identifier is %d (%08X)", id, id);
+
+ // create the data communication sockets
+ for (int i = 0; i < 2; i++) {
+ int fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (fd == -1)
+ Error("Cannot create initial sockets.");
+
+ if (connect(fd, (const struct sockaddr *)&serverAddr, sizeof(serverAddr)) !=
+ 0) {
+ Error("C2s cannot connect to server %s:%d.", svrIP, svrPort);
+ }
+
+ enum Direction direction;
+ if (i == C2S_DATA) {
+ direction = C2S;
+ } else {
+ direction = S2C;
+ }
+
+ // Notify the server this instance exists
+ BYTE connectBuf[20] = {0};
+ Header header;
+ header.setFlags(direction, SUCCESS, PUB, SIGN);
+ header.m_command = CONNECT;
+ memcpy(header.m_name, "--------", MAX_USERNAME_LEN);
+ header.m_size = 4;
+ header.encode(connectBuf);
+ i2buf(id, connectBuf + HEADER_LEN);
+ // header.displayContents(true);
+
+ if (Send_Blocking(fd, connectBuf, HEADER_LEN + header.m_size) < 0) {
+ Error("Cannot send CONNECT for connection %d", i);
+ }
+
+ SetNonBlockIO(fd); // TODO: Put this in somewhere
+
+ nConns++;
+ peers[i].fd = fd;
+ peers[i].revents = 0;
+
+ buf[i] = (BYTE *)malloc(BUF_LEN);
+
+ memset(&connStat[i], 0, sizeof(struct CONN_STAT));
+ ftime(&connStat[i].lastTime);
+ connStat[i].direction = direction;
+ connStat[i].linkType = MESSAGE_LINK;
+ connStat[i].nBuffered = 0;
+ connStat[i].messageLen = 0;
+
+ if (i == S2C_DATA) {
+ // Log("S2C_DATA is setting POLLRDNORM");
+ connStat[i].expectingHeader = true;
+ connStat[i].nToDo = HEADER_LEN;
+ peers[i].events |= POLLRDNORM; // S2C reads
+ }
+ }
+
+ // Client begins executing commands from the file.
+ FILE *fp = fopen(fileName, "r");
+ if (!fp)
+ Error("Failed to open the file: %s", fileName);
+
+ char *new_line_pos; // Remove '\n' from end of lines
+ char line[LINE_SIZE] = {0};
+ char disposableLine[LINE_SIZE] = {0};
+ char loginName[MAX_USERNAME_LEN + 1] = {0};
+ struct timeb
+ wakeUpTime; // if current time > wakeUpTime, get a line from the file
+ int flag, fd;
+ bool endOfFile = false;
+
+ // Wait a second for server to instanciate both connections.
+ // Sometimes responses aren't sent since the s2c connection hasn't
+ // finished connecting before the c2s connection sends a message.
+ sleep(1);
+
+ while (1) {
+ ftime(&currentTime);
+
+ double timeDiff = timeDifference(currentTime, wakeUpTime);
+ if ((timeDiff > 0) && (!endOfFile)) {
+ if (fgets(line, LINE_SIZE, fp) == NULL) {
+ Log("End of command file.");
+ endOfFile = true;
+ goto DO_POLL;
+ }
+
+ new_line_pos = strchr(line, '\n');
+ if (new_line_pos != NULL)
+ *new_line_pos = 0;
+
+ // TODO: Parse the command here, append messages to buffers as necessary
+ strncpy(disposableLine, line, LINE_SIZE);
+ enum Command command = parse_command(disposableLine);
+ strncpy(disposableLine, line, LINE_SIZE);
+ Header header;
+ char *tok, *rest;
+ int offset, nbytes, filenameLen = 0;
+ BYTE *stagingBuf = (BYTE *)malloc(BUF_LEN);
+
+ switch (command) {
+ case DELAY:
+ wakeUpTime = currentTime;
+ wakeUpTime.time += atoi(&line[6]);
+ break;
+ case REGISTER:
+ header.setFlags(C2S, SUCCESS, PUB, SIGN);
+ header.m_command = REGISTER;
+ tok = strtok(disposableLine, " "); // REGISTER
+ tok = strtok(NULL, " "); // [username]
+ memcpy(header.m_name, tok, strlen(tok));
+ tok = strtok(NULL, " "); // [password]
+ header.m_size = strlen(tok);
+ header.encode(&buf[C2S_DATA][connStat[C2S_DATA].nBuffered]);
+ memcpy(&buf[C2S_DATA][connStat[C2S_DATA].nBuffered + HEADER_LEN], tok,
+ strlen(tok));
+ prepareSend(C2S_DATA, header);
+ break;
+ case LOGIN:
+ header.setFlags(C2S, SUCCESS, PUB, SIGN);
+ header.m_command = LOGIN;
+ tok = strtok(disposableLine, " "); // LOGIN
+ tok = strtok(NULL, " "); // [username]
+ memcpy(header.m_name, tok, strlen(tok));
+ tok = strtok(NULL, " "); // [password]
+ header.m_size = strlen(tok);
+ header.encode(&buf[C2S_DATA][connStat[C2S_DATA].nBuffered]);
+ memcpy(&buf[C2S_DATA][connStat[C2S_DATA].nBuffered + HEADER_LEN], tok,
+ strlen(tok));
+ prepareSend(C2S_DATA, header);
+ break;
+ case LOGOUT:
+ header.setFlags(C2S, SUCCESS, PUB, SIGN);
+ header.m_command = LOGOUT;
+ memcpy(header.m_name, "-logout-", 8);
+ header.m_size = 0;
+ header.encode(&buf[C2S_DATA][connStat[C2S_DATA].nBuffered]);
+ prepareSend(C2S_DATA, header);
+ break;
+ case LIST:
+ header.setFlags(C2S, SUCCESS, PUB, SIGN);
+ header.m_command = LIST;
+ memcpy(header.m_name, "--list--", 8);
+ header.m_size = 0;
+ header.encode(&buf[C2S_DATA][connStat[C2S_DATA].nBuffered]);
+ prepareSend(C2S_DATA, header);
+ break;
+ case SEND:
+ case SENDA:
+ header.setFlags(C2S, SUCCESS, PUB, SIGN); // Will update trace as needed
+ if (command == SEND)
+ header.m_trace = SIGN;
+ else
+ header.m_trace = ANNON;
+ header.m_command = command;
+ memcpy(header.m_name, "--send--", 8);
+ tok = strtok_r(disposableLine, " ", &rest); // SEND[A]
+ header.m_size = strlen(rest); // Everything after 'SEND[A] '
+ header.encode(&buf[C2S_DATA][connStat[C2S_DATA].nBuffered]);
+ memcpy(&buf[C2S_DATA][connStat[C2S_DATA].nBuffered + HEADER_LEN], rest,
+ header.m_size);
+ prepareSend(C2S_DATA, header);
+ break;
+ case SEND2:
+ case SENDA2:
+ header.setFlags(C2S, SUCCESS, PUB, SIGN); // Will update trace as needed
+ if (command == SEND2)
+ header.m_trace = SIGN;
+ else
+ header.m_trace = ANNON;
+ header.m_command = command;
+ tok = strtok_r(disposableLine, " ", &rest); // SEND2[A]
+ tok = strtok_r(rest, " ", &rest); // [username]
+ memcpy(header.m_name, tok, strlen(tok));
+ header.m_size = strlen(rest); // Everything after 'SEND2[A] [username] '
+ header.encode(&buf[C2S_DATA][connStat[C2S_DATA].nBuffered]);
+ memcpy(&buf[C2S_DATA][connStat[C2S_DATA].nBuffered + HEADER_LEN], rest,
+ header.m_size);
+ prepareSend(C2S_DATA, header);
+ break;
+ case SENDF:
+ case SENDF2:
+ header.setFlags(C2S, SUCCESS, PUB, SIGN); // Will update trace as needed
+ header.m_command = command;
+ if (command == SENDF) {
+ header.m_recipient = PUB;
+ memcpy(header.m_name, "--senf--", 8);
+ tok = strtok(disposableLine, " "); // SENDF
+ tok = strtok(NULL, " "); // [file name]
+ } else { // SENDF2
+ header.m_recipient = PRIV;
+ tok = strtok(disposableLine, " "); // SENDF2
+ tok = strtok(NULL, " "); // [username]
+ memcpy(header.m_name, tok, strlen(tok));
+ tok = strtok(NULL, " "); // [file name]
+ }
+
+ offset = 0;
+ i2buf(id, &stagingBuf[offset]); // stagingBuf skips header
+ offset += 4; // unique id
+ filenameLen = strlen(tok);
+ i2buf(filenameLen, &stagingBuf[offset]);
+ offset += 4; // int length of file name
+
+ memcpy(&stagingBuf[offset], tok, strlen(tok));
+ offset += strlen(tok); // file name
+
+ nbytes = file2buf(tok, &stagingBuf[offset + 4]);
+ if (nbytes == -1)
+ break;
+ i2buf(nbytes, &stagingBuf[offset]);
+ offset += 4; // file size int
+ offset += nbytes; // file length (offest now tells how many bytes to
+ // copy into buf)
+
+ header.m_size = offset;
+ // Make a new connection
+ if (nConns >= MAX_CONCURRENCY_LIMIT) {
+ Log("Cannot perform file transfer. Maximum concurency reached.");
+ } else {
+ fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (fd == -1) {
+ Log("Cannot create new socket.");
+ } else if (connect(fd, (const struct sockaddr *)&serverAddr,
+ sizeof(serverAddr)) != 0) {
+ Log("Cannot connect to server for file transfer");
+ } else {
+ if (command == SENDF)
+ Log("[%s] Sending %s to everyone currently logged in.",
+ NAME, tok);
+ else
+ Log("[%s] Sending %s to %s.", NAME, tok, header.m_name);
+
+ SetNonBlockIO(fd);
+ nConns++;
+ peers[nConns - 1].fd = fd;
+ peers[nConns - 1].revents = 0;
+
+ buf[nConns - 1] = (BYTE *)malloc(BUF_LEN);
+ header.encode(buf[nConns - 1]);
+ memcpy(&buf[nConns - 1][HEADER_LEN], stagingBuf, header.m_size);
+
+ memset(&connStat[nConns - 1], 0, sizeof(struct CONN_STAT));
+ ftime(&connStat[nConns - 1].lastTime);
+ connStat[nConns - 1].direction = C2S;
+ connStat[nConns - 1].linkType = FILE_LINK;
+ // TODO: Kill the connection??
+ prepareSend(nConns - 1, header);
+ }
+ }
+ break;
+ default:
+ Log("Cannot execute %s command: %s", com2str(command), line);
+ }
+ free(stagingBuf);
+ }
+
+ DO_POLL:
+ int nReady = poll(peers, nConns, POLL_TIMEOUT);
+ // Log("nReady %d", nReady);
+
+ if (nConns == 0) {
+ Log("[%s] The server has gone offline.", NAME);
+ return;
+ }
+
+ for (int i = 0; i < nConns; i++) {
+ // Handle reading data (s2c)
+ if (peers[i].revents & (POLLRDNORM | POLLERR | POLLHUP)) {
+ // Log("Recieving nonblocking!");
+ if (connStat[i].direction == S2C) {
+ if (Recv_NonBlocking(peers[i].fd, buf[i], &connStat[i], &peers[i]) <
+ 0) {
+ // Log("Line %d: Removing connection %d", __LINE__, i);
+ RemoveConnection(i);
+ goto NEXT_CONNECTION;
+ }
+
+ if (connStat[i].nToDo == 0) {
+ if (processReception(i) != 0) {
+ // Log("Line %d: Removing connection %d", __LINE__, i);
+ RemoveConnection(i);
+ }
+ goto NEXT_CONNECTION;
+ }
+ }
+ }
+
+ // Handle sending data (c2s)
+ if (peers[i].revents & (POLLWRNORM | POLLERR | POLLHUP)) {
+ // Log("Sending nonblocking!");
+ if (connStat[i].direction == C2S) {
+ if (Send_NonBlocking(peers[i].fd, buf[i], &connStat[i], &peers[i]) <
+ 0) {
+ // Log("Line %d: Removing connection %d", __LINE__, i);
+ RemoveConnection(i);
+ goto NEXT_CONNECTION;
+ }
+ }
+ }
+
+ // Check that the C2S connection is still open by sending a PING packet
+ if ((connStat[i].direction == C2S) &&
+ (timeDifference(currentTime, connStat[i].lastTime) >
+ C2S_SERVICE_FREQ) &&
+ (connStat[i].nToDo == 0) && (connStat[i].messageLen == 0) &&
+ (connStat[i].nBuffered == 0)) {
+ // Log("[%s] Pinging server on connection %d due to timeout",
+ // SERVER_NAME, i);
+ Header header;
+ header.setFlags(C2S, SUCCESS, PRIV, SIGN);
+ memcpy(header.m_name, "-client-", MAX_USERNAME_LEN);
+ header.m_command = PING;
+ header.m_size = 0;
+
+ header.encode(&buf[i][connStat[i].messageLen]);
+ connStat[i].nBuffered = HEADER_LEN + header.m_size;
+ connStat[i].messageLen = HEADER_LEN + header.m_size;
+ connStat[i].nToDo = HEADER_LEN + header.m_size;
+
+ peers[i].events |= POLLWRNORM;
+ }
+
+ NEXT_CONNECTION:
+ asm("nop");
+ }
+ }
+
+ // TODO: Close all open sockets
+ fclose(fp);
+}
+
+void prepareSend(int i, Header header) {
+ if (connStat[i].nBuffered == 0) {
+ connStat[i].messageLen = HEADER_LEN + header.m_size;
+ connStat[i].nToDo = HEADER_LEN + header.m_size;
+ }
+ connStat[i].nBuffered += HEADER_LEN + header.m_size;
+ peers[i].events |= POLLWRNORM;
+}
+
+int processReception(int i) {
+ Header header;
+ header.decode(buf[i]);
+
+ if (connStat[i]
+ .expectingHeader) { // Expecting a header read sequence was completed
+ // Log("\nProcessing header reception.");
+ // header.displayContents(true);
+ if (header.m_size > 0) {
+ connStat[i].expectingHeader = false;
+ connStat[i].nToDo = header.m_size;
+ } else if (header.m_size == 0) {
+ // printMessage(i);
+ doClientCommand(i);
+ connStat[i].expectingHeader = true;
+ connStat[i].nBuffered = 0;
+ connStat[i].nToDo = HEADER_LEN;
+ memset(buf[i], 0, BUF_LEN);
+ } else {
+ return -1; // Error, signal to caller to end connection
+ }
+ } else { // expecting a data read sequence was completed
+ // Log("\nProcessing data reception.");
+ // printMessage(i);
+ doClientCommand(i);
+ connStat[i].expectingHeader = true;
+ connStat[i].nBuffered = 0;
+ connStat[i].nToDo = HEADER_LEN;
+ memset(buf[i], 0, BUF_LEN);
+ }
+
+ if (connStat[i].shouldClose) {
+ // Log("[%s] Line %d: Removing connection %d due to shouldClose", SERVER_NAME,
+ // __LINE__, i);
+ RemoveConnection(i);
+ }
+
+ return 0;
+}
+
+void doClientCommand(int i) {
+ Header header;
+ header.decode(buf[i]);
+
+ int file_id;
+ int fd;
+ int offset;
+ char filename[5000];
+ int filenameLen, fileLen;
+
+ switch (header.m_command) {
+ case REGISTER:
+ case LOGIN:
+ case LOGOUT:
+ case LIST:
+ Log("[%s] %s", header.m_name, &buf[i][HEADER_LEN]);
+ break;
+ case SEND:
+ case SENDA:
+ Log("[%s] %s", header.m_name, &buf[i][HEADER_LEN]);
+ break;
+ case SEND2:
+ case SENDA2:
+ Log("<%s> %s", header.m_name, &buf[i][HEADER_LEN]);
+ break;
+ case SENDF:
+ case SENDF2:
+ // "[person] File transfer: <file name>"
+ // First 4 bytes are file id
+ buf2i(&buf[i][HEADER_LEN], file_id);
+ if (header.m_command == SENDF)
+ Log("[%s] Initiating file transfer: %s", header.m_name,
+ &buf[i][HEADER_LEN + 4]);
+ else
+ Log("<%s> Initiating file transfer: %s", header.m_name,
+ &buf[i][HEADER_LEN + 4]);
+ // Open new S2C connection with server for file transfer
+ fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (fd == -1)
+ Log("[%s] Warning: Cannot create file transfer socket.", NAME);
+ else if (connect(fd, (const struct sockaddr *)&serverAddr,
+ sizeof(serverAddr)) != 0)
+ Log("[%s] Warning: File transfer cannot connect to server", NAME);
+ else {
+ // Usual setup
+ SetNonBlockIO(fd);
+
+ if (nConns < MAX_CONCURRENCY_LIMIT) {
+ nConns++;
+ peers[nConns - 1].fd = fd;
+ peers[nConns - 1].revents = 0;
+
+ buf[nConns - 1] = (BYTE *)malloc(BUF_LEN);
+
+ memset(&connStat[nConns - 1], 0, sizeof(struct CONN_STAT));
+ ftime(&connStat[nConns - 1].lastTime);
+ connStat[nConns - 1].direction =
+ C2S; // will flip on completion of this send
+ connStat[nConns - 1].linkType = FILE_LINK;
+ connStat[nConns - 1].changeDirection = true;
+
+ Header outHeader;
+ outHeader.setFlags(S2C, SUCCESS, PUB,
+ SIGN); // S2C used by server to set direction
+ if (header.m_command == SENDF2)
+ outHeader.m_recipient = PRIV;
+ outHeader.m_command = GETF;
+ memcpy(outHeader.m_name, header.m_name,
+ MAX_USERNAME_LEN); // pass along the name of the original sender
+ outHeader.m_size =
+ 4 + header.m_size; // [client id] [file id] [file name]
+ outHeader.encode(buf[nConns - 1]);
+ offset = HEADER_LEN;
+ i2buf(id, buf[nConns - 1] + offset); // encode client id
+ offset += 4;
+ memcpy(&buf[nConns - 1][offset], &buf[i][HEADER_LEN],
+ header.m_size); // encode file id and file name
+ offset += header.m_size;
+
+ prepareSend(nConns - 1, outHeader);
+ } else
+ Log("[%s] Error: Cannot perform file transfer. Max concurrency "
+ "reached.",
+ NAME);
+ }
+ break;
+ case GETF:
+ if (header.m_flag == SUCCESS) {
+ buf2i(&buf[i][HEADER_LEN], filenameLen);
+ memcpy(filename, &buf[i][HEADER_LEN + 4], filenameLen);
+ buf2i(&buf[i][HEADER_LEN + 4 + filenameLen], fileLen);
+ buf2file(&buf[i][HEADER_LEN + 4 + filenameLen + 4], fileLen, filename);
+ if (header.m_recipient == PUB)
+ Log("[%s] File transfer complete: %s", header.m_name, filename);
+ else
+ Log("<%s> File transfer complete: %s", header.m_name, filename);
+ } else {
+ if (header.m_recipient == PUB)
+ Log("[%s] File transfer failed.", header.m_name);
+ else
+ Log("<%s> File transfer failed.", header.m_name);
+ }
+ connStat[i].shouldClose = true;
+ break;
+ case PING:
+ // Log("Ping from %s", header.m_name);
+ asm("nop"); // do nothing
+ break;
+ default:
+ Log("[%s] No doClientCommand() for %s", NAME,
+ com2str(header.m_command));
+ }
+}
+
+void printMessage(int i) {
+ Header h;
+ h.decode(buf[i]);
+
+ Log("[%s, %s, %s, %s] [%s] [%s] [%d]", !h.m_direction ? "C2S" : "S2C",
+ !h.m_flag ? "SUCCESS" : "FAIL", !h.m_recipient ? "PUB" : "PRIV",
+ !h.m_trace ? "SIGN" : "ANNON", com2str(h.m_command), h.m_name, h.m_size);
+
+ if (h.m_size > 0)
+ Log("\tData: [%s]", buf[i] + HEADER_LEN);
+}
+
+int main(int argc, char **argv) {
+
+ if (argc != 4) {
+ Log("Usage: %s [server IP] [server Port] [command script name]", argv[0]);
+ return -1;
+ }
+
+ const char *serverIP = argv[1];
+ int port = atoi(argv[2]);
+ const char *fileName = argv[3];
+
+ srand(time(NULL));
+
+ DoClient(serverIP, port, fileName);
+ return 0;
+}