#include "client.hpp"

// Connection 0: Client to Server data communication
#define C2S_DATA 0
// Connection 1: Server to Client data communication
#define S2C_DATA 1
// Other: Temporary file transfer

#define C2S_SERVICE_FREQ 1.0 // seconds
#define MAX_CONCURRENCY_LIMIT 1000
#define POLL_TIMEOUT 333 // poll timeout in ms

int nConns;                                           // number of active connections
struct pollfd peers[MAX_CONCURRENCY_LIMIT + 1];       // sockets to be monitored by poll()
struct CONN_STAT connStat[MAX_CONCURRENCY_LIMIT + 1]; // app-layer stats of the sockets
BYTE *buf[MAX_CONCURRENCY_LIMIT + 1];                 // per-connection buffers of BUF_LEN bytes
struct sockaddr_in serverAddr;                        // server address, filled in by DoClient()
int id;                                               // identifier of this client instance

// Closes connection i, frees its buffer and compacts the three parallel
// arrays (peers, connStat, buf) so that the active entries stay contiguous.
// Duplicate exists in server and client.
void RemoveConnection(int i) {
  close(peers[i].fd);
  if (i < nConns) {
    // (nConns - i) elements are moved, i.e. the copy also reads slot nConns;
    // the arrays hold MAX_CONCURRENCY_LIMIT + 1 entries so this stays in bounds.
    memmove(peers + i, peers + i + 1, (nConns - i) * sizeof(struct pollfd));
    memmove(connStat + i, connStat + i + 1, (nConns - i) * sizeof(struct CONN_STAT));
    free(buf[i]);
    memmove(buf + i, buf + i + 1, (nConns - i) * sizeof(BYTE *));
  }
  nConns--;
  // Log("[%s] %d active connections after removal", SERVER_NAME, nConns);
}

// Sends exactly len bytes of data on sockFD, looping over partial sends.
// Returns 0 once everything was sent. Returns -1 when the peer closed the
// connection (the socket is closed here in that case) or when send() failed
// with an unexpected error.
int Send_Blocking(int sockFD, const BYTE *data, int len) {
  int nSent = 0;
  while (nSent < len) {
    int n = send(sockFD, data + nSent, len - nSent, 0);
    if (n >= 0) {
      nSent += n;
    } else if (errno == EINTR) {
      continue; // interrupted by a signal before any data was sent: retry
    } else if (errno == ECONNRESET || errno == EPIPE) {
      Log("Connection closed.");
      close(sockFD);
      return -1;
    } else {
      Error("Unexpected error %d: %s.", errno, strerror(errno));
      return -1; // do not retry the same failing send forever if Error() returns
    }
  }
  return 0;
}

// Copies a user name taken from the command script into header.m_name.
// Returns false (after logging) when the name is missing or longer than
// MAX_USERNAME_LEN, so that m_name is never written past that length.
static bool SetHeaderName(Header &header, const char *name) {
  if (name == NULL) {
    Log("[%s] Command is missing a user name.", NAME);
    return false;
  }
  size_t len = strlen(name);
  if (len > (size_t)MAX_USERNAME_LEN) {
    Log("[%s] User name is too long: %s", NAME, name);
    return false;
  }
  memcpy(header.m_name, name, len);
  return true;
}

// Appends header plus an optional NUL-terminated payload to the C2S_DATA
// buffer and schedules it for sending. header.m_size is set to the payload
// length. Returns false (after logging) when the message does not fit.
// NOTE(review): like the rest of this file, this assumes slot C2S_DATA is
// still the client-to-server link; RemoveConnection() shifts slots down.
static bool QueueOnC2S(Header &header, const char *payload) {
  size_t payloadLen = (payload != NULL) ? strlen(payload) : 0;
  size_t used = (size_t)connStat[C2S_DATA].nBuffered;
  if (used + (size_t)(HEADER_LEN) + payloadLen > (size_t)(BUF_LEN)) {
    Log("[%s] Outgoing buffer is full; command dropped.", NAME);
    return false;
  }
  header.m_size = payloadLen;
  BYTE *dst = &buf[C2S_DATA][used];
  header.encode(dst);
  if (payloadLen > 0)
    memcpy(dst + HEADER_LEN, payload, payloadLen);
  prepareSend(C2S_DATA, header);
  return true;
}

// Opens a dedicated connection for a SENDF/SENDF2 command and queues
// [client id][file name length][file name][file size][file bytes] on it.
// Every failure is logged and leaves the connection table unchanged.
static void StartFileUpload(enum Command command, Header &header, char *path) {
  if (nConns >= MAX_CONCURRENCY_LIMIT) {
    Log("Cannot perform file transfer. Maximum concurency reached.");
    return;
  }

  size_t nameLen = strlen(path);
  // 12 = client id + file name length + file size, 4 bytes each
  if ((size_t)(HEADER_LEN) + 12 + nameLen > (size_t)(BUF_LEN)) {
    Log("[%s] File name is too long: %s", NAME, path);
    return;
  }

  BYTE *staging = (BYTE *)malloc(BUF_LEN); // payload only, header is added later
  if (staging == NULL) {
    Log("[%s] Out of memory, cannot send %s.", NAME, path);
    return;
  }

  int offset = 0;
  i2buf(id, &staging[offset]);
  offset += 4; // unique id
  i2buf((int)nameLen, &staging[offset]);
  offset += 4; // int length of file name
  memcpy(&staging[offset], path, nameLen);
  offset += (int)nameLen; // file name

  // NOTE(review): file2buf() is not given the remaining capacity of staging;
  // confirm that it cannot write more than BUF_LEN - offset - 4 bytes.
  int nbytes = file2buf(path, &staging[offset + 4]);
  if (nbytes == -1) {
    free(staging);
    return;
  }
  i2buf(nbytes, &staging[offset]);
  offset += 4;      // file size int
  offset += nbytes; // file contents (offset is now the payload size)

  if ((long)(HEADER_LEN) + (long)offset > (long)(BUF_LEN)) {
    Log("[%s] File is too large to send: %s", NAME, path);
    free(staging);
    return;
  }
  header.m_size = offset;

  // Make a new connection
  int fd = socket(AF_INET, SOCK_STREAM, 0);
  if (fd == -1) {
    Log("Cannot create new socket.");
    free(staging);
    return;
  }
  if (connect(fd, (const struct sockaddr *)&serverAddr, sizeof(serverAddr)) != 0) {
    Log("Cannot connect to server for file transfer");
    close(fd); // the socket used to leak on this path
    free(staging);
    return;
  }
  BYTE *linkBuf = (BYTE *)malloc(BUF_LEN);
  if (linkBuf == NULL) {
    Log("[%s] Out of memory, cannot send %s.", NAME, path);
    close(fd);
    free(staging);
    return;
  }

  if (command == SENDF)
    Log("[%s] Sending %s to everyone currently logged in.", NAME, path);
  else
    Log("[%s] Sending %s to %s.", NAME, path, header.m_name);

  SetNonBlockIO(fd);
  int slot = nConns++;
  peers[slot].fd = fd;
  peers[slot].events = 0; // the slot may hold stale flags from a removed connection
  peers[slot].revents = 0;
  buf[slot] = linkBuf;
  header.encode(buf[slot]);
  memcpy(&buf[slot][HEADER_LEN], staging, header.m_size);
  memset(&connStat[slot], 0, sizeof(struct CONN_STAT));
  ftime(&connStat[slot].lastTime);
  connStat[slot].direction = C2S;
  connStat[slot].linkType = FILE_LINK;
  // TODO: Kill the connection??
  prepareSend(slot, header);

  free(staging);
}

// Parses one line of the command script and acts on it: DELAY moves
// *wakeUpTime forward from now, every other known command is encoded and
// queued for sending. Malformed lines are logged and skipped.
static void ExecuteCommand(const char *line, struct timeb now, struct timeb *wakeUpTime) {
  char scratch[LINE_SIZE] = {0};

  strncpy(scratch, line, LINE_SIZE - 1);
  scratch[LINE_SIZE - 1] = '\0';
  enum Command command = parse_command(scratch);

  // Tokenise a fresh copy (presumably parse_command() modifies its argument,
  // which is why the line is copied again here).
  strncpy(scratch, line, LINE_SIZE - 1);
  scratch[LINE_SIZE - 1] = '\0';

  Header header;
  char *rest = NULL;
  strtok_r(scratch, " ", &rest); // skip the command word itself

  switch (command) {
  case DELAY: {
    // Read the argument as a token instead of from the fixed offset line[6],
    // which could pick up stale bytes when the line is just "DELAY".
    const char *seconds = strtok_r(NULL, " ", &rest);
    *wakeUpTime = now;
    wakeUpTime->time += (seconds != NULL) ? atoi(seconds) : 0;
    break;
  }

  case REGISTER:
  case LOGIN: {
    header.setFlags(C2S, SUCCESS, PUB, SIGN);
    header.m_command = command;
    const char *user = strtok_r(NULL, " ", &rest);     // [username]
    const char *password = strtok_r(NULL, " ", &rest); // [password]
    if (user == NULL || password == NULL) {
      Log("[%s] Expected a user name and a password: %s", NAME, line);
      break;
    }
    if (SetHeaderName(header, user))
      QueueOnC2S(header, password);
    break;
  }

  case LOGOUT:
    header.setFlags(C2S, SUCCESS, PUB, SIGN);
    header.m_command = LOGOUT;
    memcpy(header.m_name, "-logout-", 8);
    QueueOnC2S(header, NULL);
    break;

  case LIST:
    header.setFlags(C2S, SUCCESS, PUB, SIGN);
    header.m_command = LIST;
    memcpy(header.m_name, "--list--", 8);
    QueueOnC2S(header, NULL);
    break;

  case SEND:
  case SENDA:
    header.setFlags(C2S, SUCCESS, PUB, SIGN);
    header.m_trace = (command == SEND) ? SIGN : ANNON;
    header.m_command = command;
    memcpy(header.m_name, "--send--", 8);
    QueueOnC2S(header, rest); // everything after 'SEND[A] '
    break;

  case SEND2:
  case SENDA2: {
    header.setFlags(C2S, SUCCESS, PUB, SIGN);
    header.m_trace = (command == SEND2) ? SIGN : ANNON;
    header.m_command = command;
    const char *user = strtok_r(NULL, " ", &rest); // [username]
    if (SetHeaderName(header, user))
      QueueOnC2S(header, rest); // everything after 'SEND2[A] [username] '
    break;
  }

  case SENDF:
  case SENDF2: {
    header.setFlags(C2S, SUCCESS, PUB, SIGN);
    header.m_command = command;
    if (command == SENDF) {
      header.m_recipient = PUB;
      memcpy(header.m_name, "--senf--", 8);
    } else {
      header.m_recipient = PRIV;
      if (!SetHeaderName(header, strtok_r(NULL, " ", &rest))) // [username]
        break;
    }
    char *path = strtok_r(NULL, " ", &rest); // [file name]
    if (path == NULL) {
      Log("[%s] Expected a file name: %s", NAME, line);
      break;
    }
    StartFileUpload(command, header, path);
    break;
  }

  default:
    Log("Cannot execute %s command: %s", com2str(command), line);
  }
}

// Runs the client: opens the two data connections to svrIP:svrPort (slot
// C2S_DATA for sending, slot S2C_DATA for receiving), announces this instance
// with a CONNECT message on each, then alternates between executing lines of
// the command script fileName and servicing all connections through poll().
// Returns once every connection has been removed.
void DoClient(const char *svrIP, int svrPort, const char *fileName) {
  memset(&serverAddr, 0, sizeof(serverAddr));
  serverAddr.sin_family = AF_INET;
  serverAddr.sin_port = htons((unsigned short)svrPort);
  if (inet_pton(AF_INET, svrIP, &serverAddr.sin_addr) != 1)
    Error("Invalid server address: %s", svrIP);

  // ignore the SIGPIPE signal that may crash the program in some corner cases
  signal(SIGPIPE, SIG_IGN);

  // Creating unique identifier for this instance
  // id = (int)(seconds since 1970) ^ (((int)(process id) & 0x0000FFFF) << 16)
  struct timeb currentTime;
  ftime(&currentTime);
  id = currentTime.time ^ (((int)getpid() & 0x0000FFFF) << 16);
  // Log("My identifier is %d (%08X)", id, id);

  // create the data communication sockets
  for (int i = 0; i < 2; i++) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1)
      Error("Cannot create initial sockets.");

    if (connect(fd, (const struct sockaddr *)&serverAddr, sizeof(serverAddr)) != 0) {
      Error("C2s cannot connect to server %s:%d.", svrIP, svrPort);
    }

    enum Direction direction = (i == C2S_DATA) ? C2S : S2C;

    // Notify the server this instance exists
    // NOTE(review): connectBuf must hold HEADER_LEN + 4 bytes; confirm that
    // HEADER_LEN <= 16.
    BYTE connectBuf[20] = {0};
    Header header;
    header.setFlags(direction, SUCCESS, PUB, SIGN);
    header.m_command = CONNECT;
    memcpy(header.m_name, "--------", MAX_USERNAME_LEN);
    header.m_size = 4;
    header.encode(connectBuf);
    i2buf(id, connectBuf + HEADER_LEN);
    // header.displayContents(true);

    if (Send_Blocking(fd, connectBuf, HEADER_LEN + header.m_size) < 0) {
      Error("Cannot send CONNECT for connection %d", i);
    }

    SetNonBlockIO(fd);

    // TODO: Put this in somewhere
    nConns++;
    peers[i].fd = fd;
    peers[i].events = 0;
    peers[i].revents = 0;
    buf[i] = (BYTE *)malloc(BUF_LEN);
    if (buf[i] == NULL)
      Error("Cannot allocate buffer for connection %d", i);
    // Received payloads are logged as C strings, so start from zeroed memory.
    memset(buf[i], 0, BUF_LEN);
    memset(&connStat[i], 0, sizeof(struct CONN_STAT));
    ftime(&connStat[i].lastTime);
    connStat[i].direction = direction;
    connStat[i].linkType = MESSAGE_LINK;
    connStat[i].nBuffered = 0;
    connStat[i].messageLen = 0;

    if (i == S2C_DATA) {
      // Log("S2C_DATA is setting POLLRDNORM");
      connStat[i].expectingHeader = true;
      connStat[i].nToDo = HEADER_LEN;
      peers[i].events |= POLLRDNORM; // S2C reads
    }
  }

  // Client begins executing commands from the file.
  FILE *fp = fopen(fileName, "r");
  if (!fp)
    Error("Failed to open the file: %s", fileName);

  char line[LINE_SIZE] = {0};
  bool endOfFile = false;

  // if current time > wakeUpTime, get a line from the file.
  // Zeroed (the epoch) so that the first command is read immediately; this
  // value used to be read uninitialised.
  struct timeb wakeUpTime;
  memset(&wakeUpTime, 0, sizeof(wakeUpTime));

  // Wait a second for server to instantiate both connections.
  // Sometimes responses aren't sent since the s2c connection hasn't
  // finished connecting before the c2s connection sends a message.
  sleep(1);

  while (1) {
    ftime(&currentTime);
    double timeDiff = timeDifference(currentTime, wakeUpTime);

    if ((timeDiff > 0) && (!endOfFile)) {
      if (fgets(line, LINE_SIZE, fp) == NULL) {
        Log("End of command file.");
        endOfFile = true;
      } else {
        // Remove '\n' from end of lines
        char *newLinePos = strchr(line, '\n');
        if (newLinePos != NULL)
          *newLinePos = 0;
        ExecuteCommand(line, currentTime, &wakeUpTime);
      }
    }

    int nReady = poll(peers, nConns, POLL_TIMEOUT);
    // Log("nReady %d", nReady);
    if (nReady < 0) {
      if (errno != EINTR)
        Error("Unexpected error %d: %s.", errno, strerror(errno));
      continue; // revents were not updated by the failed call
    }

    if (nConns == 0) {
      Log("[%s] The server has gone offline.", NAME);
      fclose(fp); // the only way out of this loop
      return;
    }

    // NOTE(review): after RemoveConnection(i) the entry shifted into slot i
    // is not examined until the next poll round.
    for (int i = 0; i < nConns; i++) {
      // Handle reading data (s2c)
      if ((peers[i].revents & (POLLRDNORM | POLLERR | POLLHUP)) &&
          connStat[i].direction == S2C) {
        // Log("Recieving nonblocking!");
        if (Recv_NonBlocking(peers[i].fd, buf[i], &connStat[i], &peers[i]) < 0) {
          RemoveConnection(i);
          continue;
        }
        if (connStat[i].nToDo == 0) {
          if (processReception(i) != 0)
            RemoveConnection(i);
          continue;
        }
      }

      // Handle sending data (c2s)
      if ((peers[i].revents & (POLLWRNORM | POLLERR | POLLHUP)) &&
          connStat[i].direction == C2S) {
        // Log("Sending nonblocking!");
        if (Send_NonBlocking(peers[i].fd, buf[i], &connStat[i], &peers[i]) < 0) {
          RemoveConnection(i);
          continue;
        }
      }

      // Check that the C2S connection is still open by sending a PING packet
      if ((connStat[i].direction == C2S) &&
          (timeDifference(currentTime, connStat[i].lastTime) > C2S_SERVICE_FREQ) &&
          (connStat[i].nToDo == 0) && (connStat[i].messageLen == 0) &&
          (connStat[i].nBuffered == 0)) {
        // Log("[%s] Pinging server on connection %d due to timeout",
        //     SERVER_NAME, i);
        Header header;
        header.setFlags(C2S, SUCCESS, PRIV, SIGN);
        memcpy(header.m_name, "-client-", MAX_USERNAME_LEN);
        header.m_command = PING;
        header.m_size = 0;
        header.encode(&buf[i][connStat[i].messageLen]);
        connStat[i].nBuffered = HEADER_LEN + header.m_size;
        connStat[i].messageLen = HEADER_LEN + header.m_size;
        connStat[i].nToDo = HEADER_LEN + header.m_size;
        peers[i].events |= POLLWRNORM;
      }
    }
  }
}
void prepareSend(int i, Header header) { if (connStat[i].nBuffered == 0) { connStat[i].messageLen = HEADER_LEN + header.m_size; connStat[i].nToDo = HEADER_LEN + header.m_size; } connStat[i].nBuffered += HEADER_LEN + header.m_size; peers[i].events |= POLLWRNORM; } int processReception(int i) { Header header; header.decode(buf[i]); if (connStat[i] .expectingHeader) { // Expecting a header read sequence was completed // Log("\nProcessing header reception."); // header.displayContents(true); if (header.m_size > 0) { connStat[i].expectingHeader = false; connStat[i].nToDo = header.m_size; } else if (header.m_size == 0) { // printMessage(i); doClientCommand(i); connStat[i].expectingHeader = true; connStat[i].nBuffered = 0; connStat[i].nToDo = HEADER_LEN; memset(buf[i], 0, BUF_LEN); } else { return -1; // Error, signal to caller to end connection } } else { // expecting a data read sequence was completed // Log("\nProcessing data reception."); // printMessage(i); doClientCommand(i); connStat[i].expectingHeader = true; connStat[i].nBuffered = 0; connStat[i].nToDo = HEADER_LEN; memset(buf[i], 0, BUF_LEN); } if (connStat[i].shouldClose) { // Log("[%s] Line %d: Removing connection %d due to shouldClose", SERVER_NAME, // __LINE__, i); RemoveConnection(i); } return 0; } void doClientCommand(int i) { Header header; header.decode(buf[i]); int file_id; int fd; int offset; char filename[5000]; int filenameLen, fileLen; switch (header.m_command) { case REGISTER: case LOGIN: case LOGOUT: case LIST: Log("[%s] %s", header.m_name, &buf[i][HEADER_LEN]); break; case SEND: case SENDA: Log("[%s] %s", header.m_name, &buf[i][HEADER_LEN]); break; case SEND2: case SENDA2: Log("<%s> %s", header.m_name, &buf[i][HEADER_LEN]); break; case SENDF: case SENDF2: // "[person] File transfer: " // First 4 bytes are file id buf2i(&buf[i][HEADER_LEN], file_id); if (header.m_command == SENDF) Log("[%s] Initiating file transfer: %s", header.m_name, &buf[i][HEADER_LEN + 4]); else Log("<%s> Initiating file 
transfer: %s", header.m_name, &buf[i][HEADER_LEN + 4]); // Open new S2C connection with server for file transfer fd = socket(AF_INET, SOCK_STREAM, 0); if (fd == -1) Log("[%s] Warning: Cannot create file transfer socket.", NAME); else if (connect(fd, (const struct sockaddr *)&serverAddr, sizeof(serverAddr)) != 0) Log("[%s] Warning: File transfer cannot connect to server", NAME); else { // Usual setup SetNonBlockIO(fd); if (nConns < MAX_CONCURRENCY_LIMIT) { nConns++; peers[nConns - 1].fd = fd; peers[nConns - 1].revents = 0; buf[nConns - 1] = (BYTE *)malloc(BUF_LEN); memset(&connStat[nConns - 1], 0, sizeof(struct CONN_STAT)); ftime(&connStat[nConns - 1].lastTime); connStat[nConns - 1].direction = C2S; // will flip on completion of this send connStat[nConns - 1].linkType = FILE_LINK; connStat[nConns - 1].changeDirection = true; Header outHeader; outHeader.setFlags(S2C, SUCCESS, PUB, SIGN); // S2C used by server to set direction if (header.m_command == SENDF2) outHeader.m_recipient = PRIV; outHeader.m_command = GETF; memcpy(outHeader.m_name, header.m_name, MAX_USERNAME_LEN); // pass along the name of the original sender outHeader.m_size = 4 + header.m_size; // [client id] [file id] [file name] outHeader.encode(buf[nConns - 1]); offset = HEADER_LEN; i2buf(id, buf[nConns - 1] + offset); // encode client id offset += 4; memcpy(&buf[nConns - 1][offset], &buf[i][HEADER_LEN], header.m_size); // encode file id and file name offset += header.m_size; prepareSend(nConns - 1, outHeader); } else Log("[%s] Error: Cannot perform file transfer. 
Max concurrency " "reached.", NAME); } break; case GETF: if (header.m_flag == SUCCESS) { buf2i(&buf[i][HEADER_LEN], filenameLen); memcpy(filename, &buf[i][HEADER_LEN + 4], filenameLen); buf2i(&buf[i][HEADER_LEN + 4 + filenameLen], fileLen); buf2file(&buf[i][HEADER_LEN + 4 + filenameLen + 4], fileLen, filename); if (header.m_recipient == PUB) Log("[%s] File transfer complete: %s", header.m_name, filename); else Log("<%s> File transfer complete: %s", header.m_name, filename); } else { if (header.m_recipient == PUB) Log("[%s] File transfer failed.", header.m_name); else Log("<%s> File transfer failed.", header.m_name); } connStat[i].shouldClose = true; break; case PING: // Log("Ping from %s", header.m_name); asm("nop"); // do nothing break; default: Log("[%s] No doClientCommand() for %s", NAME, com2str(header.m_command)); } } void printMessage(int i) { Header h; h.decode(buf[i]); Log("[%s, %s, %s, %s] [%s] [%s] [%d]", !h.m_direction ? "C2S" : "S2C", !h.m_flag ? "SUCCESS" : "FAIL", !h.m_recipient ? "PUB" : "PRIV", !h.m_trace ? "SIGN" : "ANNON", com2str(h.m_command), h.m_name, h.m_size); if (h.m_size > 0) Log("\tData: [%s]", buf[i] + HEADER_LEN); } int main(int argc, char **argv) { if (argc != 4) { Log("Usage: %s [server IP] [server Port] [command script name]", argv[0]); return -1; } const char *serverIP = argv[1]; int port = atoi(argv[2]); const char *fileName = argv[3]; srand(time(NULL)); DoClient(serverIP, port, fileName); return 0; }