From 618f648a0d18c24ee47294f422f4d3dda858c526 Mon Sep 17 00:00:00 2001 From: zhaohe Date: Tue, 17 Jun 2025 22:28:05 +0800 Subject: [PATCH] =?UTF-8?q?v1.4|=20=E5=A2=9E=E5=BC=BAunixScoket=E7=A8=B3?= =?UTF-8?q?=E5=AE=9A=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/components/linuxsocket/unix_socket.cpp | 53 ++++++++++--- src/components/linuxsocket/unix_socket.hpp | 1 + src/configs/version.hpp | 2 +- src/main.cpp | 21 ++++- test/auto.sh | 8 ++ test/receiver.c | 110 ++++++++++++++------------ test/sender.c | 120 +++++++++++++++-------------- 7 files changed, 197 insertions(+), 118 deletions(-) create mode 100755 test/auto.sh diff --git a/src/components/linuxsocket/unix_socket.cpp b/src/components/linuxsocket/unix_socket.cpp index c84cb3e..3088ec5 100644 --- a/src/components/linuxsocket/unix_socket.cpp +++ b/src/components/linuxsocket/unix_socket.cpp @@ -2,36 +2,71 @@ using namespace iflytop; using namespace std; using namespace core; - -void UnixScoket::start() { - logger->info("Creating UNIX domain socket at path: {}", m_path); - logger->info("Connecting to opposite path: {}", m_opposite_path); - +int UnixScoket::initRxSocket() { sock_fd = socket(AF_UNIX, SOCK_DGRAM, 0); if (sock_fd == -1) { logger->error("socket creation failed: {}", strerror(errno)); - exit(EXIT_FAILURE); + return -1; } // Remove socket file if it already exists unlink(m_path.c_str()); if (bind(sock_fd, (struct sockaddr *)&rx_addr, sizeof(rx_addr)) == -1) { - perror("bind"); + logger->error("bind failed: {}", strerror(errno)); + close(sock_fd); + sock_fd = -1; + return -1; + } + + // 设置超时时间为5秒 + struct timeval tv; + tv.tv_sec = 1; // 秒 + tv.tv_usec = 0; // 微秒 + + // 设置SO_RCVTIMEO选项 + if (setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { + logger->error("setsockopt failed: {}", strerror(errno)); close(sock_fd); - exit(EXIT_FAILURE); + sock_fd = -1; + return -1; } + return 0; +} +void UnixScoket::start() { + logger->info("Creating UNIX domain socket at path: {}", m_path); + logger->info("Connecting to opposite path: {}", m_opposite_path); + + initRxSocket(); m_thread.reset(new Thread("UnixSocketThread", [this]() { uint8_t rxbuffer[1024]; struct sockaddr_un client_addr; ThisThread thisThread; while (!thisThread.getExitFlag()) { + if (sock_fd < 0) { + usleep(100 * 1000); + logger->error("Socket is closed, reinitializing..."); + initRxSocket(); + continue; + } + memset(rxbuffer, 0, sizeof(rxbuffer)); socklen_t client_len = sizeof(client_addr); ssize_t num_bytes = recvfrom(sock_fd, rxbuffer, sizeof(rxbuffer) - 1, 0, (struct sockaddr *)&client_addr, &client_len); if (num_bytes > 0) { onPacket(rxbuffer, num_bytes); } + if (num_bytes < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { // 超时错误 + logger->info("recvfrom timed out, no data received"); + continue; + } else { + logger->error("recvfrom failed: {}", strerror(errno)); + close(sock_fd); + sock_fd = -1; + continue; + } + } } close(sock_fd); unlink(m_path.c_str()); @@ -41,5 +76,5 @@ void UnixScoket::sendPacket(uint8_t *data, size_t len) { int ret = sendto(sock_fd, data, len, 0, (struct sockaddr *)&tx_addr, sizeof(tx_addr)); if (ret == -1) { logger->error("sendto failed: {}", strerror(errno)); - } + } } \ No newline at end of file diff --git a/src/components/linuxsocket/unix_socket.hpp b/src/components/linuxsocket/unix_socket.hpp index 067f13b..40ee0af 100644 --- a/src/components/linuxsocket/unix_socket.hpp +++ b/src/components/linuxsocket/unix_socket.hpp @@ -64,5 +64,6 @@ class UnixScoket { void start(); void sendPacket(uint8_t* data, size_t len); + int initRxSocket(); }; } // namespace iflytop \ No newline at end of file diff --git a/src/configs/version.hpp b/src/configs/version.hpp index 915ecf7..cae227f 100644 --- a/src/configs/version.hpp +++ b/src/configs/version.hpp @@ -1,2 +1,2 @@ #pragma once -#define VERSION "1.3" \ No newline at end of file +#define VERSION "1.4" \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 5bbaf00..e54f0f0 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -26,10 +26,29 @@ int main(int argc, char* argv[]) { mainLogger->info("# company:{}", "ifytop"); mainLogger->info("# version:{}", VERSION); mainLogger->info("#"); - App app; + + App app; app.initialize(); while (true) { sleep(1); } + + // unique_ptr server; + // server.reset(new UnixScoket(UnixScoket::Server, "zexcan")); + // server->start(); + // server->onPacket.connect([mainLogger](uint8_t* data, size_t len) { + // if (len > 0) { + // mainLogger->info("Received packet: {}", string((char*)data, len)); + // } + // }); + + // unique_ptr client; + // client.reset(new UnixScoket(UnixScoket::Client, "zexcan")); + // client->start(); + + // while (true) { + // client->sendPacket((uint8_t*)"hello", 5); + // sleep(1); + // } } diff --git a/test/auto.sh b/test/auto.sh new file mode 100755 index 0000000..76ad755 --- /dev/null +++ b/test/auto.sh @@ -0,0 +1,8 @@ +#!/bin/bash +# 循环指令./sender 随即休眠0.1,0.2,0.3 后杀死 进程 +while true; do + ./sender & + sleep $(printf "0.%03d" $(( RANDOM % 100 ))) + killall sender + echo "Killed sender process" +done diff --git a/test/receiver.c b/test/receiver.c index 46761b5..e25e8bd 100644 --- a/test/receiver.c +++ b/test/receiver.c @@ -1,63 +1,75 @@ +#include #include #include #include -#include #include #include +#include #define SOCKET_PATH "/tmp/unix_dgram_demo" int main() { - int sock_fd; - struct sockaddr_un server_addr, client_addr; - socklen_t client_len; - char buffer[256]; - - // Create socket - sock_fd = socket(AF_UNIX, SOCK_DGRAM, 0); - if (sock_fd == -1) { - perror("socket"); - exit(EXIT_FAILURE); - } - - // Remove socket file if it already exists - unlink(SOCKET_PATH); - - // Configure server address - memset(&server_addr, 0, sizeof(server_addr)); - server_addr.sun_family = AF_UNIX; - strncpy(server_addr.sun_path, SOCKET_PATH, sizeof(server_addr.sun_path) - 1); - - // Bind socket to address - if (bind(sock_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) { - perror("bind"); - close(sock_fd); - exit(EXIT_FAILURE); - } - - printf("Receiver is waiting for messages on %s...\n", SOCKET_PATH); - + int sock_fd; + struct sockaddr_un server_addr, client_addr; + socklen_t client_len; + char buffer[10240]; + + // Create socket + sock_fd = socket(AF_UNIX, SOCK_DGRAM, 0); + if (sock_fd == -1) { + perror("socket"); + exit(EXIT_FAILURE); + } + + // Remove socket file if it already exists + unlink(SOCKET_PATH); + + // Configure server address + memset(&server_addr, 0, sizeof(server_addr)); + server_addr.sun_family = AF_UNIX; + strncpy(server_addr.sun_path, SOCKET_PATH, sizeof(server_addr.sun_path) - 1); + + // Bind socket to address + if (bind(sock_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) { + perror("bind"); + close(sock_fd); + exit(EXIT_FAILURE); + } + + // 设置超时时间为5秒 + struct timeval tv; + tv.tv_sec = 1; // 秒 + tv.tv_usec = 0; // 微秒 + + // 设置SO_RCVTIMEO选项 + if (setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { + perror("setsockopt failed"); + return -1; + } + + printf("Receiver is waiting for messages on %s...\n", SOCKET_PATH); + + while (1) { // Receive message - client_len = sizeof(client_addr); - ssize_t num_bytes = recvfrom(sock_fd, buffer, sizeof(buffer) - 1, 0, - (struct sockaddr *)&client_addr, &client_len); + client_len = sizeof(client_addr); + ssize_t num_bytes = recvfrom(sock_fd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&client_addr, &client_len); if (num_bytes == -1) { - perror("recvfrom"); + if (errno == EAGAIN || errno == EWOULDBLOCK) { // 超时错误 + printf("No message received within the timeout period.\n"); + continue; + } + + perror("!!!recvfrom"); + exit(EXIT_FAILURE); } else { - buffer[num_bytes] = '\0'; - printf("Received message: %s\n", buffer); - - // Send response - const char *response = "Hello from receiver!"; - if (sendto(sock_fd, response, strlen(response), 0, - (struct sockaddr *)&client_addr, client_len) == -1) { - perror("sendto"); - } + buffer[num_bytes] = '\0'; + printf("Received message: %d\n", num_bytes); } - - // Clean up - close(sock_fd); - unlink(SOCKET_PATH); - - return 0; + } + + // Clean up + close(sock_fd); + unlink(SOCKET_PATH); + + return 0; } \ No newline at end of file diff --git a/test/sender.c b/test/sender.c index ab4defc..b9fdb92 100644 --- a/test/sender.c +++ b/test/sender.c @@ -1,70 +1,74 @@ #include #include #include -#include #include #include +#include #define SOCKET_PATH "/tmp/unix_dgram_demo" #define SENDER_PATH "/tmp/unix_dgram_sender" int main() { - int sock_fd; - struct sockaddr_un server_addr, client_addr; - char buffer[256]; - - // Create socket - sock_fd = socket(AF_UNIX, SOCK_DGRAM, 0); - if (sock_fd == -1) { - perror("socket"); - exit(EXIT_FAILURE); - } - - // Configure client address (we need to bind to send/receive) - memset(&client_addr, 0, sizeof(client_addr)); - client_addr.sun_family = AF_UNIX; - strncpy(client_addr.sun_path, SENDER_PATH, sizeof(client_addr.sun_path) - 1); - - // Remove socket file if it already exists - unlink(SENDER_PATH); - - // Bind to our own address (required for receiving response) - if (bind(sock_fd, (struct sockaddr *)&client_addr, sizeof(client_addr)) == -1) { - perror("bind"); - close(sock_fd); - exit(EXIT_FAILURE); - } - - // Configure server address - memset(&server_addr, 0, sizeof(server_addr)); - server_addr.sun_family = AF_UNIX; - strncpy(server_addr.sun_path, SOCKET_PATH, sizeof(server_addr.sun_path) - 1); - - // Send message to server - const char *message = "Hello from sender!"; - if (sendto(sock_fd, message, strlen(message), 0, - (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) { - perror("sendto"); - close(sock_fd); - exit(EXIT_FAILURE); - } - - printf("Message sent to receiver.\n"); - - // Receive response - socklen_t server_len = sizeof(server_addr); - ssize_t num_bytes = recvfrom(sock_fd, buffer, sizeof(buffer) - 1, 0, - (struct sockaddr *)&server_addr, &server_len); - if (num_bytes == -1) { - perror("recvfrom"); - } else { - buffer[num_bytes] = '\0'; - printf("Received response: %s\n", buffer); - } - - // Clean up + int sock_fd; + struct sockaddr_un server_addr, client_addr; + char buffer[256]; + + // Create socket + sock_fd = socket(AF_UNIX, SOCK_DGRAM, 0); + if (sock_fd == -1) { + perror("socket"); + exit(EXIT_FAILURE); + } + + // Configure client address (we need to bind to send/receive) + memset(&client_addr, 0, sizeof(client_addr)); + client_addr.sun_family = AF_UNIX; + strncpy(client_addr.sun_path, SENDER_PATH, sizeof(client_addr.sun_path) - 1); + + // Remove socket file if it already exists + unlink(SENDER_PATH); + + // Bind to our own address (required for receiving response) + if (bind(sock_fd, (struct sockaddr *)&client_addr, sizeof(client_addr)) == -1) { + perror("bind"); close(sock_fd); - unlink(SENDER_PATH); - - return 0; + exit(EXIT_FAILURE); + } + + // Configure server address + memset(&server_addr, 0, sizeof(server_addr)); + server_addr.sun_family = AF_UNIX; + strncpy(server_addr.sun_path, SOCKET_PATH, sizeof(server_addr.sun_path) - 1); + + // Send message to server + char txbuf[10240] = {0}; + for (int i = 0; i < sizeof(txbuf) - 1; i++) { + txbuf[i] = 'a' + (i % 26); // Fill with a-z characters + } + + while (1) { + if (sendto(sock_fd, txbuf, 10240, 0, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) { + perror("sendto"); + close(sock_fd); + exit(EXIT_FAILURE); + } + } + + printf("Message sent to receiver.\n"); + + // Receive response + socklen_t server_len = sizeof(server_addr); + ssize_t num_bytes = recvfrom(sock_fd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&server_addr, &server_len); + if (num_bytes == -1) { + perror("recvfrom"); + } else { + buffer[num_bytes] = '\0'; + printf("Received response: %s\n", buffer); + } + + // Clean up + close(sock_fd); + unlink(SENDER_PATH); + + return 0; } \ No newline at end of file