#include "comms.hpp" int Send_NonBlocking(int sockFD, BYTE *data, struct CONN_STAT *pStat, struct pollfd *pPeer) { while (pStat->nToDo > 0) { // pStat keeps tracks of how many bytes have been sent, allowing us to // "resume" when a previously non-writable socket becomes writable. ftime(&(pStat->lastTime)); int n = send(sockFD, data + pStat->messageLen - pStat->nToDo, pStat->nToDo, 0); // Log("Send_NonBlocking n: %d", n); if (n >= 0) { pStat->nToDo -= n; } else if (n < 0 && (errno == ECONNRESET || errno == EPIPE)) { // Log("Connection closed."); close(sockFD); return -1; } else if (n < 0 && (errno == EWOULDBLOCK)) { // The socket becomes non-writable. Exit now to prevent blocking. // OS will notify us when we can write // Log("Writing would block. Waiting for OS to notify send."); pPeer->events |= POLLWRNORM; return 0; } else { Error("Unexpected send error %d: %s", errno, strerror(errno)); } } // Log("Message was completely written out (messagelen was message written // out)"); Log("What's currently in there: nBuffered %d, messageLen: %d, nToDo // %d", pStat->nBuffered, pStat->messageLen, pStat->nToDo); memcpy(data, data + pStat->messageLen, BUF_LEN - pStat->messageLen); // Good memset(data + pStat->nBuffered - pStat->messageLen, 0, BUF_LEN - pStat->nBuffered + pStat->messageLen); pStat->nBuffered -= pStat->messageLen; if (pStat->nBuffered > 0) { // start off the next send if one is queued Header nextHeader; nextHeader.decode(data); // Log("Another message is queued of size 13 + %d", nextHeader.m_size); pStat->messageLen = HEADER_LEN + nextHeader.m_size; pStat->nToDo = HEADER_LEN + nextHeader.m_size; } else { // Log("No other messages are queued"); pStat->messageLen = 0; pStat->nToDo = 0; pPeer->events &= ~POLLWRNORM; // no more bytes to send? Stop listening for // the writable cue. } // Log("What's in there now: nBuffered %d, messageLen: %d, nToDo %d", // pStat->nBuffered, pStat->messageLen, pStat->nToDo); // Change the connection from a sender to a reciever (used by client) if (pStat->changeDirection && (pStat->nToDo == 0)) { // Log("Changing direction!: nBuffered: %d", pStat->nBuffered); pStat->direction = (pStat->direction == C2S) ? S2C : C2S; pStat->expectingHeader = true; pStat->nToDo = HEADER_LEN; pStat->changeDirection = false; pPeer->events &= ~POLLWRNORM; // stop listening for writable cue pPeer->events |= POLLRDNORM; // start listening for readable cue } return 0; } // TODO: Question: Why isn't the pPeer datastructure used? int Recv_NonBlocking(int sockFD, BYTE *data, struct CONN_STAT *pStat, struct pollfd *pPeer) { // pStat keeps tracks of how many bytes have been rcvd, allowing us to // "resume" when a previously non-readable socket becomes readable. Log("Recv: // pStat->nToDo %d",pStat->nToDo); while (pStat->nToDo > 0) { int n = recv(sockFD, data + pStat->nBuffered, pStat->nToDo, 0); // Log("Rev_NonBlocking recieved %d bytes", n); if (n > 0) { pStat->nBuffered += n; pStat->nToDo -= n; } else if (n == 0 || (n < 0 && errno == ECONNRESET)) { // Log("\nConnection closed. n = %d", n); close(sockFD); return -1; } else if (n < 0 && (errno == EWOULDBLOCK)) { // Log("\nWould block waiting for read..."); // The socket becomes non-readable. Exit now to prevent blocking. // OS will notify us when we can read return 0; } else { Error("Unexpected recv error %d: %s.", errno, strerror(errno)); } } return 0; } void SetNonBlockIO(int fd) { int val = fcntl(fd, F_GETFL, 0); if (fcntl(fd, F_SETFL, val | O_NONBLOCK) != 0) { Error("Cannot set nonblocking I/O."); } } Header::Header() { m_direction = C2S; m_flag = SUCCESS; m_recipient = PUB; m_trace = SIGN; m_command = INVALID; memset(m_name, 0, MAX_USERNAME_LEN + 1); m_size = 0; } // encode the message to the buffer void Header::encode(BYTE *buf) { memset(buf, 0, HEADER_LEN); int index = 0; BYTE dir_b = (BYTE)(m_direction << 7); BYTE flag_b = (BYTE)(m_flag << 6); BYTE recip_b = (BYTE)(m_recipient << 5); BYTE trace_b = (BYTE)(m_trace << 4); BYTE com_b = (BYTE)m_command & 0x0F; buf[index++] = dir_b | flag_b | recip_b | trace_b | com_b; memcpy(&buf[index], m_name, 8); index += 8; // NOTE: There was a problem encoding very large sizes. i2buf(m_size, &buf[index]); index += 4; } // decode a buffer into a message object void Header::decode(BYTE *buf) { int index = 0; m_direction = (enum Direction)((buf[index] >> 7) & 0x01); m_flag = (enum Flag)((buf[index] >> 6) & 0x01); m_recipient = (enum Recipient)((buf[index] >> 5) & 0x01); m_trace = (enum Trace)((buf[index] >> 4) & 0x01); m_command = (enum Command)(buf[index] & 0x0F); index += 1; for (int x = 0; x < MAX_USERNAME_LEN; x++) m_name[x] = buf[index++]; buf2i(&buf[index], m_size); index += 4; } void Header::setFlags(enum Direction direction, enum Flag flag, enum Recipient recipient, enum Trace trace) { m_direction = direction; m_flag = flag; m_recipient = recipient; m_trace = trace; } // Helper to print contents of class void Header::displayContents(bool tab = false) { Log("%sDirection: %s, Flag: %s, Recipient: %s, Trace: %s", tab ? "\t" : "", !m_direction ? "C2S" : "S2C", !m_flag ? "SUCCESS" : "FAIL", !m_recipient ? "PUB" : "PRIV", !m_trace ? "SIGN" : "ANNON"); Log("%sCommand: %s", tab ? "\t" : "", com2str(m_command)); char name[9] = {0}; memcpy(name, m_name, MAX_USERNAME_LEN); Log("%sUsername: |%s|", tab ? "\t" : "", name); Log("%sSize: %d", tab ? "\t" : "", m_size); }