diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0dce1a5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +bin/ +output.log diff --git a/servers/coap_pit.c b/servers/coap_pit.c index 4fdcf4f..c06d731 100644 --- a/servers/coap_pit.c +++ b/servers/coap_pit.c @@ -25,6 +25,13 @@ #define MAX_BUF_LEN 1024 #define SERVER_ID "CoAP" +static volatile sig_atomic_t keepRunning = 1; + +void handleSignal(int sig) { + (void)sig; + keepRunning = 0; +} + struct coapClient *clients = NULL; int port = 5683; @@ -59,7 +66,7 @@ int sendCoapBlockResponse(uint16_t messageId, uint8_t* token, uint8_t tkl, uint3 uint32_t block_opt_value = (*blockNumber << 4) | (0b1 << 3) | 0x02; uint8_t block_len = (block_opt_value <= 0xFF) ? 1 : (block_opt_value <= 0xFFFF) ? 2 : 3; - + int payloadLength = 5; int responseLength = 4 // base CoAP header + tkl // token length @@ -69,7 +76,7 @@ int sendCoapBlockResponse(uint16_t messageId, uint8_t* token, uint8_t tkl, uint3 + 1 // payload marker + payloadLength; // actual payload char response[responseLength]; - + // Version (1) | Type (CON) | TKL response[0] = (0b01 << 6) | (0b0 << 4) | (tkl & 0b1111);; // class (2) | detail (5). Content response @@ -122,6 +129,8 @@ int sendPing(uint16_t messageId, struct sockaddr_in* addr, socklen_t addrLen) { int main(int argc, char* argv[]) { setbuf(stdout, NULL); + int timeout = 1000; + // testing // char msg[256]; // snprintf(msg, sizeof(msg), "%s connect %s\n", @@ -172,14 +181,23 @@ int main(int argc, char* argv[]) { pollFd.fd = sockFd; pollFd.events = POLLIN; - while (1) { + struct sigaction sa; + sa.sa_handler = handleSignal; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGINT, &sa, NULL); + sigaction(SIGTERM, &sa, NULL); + signal(SIGPIPE, SIG_IGN); + + while (keepRunning) { long long now = currentTimeMs(); + timeout = 1000; while (clientQueueCoap.size > 0) { if(clientQueueCoap.heapArray[0]->sendNext <= now){ struct baseClient *bc = heap_pop(&clientQueueCoap); struct coapClient *c = (struct coapClient *)bc; - + // Handle retransmits if(!c->receivedAck || !c->receivedRst) { if(c->retransmits < MAX_RETRANSMIT) { @@ -212,8 +230,8 @@ int main(int argc, char* argv[]) { deleteClient(c); continue; } - } - + } + if (c->receivedGet) { sendCoapBlockResponse(c->messageId, c->token, c->tkl, &c->blockNumber, &c->clientAddr, c->addrLen); c->blockNumber += 1; @@ -221,8 +239,8 @@ int main(int argc, char* argv[]) { } else if (c->receivedRst) { sendPing(c->messageId, &c->clientAddr, c->addrLen); c->receivedRst = false; - } - + } + c->base.timeConnected += delay; c->messageId += 1; c->base.sendNext = now + delay; @@ -236,6 +254,7 @@ int main(int argc, char* argv[]) { int pollResult = poll(&pollFd, 1, timeout); now = currentTimeMs(); if (pollResult < 0) { + if (errno == EINTR) continue; fprintf(stderr, "Poll error with error %s", strerror(errno)); continue; } @@ -269,7 +288,7 @@ int main(int argc, char* argv[]) { printf(" TKL : %u\n", tkl); printf(" Code : 0x%02X (Class: %u, Detail: %u)\n", code, class, detail); printf(" Msg ID : %u\n", msgId); - + // Token (if any) printf(" Token : "); for (int i = 0; i < tkl; i++) { @@ -284,7 +303,7 @@ int main(int argc, char* argv[]) { // Malformed request. Send 4.00 Bad Request uint8_t response[4]; uint8_t resp_type = (type == TYPE_CONFIRMABLE) ? TYPE_ACK : TYPE_NON_CONFIRMABLE; - + response[0] = (0b01 << 6) | (resp_type << 4) | 0; // Ver=1, Type=ACK/NON, TKL=0 response[1] = (0b100 << 5) | 0b0; // Code 4.00 (Bad Request) response[2] = msgId >> 8; @@ -293,7 +312,7 @@ int main(int argc, char* argv[]) { sendto(sockFd, response, resp_len, 0, (struct sockaddr *)&clientAddr, addrLen); continue; - } + } else if (version != 1){ // Must be silently ignored continue; @@ -302,7 +321,7 @@ int main(int argc, char* argv[]) { } // TODO: Ignore extended methods (send "method not allowed" response) - // TODO: Handle requests while the client is still receiving blocks. + // TODO: Handle requests while the client is still receiving blocks. struct coapClient* client = findExistingClient(&clientAddr); if(client == NULL) { client = malloc(sizeof(struct coapClient)); @@ -332,7 +351,7 @@ int main(int argc, char* argv[]) { printf("%s", msg); sendMetric(msg); } - + if (type == TYPE_RST) { client->receivedRst = true; } @@ -343,9 +362,9 @@ int main(int argc, char* argv[]) { else if (class == CLASS_REQUEST && detail == DETAIL_GET) { printf("GET request from %s of type %d with tkl=%d and msgId1=%u\n", inet_ntoa(clientAddr.sin_addr), type, tkl, msgId); client->receivedGet = true; - } + } - // If a CON (Confirmable) request, first send seperate ACK response. + // If a CON (Confirmable) request, first send seperate ACK response. // The response does not need to be confirmable. (5.2.2 and 5.2.3) if (type == TYPE_CONFIRMABLE) { uint8_t ack[4]; @@ -359,7 +378,19 @@ int main(int argc, char* argv[]) { } } } + printf("Shutting down CoAP server gracefully\n"); + while (clientQueueCoap.size > 0) { + struct baseClient *bc = heap_pop(&clientQueueCoap); + struct coapClient *c = (struct coapClient *)bc; + char msg[256]; + snprintf(msg, sizeof(msg), "%s disconnect %s %lld\n", + SERVER_ID, c->base.ipaddr, c->base.timeConnected); + printf("%s", msg); + sendMetric(msg); + deleteClient(c); + } close(sockFd); + printf("CoAP server shut down successfully\n"); return 0; -} \ No newline at end of file +} diff --git a/servers/mqtt_pit.c b/servers/mqtt_pit.c index 13bff74..a5e2a0c 100644 --- a/servers/mqtt_pit.c +++ b/servers/mqtt_pit.c @@ -24,6 +24,12 @@ // #define FD_LIMIT 4096 #define SERVER_ID "MQTT" +static volatile sig_atomic_t keepRunning = 1; +void handleSignal(int sig){ + (void)sig; + keepRunning = 0; +} + int port; int maxEvents; int epollTimeoutInterval; @@ -89,7 +95,7 @@ uint8_t readConnreq(uint8_t* buffer, uint32_t packetEnd, uint32_t offset, struct if (offset + 2 > packetEnd) { fprintf(stderr, "CONNECT request too small for fixed header"); return 0x80; // Unspecified error - } + } uint16_t protocolName = (buffer[offset] << 8) | buffer[offset + 1]; offset += 2; @@ -100,7 +106,7 @@ uint8_t readConnreq(uint8_t* buffer, uint32_t packetEnd, uint32_t offset, struct memcpy(wrong, &buffer[offset], protocolName < 7 ? protocolName : 4); fprintf(stderr, "Malformed CONNECT request. Expected \"MQTT\" or \"MQIsdp\" but got \"%s\"", wrong); return 0x01; // Unacceptable protocol version - } + } if(isV31) { offset += 6; } else { @@ -145,7 +151,7 @@ uint8_t readConnreq(uint8_t* buffer, uint32_t packetEnd, uint32_t offset, struct if (offset + 2 > packetEnd){ fprintf(stderr, "No keep-alive value supplied"); return 0x80; - } + } int keepAlive = (buffer[offset] << 8) | buffer[offset + 1]; if(keepAlive < 0) { fprintf(stderr, "Negative keep-alive value received: %d", keepAlive); @@ -186,7 +192,7 @@ uint8_t readConnreq(uint8_t* buffer, uint32_t packetEnd, uint32_t offset, struct if (offset + 2 > packetEnd) { fprintf(stderr, "Username flag supplied, but with no username"); return 0x80; - } + } uint16_t user_len = (buffer[offset] << 8) | buffer[offset + 1]; offset += 2; @@ -207,7 +213,7 @@ uint8_t readConnreq(uint8_t* buffer, uint32_t packetEnd, uint32_t offset, struct if (offset + 2 > packetEnd) { fprintf(stderr, "Password flag supplied, but with no password"); return 0x80; - } + } uint16_t passwordLength = (buffer[offset] << 8) | buffer[offset + 1]; offset += 2; if (offset + passwordLength > packetEnd){ @@ -215,7 +221,7 @@ uint8_t readConnreq(uint8_t* buffer, uint32_t packetEnd, uint32_t offset, struct return 0x80; } - + uint16_t safeLength = passwordLength < 255 ? passwordLength : 255; memcpy(password, &buffer[offset], safeLength); offset += passwordLength; @@ -314,7 +320,7 @@ bool sendConnack(struct mqttClient* client, uint8_t reasonCode) { if (!arr) { fprintf(stderr, "malloc failed for connack packet"); return false; - } + } if (client->version == V5) { arr[0] = 0x20; // CONNACK fixed header @@ -375,13 +381,13 @@ void readPublish(uint8_t* buffer, uint32_t packetEnd, uint32_t offset, enum Mqtt offset += 2; // packet id (don't care) } - if(version == V5) { + if(version == V5) { uint32_t varint; bool decodeSuccess = decodeVarint(buffer, packetEnd, &offset, &varint); if(!decodeSuccess) { return; } - + // Skip properties offset += varint; } @@ -505,7 +511,7 @@ bool sendPublish(struct mqttClient* client, const char* topic, const char* messa } else { // syslog(LOG_INFO, "Sent PUBLISH to client (fd=%d), topic=%s\n", client->fd, topic); } - + return true; } @@ -609,7 +615,7 @@ bool sendPubrel(struct mqttClient* client, uint16_t packetId) { arr[1] = 0x04; // Remaining Length arr[2] = packetId >> 8; // packetId arr[3] = packetId & 0xFF; - + arr[4] = 0x00; // Reason Code: Success arr[5] = 0x00; // Property Length } else { @@ -706,7 +712,7 @@ void calculateTotalPacketLengths(uint8_t *buffer, uint32_t bytesWrittenToBuffer, while (offset < bytesWrittenToBuffer) { if (bytesWrittenToBuffer - offset < 2) { fprintf(stderr, "CALCULATE: Not enough data for fixed header"); - break; + break; } if (*packetCount == maxPacketsPerClient) { @@ -730,7 +736,7 @@ void calculateTotalPacketLengths(uint8_t *buffer, uint32_t bytesWrittenToBuffer, encodedBytes++; if ((byte & 0b10000000) == 0) { - break; + break; } } @@ -761,7 +767,7 @@ void cleanupBuffer(struct mqttClient* client, uint32_t packetLength){ int main(int argc, char* argv[]) { setbuf(stdout, NULL); - + // testing // char msg[256]; // snprintf(msg, sizeof(msg), "%s connect %s\n", @@ -779,13 +785,13 @@ int main(int argc, char* argv[]) { initializeStats(); setFdLimit(maxNoClients); signal(SIGPIPE, SIG_IGN); - + int serverSock = createServer(port); if (serverSock < 0) { fprintf(stderr, "Invalid server socket fd: %d", serverSock); exit(EXIT_FAILURE); } - + struct sockaddr_in clientAddr; socklen_t addrLen = sizeof(clientAddr); @@ -803,8 +809,16 @@ int main(int argc, char* argv[]) { exit(EXIT_FAILURE); } + struct sigaction sa; + sa.sa_handler = handleSignal; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGINT, &sa, NULL); + sigaction(SIGTERM, &sa, NULL); + signal(SIGPIPE, SIG_IGN); + // long long lastHeartbeat = currentTimeMs(); - while(true) { + while(keepRunning) { long long now = currentTimeMs(); // if (now - lastHeartbeat >= HEARTBEAT_INTERVAL_MS) { @@ -814,11 +828,12 @@ int main(int argc, char* argv[]) { int nfds = epoll_wait(epollfd, eventsQueue, maxEvents, epollTimeoutInterval); if (nfds == -1) { + if (errno == EINTR) continue; fprintf(stderr, "epoll_wait"); exit(EXIT_FAILURE); } - // Update now, since epoll_wait made the value outdated. + // Update now, since epoll_wait made the value outdated. now = currentTimeMs(); for (int n = 0; n < nfds; ++n) { int currentFd = eventsQueue[n].data.fd; @@ -835,7 +850,7 @@ int main(int argc, char* argv[]) { continue; } - + statsMqtt.totalConnects += 1; newClient->fd = clientFd; strncpy(newClient->ipaddr, inet_ntoa(clientAddr.sin_addr), INET_ADDRSTRLEN); @@ -857,7 +872,7 @@ int main(int argc, char* argv[]) { free(newClient); continue; } - + addClient(newClient); char msg[256]; snprintf(msg, sizeof(msg), "%s connect %s\n", @@ -896,13 +911,13 @@ int main(int argc, char* argv[]) { calculateTotalPacketLengths(client->buffer, client->bytesWrittenToBuffer, packetLengths, packetStarts, &packetCount); - + uint32_t processedPackets = 0; for (uint32_t i = 0; i < packetCount; i++) { uint32_t packetLength = packetLengths[i]; uint32_t packetStart = packetStarts[i]; uint32_t packetEnd = packetStart + packetLength; - + if (packetLength == 0 || processedPackets + packetLength > client->bytesWrittenToBuffer) { // syslog(LOG_INFO, "Incomplete packet"); break; // Incomplete packet @@ -975,9 +990,9 @@ int main(int argc, char* argv[]) { } client->bytesWrittenToBuffer = leftover; } - + } - + // Detect dead clients and disconnect them for (struct mqttClient *c = clients, *tmp = NULL; c != NULL; c = tmp) { long long timeSinceLastActivityMs = now - c->lastActivityMs; @@ -995,8 +1010,24 @@ int main(int argc, char* argv[]) { } } } + printf("Shutting down MQTT server gracefully\n"); + struct mqttClient *c, *tmp; + HASH_ITER(hh, clients, c, tmp) { + long long timeTrapped = currentTimeMs() - c->timeOfConnection; + char msg[256]; + snprintf(msg, sizeof(msg), "%s disconnect %s %lld", + SERVER_ID, c->ipaddr, timeTrapped); + printf("%s", msg); + sendMetric(msg); + epoll_ctl(epollfd, EPOLL_CTL_DEL, c->fd, NULL); + close(c->fd); + HASH_DEL(clients, c); + free(c); + } + close(epollfd); + printf("MQTT server shutdown successfully\n"); // closelog(); close(serverSock); return 0; -} \ No newline at end of file +} diff --git a/servers/telnet_pit.c b/servers/telnet_pit.c index ebd2013..40d62fc 100644 --- a/servers/telnet_pit.c +++ b/servers/telnet_pit.c @@ -29,13 +29,20 @@ int port; int delay; int maxNoClients; +static volatile sig_atomic_t keepRunning = 1; + +void handleSignal(int sig) { + (void)sig; //Suppresses unused parameter warning + keepRunning = 0; +} + // Telnet negotiation options unsigned char negotiations[][3] = { - {IAC, WILL, 1}, - {IAC, DO, 3}, + {IAC, WILL, 1}, + {IAC, DO, 3}, {IAC, DONT, 5}, - {IAC, WILL, 31}, - {IAC, DO, 24}, + {IAC, WILL, 31}, + {IAC, DO, 24}, {IAC, WONT, 39} }; int num_options = sizeof(negotiations) / sizeof(negotiations[0]); @@ -53,7 +60,7 @@ void initializeStats(){ int main(int argc, char *argv[]) { setbuf(stdout, NULL); - + // testing // char msg[256]; // snprintf(msg, sizeof(msg), "%s connect %s\n", @@ -61,32 +68,45 @@ int main(int argc, char *argv[]) { // fprintf(stderr, "%s", msg); // sendMetric(msg); (void)argc; + if (argc < 4) { + fprintf(stderr, "Usage: %s \n", argv[0]); + exit(EXIT_FAILURE); + } port = atoi(argv[1]); delay = atoi(argv[2]); maxNoClients = atoi(argv[3]); initializeStats(); setFdLimit(maxNoClients); - signal(SIGPIPE, SIG_IGN); // Ignore + + // Setup signal handlers for graceful shutdown + struct sigaction sa; + sa.sa_handler = handleSignal; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGINT, &sa, NULL); + sigaction(SIGTERM, &sa, NULL); + + signal(SIGPIPE, SIG_IGN); // Ignore queue_init(&clientQueueTelnet); - + int serverSock = createServer(port); if (serverSock < 0) { fprintf(stderr, "Invalid server socket fd: %d", serverSock); exit(EXIT_FAILURE); } - + struct sockaddr_in clientAddr; socklen_t addrLen = sizeof(clientAddr); - + struct pollfd fds; memset(&fds, 0, sizeof(fds)); fds.fd = serverSock; fds.events = POLLIN; - + // long long lastHeartbeat = currentTimeMs(); - while (1) { + while (keepRunning) { long long now = currentTimeMs(); - int timeout = -1; + int timeout = 1000; // if (now - lastHeartbeat >= HEARTBEAT_INTERVAL_MS) { // heartbeatLog(); @@ -98,10 +118,10 @@ int main(int argc, char *argv[]) { if(clientQueueTelnet.head->sendNext <= now){ struct baseClient *bc = queue_pop(&clientQueueTelnet); struct telnetAndUpnpClient *c = (struct telnetAndUpnpClient *)bc; - + int optionIndex = rand() % num_options; ssize_t out = write(c->fd, negotiations[optionIndex], sizeof(negotiations[optionIndex])); - + if (out == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { // Avoid blocking c->base.sendNext = now + delay; @@ -129,10 +149,11 @@ int main(int argc, char *argv[]) { break; } } - + int pollResult = poll(&fds, 1, timeout); now = currentTimeMs(); // Poll will cause old value to be misrepresenting if (pollResult < 0) { + if (errno == EINTR) continue; // Interrupted by signal handler fprintf(stderr, "Poll error with error %s", strerror(errno)); continue; } @@ -141,6 +162,7 @@ int main(int argc, char *argv[]) { if (fds.revents & POLLIN) { int clientFd = accept(serverSock, (struct sockaddr *)&clientAddr, &addrLen); if(clientFd == -1) { + if (errno == EINTR) continue; fprintf(stderr, "Failed accepting new client with error %s", strerror(errno)); continue; } @@ -172,6 +194,21 @@ int main(int argc, char *argv[]) { } } + printf("Shutting down gracefully...\n"); + // Cleanup: disconnect all clients + while (clientQueueTelnet.head) { + struct baseClient *bc = queue_pop(&clientQueueTelnet); + struct telnetAndUpnpClient *c = (struct telnetAndUpnpClient *)bc; + long long timeTrapped = c->base.timeConnected; + char msg[256]; + snprintf(msg, sizeof(msg), "%s disconnect %s %lld\n", + SERVER_ID, c->base.ipaddr, timeTrapped); + printf("%s", msg); + sendMetric(msg); + close(c->fd); + free(c); + } + close(serverSock); return 0; } diff --git a/servers/upnp_pit.c b/servers/upnp_pit.c index 0c6a7e2..f6a64a6 100644 --- a/servers/upnp_pit.c +++ b/servers/upnp_pit.c @@ -22,6 +22,15 @@ int ssdpPort; int delay; int maxNoClients; +// Global flag for graceful shutdown +static volatile sig_atomic_t keepRunning = 1; + +// Signal handler to flip the flag +void handleSignal(int sig){ + (void)sig; + keepRunning = 0; +} + // Can use Chunked Transfer Coding from rfc 2616 section 3.6.1 // Required to be a HTTP GET request (Section 2.1 from specifications) const char *FAKE_DEVICE_DESCRIPTION = @@ -47,7 +56,7 @@ const char *FAKE_DEVICE_DESCRIPTION = " Philips_Hue_2.00.10966.PVT-OWRT-InsightV2\n" " 1|49153\n" " 8\n" - " \n" + " \n" " \n" " jpg\n" " 100\n" @@ -69,7 +78,7 @@ const char *FAKE_CHUNK = // void heartbeatLog() { // syslog(LOG_INFO, "Server is running with %d connected clients. Number of most concurrent connected clients is %d", clientQueueUpnp.length, statsUpnp.mostConcurrentConnections); -// syslog(LOG_INFO, "Current statistics: wasted time: %lld ms. Total HTTP requests: %ld. Total other HTTP requests: %ld. SSDP responses: %ld. XML requests: %ld", +// syslog(LOG_INFO, "Current statistics: wasted time: %lld ms. Total HTTP requests: %ld. Total other HTTP requests: %ld. SSDP responses: %ld. XML requests: %ld", // statsUpnp.totalWastedTime, statsUpnp.totalHttpRequests, statsUpnp.otherHttpRequests, statsUpnp.ssdpResponses, statsUpnp.totalXmlRequests); // } @@ -133,6 +142,9 @@ void *ssdpListener(void *arg) { exit(EXIT_FAILURE); } + struct timeval tv= {1, 0}; + setsockopt(sockFd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + // Bind to all interfaces for unicast memset(&serverAddr, 0, sizeof(serverAddr)); serverAddr.sin_family = AF_INET; @@ -145,6 +157,11 @@ void *ssdpListener(void *arg) { // mreq.imr_interface.s_addr = INADDR_ANY; // setsockopt(sockFd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + // Allow multiple sockets to bind to the same port + int opt=1; + setsockopt(sockFd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + setsockopt(sockFd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)); + if (bind(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { fprintf(stderr, "SSDP Bind failed"); close(sockFd); @@ -153,38 +170,40 @@ void *ssdpListener(void *arg) { printf("UPnP listener started on port %d\n", ssdpPort); - while (1) { + while (keepRunning) { memset(buffer, 0, sizeof(buffer)); if (recvfrom(sockFd, buffer, sizeof(buffer), 0, (struct sockaddr *)&client_addr, &addrLen) <= 0) { + if(errno ==EINTR || errno == EAGAIN) continue; fprintf(stderr, "Error receiving SSDP request"); continue; } char client_ip[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &client_addr.sin_addr, client_ip, INET_ADDRSTRLEN); - + char msg[256]; int isMSearch = strstr(buffer, "M-SEARCH") != NULL; if (isMSearch) { sendto(sockFd, response, strlen(response), 0, (struct sockaddr *)&client_addr, sizeof(client_addr)); - - snprintf(msg, sizeof(msg), "%s M-SEARCH %s\n", + + snprintf(msg, sizeof(msg), "%s M-SEARCH %s\n", SERVER_ID, client_ip); } else { - snprintf(msg, sizeof(msg), "%s non-M-SEARCH %s\n", + snprintf(msg, sizeof(msg), "%s non-M-SEARCH %s\n", SERVER_ID, client_ip); } - + printf("%s", msg); sendMetric(msg); } - free(ssdpResponse); + free(response); close(sockFd); + printf("SSDP listener thread exiting\n"); return NULL; } @@ -200,16 +219,16 @@ void *httpServer(void *arg) { struct sockaddr_in clientAddr; socklen_t addrLen = sizeof(clientAddr); - + struct pollfd fds; memset(&fds, 0, sizeof(fds)); fds.fd = serverSock; fds.events = POLLIN; // long long lastHeartbeat = currentTimeMs(); - while (1){ + while (keepRunning){ long long now = currentTimeMs(); - int timeout = -1; + int timeout = 1000; // long long res = now - lastHeartbeat; @@ -228,7 +247,7 @@ void *httpServer(void *arg) { write(c->fd, chunk_size, strlen(chunk_size)); write(c->fd, FAKE_CHUNK, strlen(FAKE_CHUNK)); ssize_t out = write(c->fd, "\r\n", 2); - + if (out == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { // Avoid blocking c->base.sendNext = now + delay; @@ -262,6 +281,7 @@ void *httpServer(void *arg) { int pollResult = poll(&fds, 1, timeout); now = currentTimeMs(); // Poll will cause old value to be misrepresenting if (pollResult < 0) { + if(errno ==EINTR) continue; fprintf(stderr, "Poll error with error %s", strerror(errno)); continue; } @@ -284,9 +304,10 @@ void *httpServer(void *arg) { char buffer[1024]; memset(buffer, 0, 1024); - read(clientFd, buffer, 1024-1); - char method[20], url[128]; - sscanf(buffer, "%19s %255s", method, url); + int r= read(clientFd, buffer, 1024-1); + if(r>0){ + char method[20], url[128]; + sscanf(buffer, "%19s %255s", method, url); if (strcmp(url, "/hue-device.xml") == 0 && strcmp(method, "GET") == 0) { // statsUpnp.totalXmlRequests += 1; @@ -295,11 +316,9 @@ void *httpServer(void *arg) { "Transfer-Encoding: chunked\r\n" "Trailer: X-Checksum\r\n" "\r\n"; - + ssize_t out = write(clientFd, responseHeader, strlen(responseHeader)); if(out <= 0){ - fprintf(stderr, "failed to write response header to %s\n", - inet_ntoa(clientAddr.sin_addr)); close(clientFd); free(newClient); continue; @@ -317,10 +336,8 @@ void *httpServer(void *arg) { snprintf(newClient->base.ipaddr, sizeof(newClient->base.ipaddr), "%s", inet_ntoa(clientAddr.sin_addr)); queue_append(&clientQueueUpnp, (struct baseClient*)newClient); - if(statsUpnp.mostConcurrentConnections < clientQueueUpnp.length) { - statsUpnp.mostConcurrentConnections = clientQueueUpnp.length; - } - + snprintf(newClient->base.ipaddr, sizeof(newClient->base.ipaddr), "%s", inet_ntoa(clientAddr.sin_addr)); + queue_append(&clientQueueUpnp, (struct baseClient*)newClient); char msg[256]; snprintf(msg, sizeof(msg), "%s connect %s\n", SERVER_ID, newClient->base.ipaddr); @@ -341,15 +358,33 @@ void *httpServer(void *arg) { close(clientFd); free(newClient); - continue; } + } else { + close(clientFd); + free(newClient); } } - - close(serverSock); - return NULL; +} +printf("Shutting down HTTP server gracefully\n"); +while(clientQueueUpnp.head){ + struct baseClient *bc = queue_pop(&clientQueueUpnp); + struct telnetAndUpnpClient *c = (struct telnetAndUpnpClient *)bc; + long long timeTrapped = c->base.timeConnected; + char msg[256]; + snprintf(msg, sizeof(msg), "%s disconnect %s %lld\n", + SERVER_ID, c->base.ipaddr, timeTrapped); + printf("%s", msg); + sendMetric(msg); + close(c->fd); + free(c); +} +close(serverSock); +printf("HTTP server thread exiting\n"); +return NULL; } + + void initializeStats(){ statsUpnp.totalWastedTime = 0; statsUpnp.otherHttpRequests = 0; @@ -361,7 +396,11 @@ void initializeStats(){ int main(int argc, char* argv[]) { setbuf(stdout, NULL); - + if(argc < 5){ + fprintf(stderr, "Usage: %s \n", argv[0]); + exit(EXIT_FAILURE); + } + // testing // char msg[256]; // snprintf(msg, sizeof(msg), "%s connect %s\n", @@ -372,7 +411,6 @@ int main(int argc, char* argv[]) { // SERVER_ID, "82.211.213.247"); // fprintf(stderr, "%s", msg); // sendMetric(msg); - (void)argc; httpPort = atoi(argv[1]); ssdpPort = atoi(argv[2]); delay = atoi(argv[3]); @@ -380,10 +418,18 @@ int main(int argc, char* argv[]) { // openlog("upnp_tarpit", LOG_PID | LOG_CONS, LOG_USER); initializeStats(); setFdLimit(maxNoClients); + struct sigaction sa; + sa.sa_handler = handleSignal; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGINT, &sa, NULL); + sigaction(SIGTERM, &sa, NULL); + signal(SIGPIPE, SIG_IGN); pthread_t ssdpThread, httpThread; pthread_create(&ssdpThread, NULL, ssdpListener, NULL); pthread_create(&httpThread, NULL, httpServer, NULL); pthread_join(ssdpThread, NULL); pthread_join(httpThread, NULL); + printf("UPnP pit shut down successfully\n"); return 0; }