Browse Source

v1.4| 增强unixScoket稳定性

master
zhaohe 2 months ago
parent
commit
618f648a0d
  1. 53
      src/components/linuxsocket/unix_socket.cpp
  2. 1
      src/components/linuxsocket/unix_socket.hpp
  3. 2
      src/configs/version.hpp
  4. 21
      src/main.cpp
  5. 8
      test/auto.sh
  6. 110
      test/receiver.c
  7. 120
      test/sender.c

53
src/components/linuxsocket/unix_socket.cpp

@ -2,36 +2,71 @@
using namespace iflytop; using namespace iflytop;
using namespace std; using namespace std;
using namespace core; using namespace core;
void UnixScoket::start() {
logger->info("Creating UNIX domain socket at path: {}", m_path);
logger->info("Connecting to opposite path: {}", m_opposite_path);
int UnixScoket::initRxSocket() {
sock_fd = socket(AF_UNIX, SOCK_DGRAM, 0); sock_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (sock_fd == -1) { if (sock_fd == -1) {
logger->error("socket creation failed: {}", strerror(errno)); logger->error("socket creation failed: {}", strerror(errno));
exit(EXIT_FAILURE);
return -1;
} }
// Remove socket file if it already exists // Remove socket file if it already exists
unlink(m_path.c_str()); unlink(m_path.c_str());
if (bind(sock_fd, (struct sockaddr *)&rx_addr, sizeof(rx_addr)) == -1) { if (bind(sock_fd, (struct sockaddr *)&rx_addr, sizeof(rx_addr)) == -1) {
perror("bind");
logger->error("bind failed: {}", strerror(errno));
close(sock_fd);
sock_fd = -1;
return -1;
}
// 设置超时时间为5秒
struct timeval tv;
tv.tv_sec = 1; // 秒
tv.tv_usec = 0; // 微秒
// 设置SO_RCVTIMEO选项
if (setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
logger->error("setsockopt failed: {}", strerror(errno));
close(sock_fd); close(sock_fd);
exit(EXIT_FAILURE);
sock_fd = -1;
return -1;
} }
return 0;
}
void UnixScoket::start() {
logger->info("Creating UNIX domain socket at path: {}", m_path);
logger->info("Connecting to opposite path: {}", m_opposite_path);
initRxSocket();
m_thread.reset(new Thread("UnixSocketThread", [this]() { m_thread.reset(new Thread("UnixSocketThread", [this]() {
uint8_t rxbuffer[1024]; uint8_t rxbuffer[1024];
struct sockaddr_un client_addr; struct sockaddr_un client_addr;
ThisThread thisThread; ThisThread thisThread;
while (!thisThread.getExitFlag()) { while (!thisThread.getExitFlag()) {
if (sock_fd < 0) {
usleep(100 * 1000);
logger->error("Socket is closed, reinitializing...");
initRxSocket();
continue;
}
memset(rxbuffer, 0, sizeof(rxbuffer)); memset(rxbuffer, 0, sizeof(rxbuffer));
socklen_t client_len = sizeof(client_addr); socklen_t client_len = sizeof(client_addr);
ssize_t num_bytes = recvfrom(sock_fd, rxbuffer, sizeof(rxbuffer) - 1, 0, (struct sockaddr *)&client_addr, &client_len); ssize_t num_bytes = recvfrom(sock_fd, rxbuffer, sizeof(rxbuffer) - 1, 0, (struct sockaddr *)&client_addr, &client_len);
if (num_bytes > 0) { if (num_bytes > 0) {
onPacket(rxbuffer, num_bytes); onPacket(rxbuffer, num_bytes);
} }
if (num_bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { // 超时错误
logger->info("recvfrom timed out, no data received");
continue;
} else {
logger->error("recvfrom failed: {}", strerror(errno));
close(sock_fd);
sock_fd = -1;
continue;
}
}
} }
close(sock_fd); close(sock_fd);
unlink(m_path.c_str()); unlink(m_path.c_str());
@ -41,5 +76,5 @@ void UnixScoket::sendPacket(uint8_t *data, size_t len) {
int ret = sendto(sock_fd, data, len, 0, (struct sockaddr *)&tx_addr, sizeof(tx_addr)); int ret = sendto(sock_fd, data, len, 0, (struct sockaddr *)&tx_addr, sizeof(tx_addr));
if (ret == -1) { if (ret == -1) {
logger->error("sendto failed: {}", strerror(errno)); logger->error("sendto failed: {}", strerror(errno));
}
}
} }

1
src/components/linuxsocket/unix_socket.hpp

@ -64,5 +64,6 @@ class UnixScoket {
void start(); void start();
void sendPacket(uint8_t* data, size_t len); void sendPacket(uint8_t* data, size_t len);
int initRxSocket();
}; };
} // namespace iflytop } // namespace iflytop

2
src/configs/version.hpp

@ -1,2 +1,2 @@
#pragma once #pragma once
#define VERSION "1.3"
#define VERSION "1.4"

21
src/main.cpp

@ -26,10 +26,29 @@ int main(int argc, char* argv[]) {
mainLogger->info("# company:{}", "ifytop"); mainLogger->info("# company:{}", "ifytop");
mainLogger->info("# version:{}", VERSION); mainLogger->info("# version:{}", VERSION);
mainLogger->info("#"); mainLogger->info("#");
App app;
App app;
app.initialize(); app.initialize();
while (true) { while (true) {
sleep(1); sleep(1);
} }
// unique_ptr<UnixScoket> server;
// server.reset(new UnixScoket(UnixScoket::Server, "zexcan"));
// server->start();
// server->onPacket.connect([mainLogger](uint8_t* data, size_t len) {
// if (len > 0) {
// mainLogger->info("Received packet: {}", string((char*)data, len));
// }
// });
// unique_ptr<UnixScoket> client;
// client.reset(new UnixScoket(UnixScoket::Client, "zexcan"));
// client->start();
// while (true) {
// client->sendPacket((uint8_t*)"hello", 5);
// sleep(1);
// }
} }

8
test/auto.sh

@ -0,0 +1,8 @@
#!/bin/bash
# 循环指令./sender 随即休眠0.1,0.2,0.3 后杀死 进程
while true; do
./sender &
sleep $(printf "0.%03d" $(( RANDOM % 100 )))
killall sender
echo "Killed sender process"
done

110
test/receiver.c

@ -1,63 +1,75 @@
#include <errno.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <unistd.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/un.h> #include <sys/un.h>
#include <unistd.h>
#define SOCKET_PATH "/tmp/unix_dgram_demo" #define SOCKET_PATH "/tmp/unix_dgram_demo"
int main() { int main() {
int sock_fd;
struct sockaddr_un server_addr, client_addr;
socklen_t client_len;
char buffer[256];
// Create socket
sock_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (sock_fd == -1) {
perror("socket");
exit(EXIT_FAILURE);
}
// Remove socket file if it already exists
unlink(SOCKET_PATH);
// Configure server address
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sun_family = AF_UNIX;
strncpy(server_addr.sun_path, SOCKET_PATH, sizeof(server_addr.sun_path) - 1);
// Bind socket to address
if (bind(sock_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
perror("bind");
close(sock_fd);
exit(EXIT_FAILURE);
}
printf("Receiver is waiting for messages on %s...\n", SOCKET_PATH);
int sock_fd;
struct sockaddr_un server_addr, client_addr;
socklen_t client_len;
char buffer[10240];
// Create socket
sock_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (sock_fd == -1) {
perror("socket");
exit(EXIT_FAILURE);
}
// Remove socket file if it already exists
unlink(SOCKET_PATH);
// Configure server address
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sun_family = AF_UNIX;
strncpy(server_addr.sun_path, SOCKET_PATH, sizeof(server_addr.sun_path) - 1);
// Bind socket to address
if (bind(sock_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
perror("bind");
close(sock_fd);
exit(EXIT_FAILURE);
}
// 5
struct timeval tv;
tv.tv_sec = 1; //
tv.tv_usec = 0; //
// SO_RCVTIMEO选项
if (setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
perror("setsockopt failed");
return -1;
}
printf("Receiver is waiting for messages on %s...\n", SOCKET_PATH);
while (1) {
// Receive message // Receive message
client_len = sizeof(client_addr);
ssize_t num_bytes = recvfrom(sock_fd, buffer, sizeof(buffer) - 1, 0,
(struct sockaddr *)&client_addr, &client_len);
client_len = sizeof(client_addr);
ssize_t num_bytes = recvfrom(sock_fd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&client_addr, &client_len);
if (num_bytes == -1) { if (num_bytes == -1) {
perror("recvfrom");
if (errno == EAGAIN || errno == EWOULDBLOCK) { //
printf("No message received within the timeout period.\n");
continue;
}
perror("!!!recvfrom");
exit(EXIT_FAILURE);
} else { } else {
buffer[num_bytes] = '\0';
printf("Received message: %s\n", buffer);
// Send response
const char *response = "Hello from receiver!";
if (sendto(sock_fd, response, strlen(response), 0,
(struct sockaddr *)&client_addr, client_len) == -1) {
perror("sendto");
}
buffer[num_bytes] = '\0';
printf("Received message: %d\n", num_bytes);
} }
// Clean up
close(sock_fd);
unlink(SOCKET_PATH);
return 0;
}
// Clean up
close(sock_fd);
unlink(SOCKET_PATH);
return 0;
} }

120
test/sender.c

@ -1,70 +1,74 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <unistd.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/un.h> #include <sys/un.h>
#include <unistd.h>
#define SOCKET_PATH "/tmp/unix_dgram_demo" #define SOCKET_PATH "/tmp/unix_dgram_demo"
#define SENDER_PATH "/tmp/unix_dgram_sender" #define SENDER_PATH "/tmp/unix_dgram_sender"
int main() { int main() {
int sock_fd;
struct sockaddr_un server_addr, client_addr;
char buffer[256];
// Create socket
sock_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (sock_fd == -1) {
perror("socket");
exit(EXIT_FAILURE);
}
// Configure client address (we need to bind to send/receive)
memset(&client_addr, 0, sizeof(client_addr));
client_addr.sun_family = AF_UNIX;
strncpy(client_addr.sun_path, SENDER_PATH, sizeof(client_addr.sun_path) - 1);
// Remove socket file if it already exists
unlink(SENDER_PATH);
// Bind to our own address (required for receiving response)
if (bind(sock_fd, (struct sockaddr *)&client_addr, sizeof(client_addr)) == -1) {
perror("bind");
close(sock_fd);
exit(EXIT_FAILURE);
}
// Configure server address
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sun_family = AF_UNIX;
strncpy(server_addr.sun_path, SOCKET_PATH, sizeof(server_addr.sun_path) - 1);
// Send message to server
const char *message = "Hello from sender!";
if (sendto(sock_fd, message, strlen(message), 0,
(struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
perror("sendto");
close(sock_fd);
exit(EXIT_FAILURE);
}
printf("Message sent to receiver.\n");
// Receive response
socklen_t server_len = sizeof(server_addr);
ssize_t num_bytes = recvfrom(sock_fd, buffer, sizeof(buffer) - 1, 0,
(struct sockaddr *)&server_addr, &server_len);
if (num_bytes == -1) {
perror("recvfrom");
} else {
buffer[num_bytes] = '\0';
printf("Received response: %s\n", buffer);
}
// Clean up
int sock_fd;
struct sockaddr_un server_addr, client_addr;
char buffer[256];
// Create socket
sock_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (sock_fd == -1) {
perror("socket");
exit(EXIT_FAILURE);
}
// Configure client address (we need to bind to send/receive)
memset(&client_addr, 0, sizeof(client_addr));
client_addr.sun_family = AF_UNIX;
strncpy(client_addr.sun_path, SENDER_PATH, sizeof(client_addr.sun_path) - 1);
// Remove socket file if it already exists
unlink(SENDER_PATH);
// Bind to our own address (required for receiving response)
if (bind(sock_fd, (struct sockaddr *)&client_addr, sizeof(client_addr)) == -1) {
perror("bind");
close(sock_fd); close(sock_fd);
unlink(SENDER_PATH);
return 0;
exit(EXIT_FAILURE);
}
// Configure server address
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sun_family = AF_UNIX;
strncpy(server_addr.sun_path, SOCKET_PATH, sizeof(server_addr.sun_path) - 1);
// Send message to server
char txbuf[10240] = {0};
for (int i = 0; i < sizeof(txbuf) - 1; i++) {
txbuf[i] = 'a' + (i % 26); // Fill with a-z characters
}
while (1) {
if (sendto(sock_fd, txbuf, 10240, 0, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
perror("sendto");
close(sock_fd);
exit(EXIT_FAILURE);
}
}
printf("Message sent to receiver.\n");
// Receive response
socklen_t server_len = sizeof(server_addr);
ssize_t num_bytes = recvfrom(sock_fd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&server_addr, &server_len);
if (num_bytes == -1) {
perror("recvfrom");
} else {
buffer[num_bytes] = '\0';
printf("Received response: %s\n", buffer);
}
// Clean up
close(sock_fd);
unlink(SENDER_PATH);
return 0;
} }
Loading…
Cancel
Save