mirror of
https://github.com/OpenSolo/OpenSolo.git
synced 2025-04-30 06:34:38 +02:00
643 lines
21 KiB
C++
643 lines
21 KiB
C++
|
|
#include <cstring>
|
|
#include <iostream>
|
|
#include <iomanip>
|
|
|
|
#include <errno.h>
|
|
#include <fcntl.h>
|
|
#include <pthread.h>
|
|
#include <unistd.h>
|
|
#include <stddef.h>
|
|
#include <stdlib.h>
|
|
#include <signal.h>
|
|
#include <syslog.h>
|
|
#include <unistd.h>
|
|
#include <sys/signalfd.h>
|
|
#include <sys/types.h>
|
|
#include <sys/socket.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/un.h>
|
|
#include <netinet/in.h>
|
|
#include <netinet/tcp.h>
|
|
#include <arpa/inet.h>
|
|
#include "util.h"
|
|
#include "mutex.h"
|
|
#include "ButtonFunctionCfg.h"
|
|
#include "SetShotInfo.h"
|
|
#include "TcpServer.h"
|
|
|
|
//#define NDEBUG
|
|
#include <assert.h>
|
|
|
|
using namespace std;
|
|
|
|
TcpClient::TcpClient(TcpServer *server, int fd_client)
|
|
: _server(server), _fd_client(fd_client), _buf_bytes(0)
|
|
{
|
|
int sockopt = 1;
|
|
if (setsockopt(fd_client, IPPROTO_TCP, TCP_NODELAY, &sockopt, sizeof(sockopt)) != 0)
|
|
syslog(LOG_ERR, "TcpClient: setsockopt TCP_NODELAY: %s", strerror(errno));
|
|
}
|
|
|
|
TcpClient::~TcpClient()
|
|
{
|
|
close(_fd_client);
|
|
_fd_client = -1;
|
|
_server = NULL;
|
|
}
|
|
|
|
// Stuff that is always true
|
|
void TcpClient::invariant(void) const
|
|
{
|
|
|
|
assert(_server != NULL);
|
|
assert(_fd_client > 0);
|
|
assert(_buf_bytes <= _buf_len);
|
|
|
|
} // TcpClient::invariant
|
|
|
|
int TcpClient::send_upstream(const void *data, int data_len)
|
|
{
|
|
invariant();
|
|
// MSG_NOSIGNAL means if the remote end closes the connection, we
|
|
// don't want the SIGPIPE; just return from send with errno EPIPE
|
|
// MSG_DONTWAIT means if the send would block, return EAGAIN or
|
|
// EWOULDBLOCK instead. We will drop the message, but in our cases
|
|
// (low rate small messages) this means there is something wrong
|
|
// with the connection, and it will likely be closed anyway. The
|
|
// absolute worst case is we lose a button event, which should not
|
|
// cause any harm (as if the button were never pressed).
|
|
return send(_fd_client, data, data_len, MSG_NOSIGNAL | MSG_DONTWAIT);
|
|
}
|
|
|
|
// Receive from fd, appending data to buffer.
|
|
// Check for complete message in buffer and send downstream if so.
|
|
// Return true if data received, false if client is gone.
|
|
bool TcpClient::do_recv(void)
|
|
{
|
|
bool status;
|
|
int res;
|
|
|
|
invariant();
|
|
|
|
res = recv(_fd_client, _buf + _buf_bytes, _buf_len - _buf_bytes, 0);
|
|
// cout << "TcpClient:do_recv: recv returned " << res << endl;
|
|
if (res > 0) {
|
|
// new data appended to _buf
|
|
_buf_bytes += res;
|
|
|
|
// don't look at message header until we have enough for it
|
|
while (_buf_bytes >= sizeof(SoloMessage::Hdr)) {
|
|
SoloMessage::Hdr *hdr = (SoloMessage::Hdr *)_buf;
|
|
|
|
// message is SoloMessage::Hdr plus payload
|
|
unsigned msg_len = sizeof(SoloMessage::Hdr) + hdr->length;
|
|
|
|
if (_buf_bytes < msg_len)
|
|
break;
|
|
|
|
// have a complete message; format and send downstream
|
|
|
|
// cout << "TcpClient: have downstream message:" << endl;
|
|
// cout << *hdr << endl;
|
|
|
|
if (hdr->type == SoloMessage::Hdr::SET_BUTTON_STRING) {
|
|
SoloMessage::SetButtonString *appMsg;
|
|
|
|
appMsg = (SoloMessage::SetButtonString *)_buf;
|
|
|
|
// struct ButtonFunctionCfgMsg does not include the descriptor
|
|
// plant the message on top of a buffer big enough
|
|
char stm32Buf[ButtonFunctionCfgMsg_BufSize];
|
|
memset(stm32Buf, 0, sizeof(stm32Buf));
|
|
ButtonFunctionCfgMsg *stm32Msg = (ButtonFunctionCfgMsg *)stm32Buf;
|
|
|
|
stm32Msg->button_id = appMsg->button_id;
|
|
stm32Msg->button_event = appMsg->button_event;
|
|
stm32Msg->shot_id = appMsg->shot_id;
|
|
stm32Msg->state = appMsg->state;
|
|
// descriptor is everything after pad to the end
|
|
int msgBodyLen = sizeof(SoloMessage::SetButtonString) - sizeof(SoloMessage::Hdr);
|
|
int descLen = appMsg->length - msgBodyLen;
|
|
// if too long, chop it off
|
|
if (descLen > (ButtonFunctionCfgMsg::descriptor_max - 1))
|
|
descLen = (ButtonFunctionCfgMsg::descriptor_max - 1);
|
|
memcpy(stm32Msg->descriptor, appMsg->descriptor, descLen);
|
|
|
|
// cout << "TcpClient: sending downstream:" << endl;
|
|
// cout << *stm32Msg << endl;
|
|
|
|
// Send it to the downstream UDP port for this message.
|
|
// (All downstream messages have to be sent via the stm32
|
|
// downstream thread to avoid contention on the serial
|
|
// interface.)
|
|
struct sockaddr_in sa;
|
|
memset((char *)&sa, 0, sizeof(sa));
|
|
sa.sin_family = AF_INET;
|
|
assert(ButtonFunctionCfg::udpPort != 0);
|
|
sa.sin_port = htons(ButtonFunctionCfg::udpPort);
|
|
sendto(_server->stm32_fd(), stm32Msg, sizeof(*stm32Msg) + descLen, 0,
|
|
(struct sockaddr *)&sa, sizeof(sa));
|
|
} else if (hdr->type == SoloMessage::Hdr::SET_SHOT_STRING) {
|
|
SoloMessage::SetShotString *appMsg;
|
|
|
|
appMsg = (SoloMessage::SetShotString *)_buf;
|
|
|
|
// struct SetShotInfoMsg does not include the descriptor
|
|
// plant the message on top of a buffer big enough
|
|
char stm32Buf[SetShotInfoMsg_BufSize];
|
|
memset(stm32Buf, 0, sizeof(stm32Buf));
|
|
SetShotInfoMsg *stm32Msg = (SetShotInfoMsg *)stm32Buf;
|
|
|
|
// descriptor is everything to the end
|
|
int msgBodyLen = sizeof(SoloMessage::SetShotString) - sizeof(SoloMessage::Hdr);
|
|
int descLen = appMsg->length - msgBodyLen;
|
|
// if too long, chop it off
|
|
if (descLen > (SetShotInfoMsg::descriptor_max - 1))
|
|
descLen = (SetShotInfoMsg::descriptor_max - 1);
|
|
memcpy(stm32Msg->descriptor, appMsg->descriptor, descLen);
|
|
|
|
// cout << "TcpClient: sending downstream:" << endl;
|
|
// cout << *stm32Msg << endl;
|
|
|
|
// Send it to the downstream UDP port for this message.
|
|
// (All downstream messages have to be sent via the stm32
|
|
// downstream thread to avoid contention on the serial
|
|
// interface.)
|
|
struct sockaddr_in sa;
|
|
memset((char *)&sa, 0, sizeof(sa));
|
|
sa.sin_family = AF_INET;
|
|
assert(SetShotInfo::udpPort != 0);
|
|
sa.sin_port = htons(SetShotInfo::udpPort);
|
|
sendto(_server->stm32_fd(), stm32Msg, sizeof(*stm32Msg) + descLen, 0,
|
|
(struct sockaddr *)&sa, sizeof(sa));
|
|
}
|
|
|
|
// done with the message at the start of the buffer;
|
|
// copy any unused data after it back to the start
|
|
_buf_bytes -= msg_len; // how much is left over
|
|
memmove(_buf, _buf + msg_len, _buf_bytes);
|
|
|
|
} // while (_buf_bytes...)
|
|
|
|
status = true;
|
|
|
|
} else // res <= 0
|
|
{
|
|
status = false;
|
|
}
|
|
|
|
invariant();
|
|
|
|
return status;
|
|
|
|
} // TcpClient::do_recv
|
|
|
|
// Constructor initializes object; call start method to open socket and start
|
|
// threads.
|
|
TcpServer::TcpServer(int port, const char *name)
|
|
: _name(name), _listen_port(port), _listen_fd(-1), _stm32_fd(-1),
|
|
// _clients
|
|
// _clients_mutex
|
|
_listen_sfd(-1), _downstream_sfd(-1)
|
|
// _listen_id
|
|
// _downstream_id
|
|
{
|
|
}
|
|
|
|
TcpServer::~TcpServer()
|
|
{
|
|
}
|
|
|
|
int TcpServer::start(void)
|
|
{
|
|
int sockopt;
|
|
struct sockaddr_in sa;
|
|
sigset_t mask;
|
|
int res;
|
|
|
|
// initialize clients list mutex
|
|
res = mutex_init(&_clients_mutex);
|
|
if (res != 0) {
|
|
syslog(LOG_ERR, "mutex_init: %d", res);
|
|
goto start_exit_0;
|
|
}
|
|
|
|
// create listening socket
|
|
_listen_fd = socket(AF_INET, SOCK_STREAM, 0);
|
|
if (_listen_fd < 0) {
|
|
syslog(LOG_ERR, "socket: %s", strerror(errno));
|
|
goto start_exit_1;
|
|
}
|
|
|
|
// non-blocking - this handles the race where a client tries to connect,
|
|
// the select() activates, then the client is gone before the accept.
|
|
if (fcntl(_listen_fd, F_SETFL, O_NONBLOCK) != 0) {
|
|
syslog(LOG_ERR, "fcntl: %s", strerror(errno));
|
|
goto start_exit_2;
|
|
}
|
|
|
|
// allow re-binding to our port in case of restart
|
|
sockopt = 1;
|
|
if (setsockopt(_listen_fd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof(sockopt)) != 0) {
|
|
syslog(LOG_ERR, "setsockopt: %s", strerror(errno));
|
|
goto start_exit_2;
|
|
}
|
|
|
|
// bind to port where we listen for connections
|
|
memset(&sa, 0, sizeof(sa));
|
|
sa.sin_family = AF_INET;
|
|
sa.sin_port = htons(_listen_port);
|
|
sa.sin_addr.s_addr = htonl(INADDR_ANY);
|
|
if (bind(_listen_fd, (struct sockaddr *)&sa, sizeof(sa)) == -1) {
|
|
syslog(LOG_ERR, "bind: %s", strerror(errno));
|
|
goto start_exit_2;
|
|
}
|
|
|
|
// listen for connections
|
|
if (listen(_listen_fd, 10) == -1) {
|
|
syslog(LOG_ERR, "listen: %s", strerror(errno));
|
|
goto start_exit_2;
|
|
}
|
|
|
|
res = sigemptyset(&mask);
|
|
assert(res == 0); // invalid arguments
|
|
res = sigaddset(&mask, SIGHUP);
|
|
assert(res == 0); // invalid arguments
|
|
res = sigaddset(&mask, SIGQUIT);
|
|
assert(res == 0); // invalid arguments
|
|
res = pthread_sigmask(SIG_BLOCK, &mask, NULL);
|
|
assert(res == 0); // invalid arguments
|
|
|
|
// create listen signal fd
|
|
res = sigemptyset(&mask);
|
|
assert(res == 0); // invalid arguments
|
|
res = sigaddset(&mask, SIGQUIT);
|
|
assert(res == 0); // invalid arguments
|
|
_listen_sfd = signalfd(-1, &mask, 0);
|
|
if (_listen_sfd < 0) {
|
|
syslog(LOG_ERR, "signalfd: %s", strerror(errno));
|
|
goto start_exit_2;
|
|
}
|
|
|
|
// create downstream thread signal fd
|
|
res = sigemptyset(&mask);
|
|
assert(res == 0); // invalid arguments
|
|
res = sigaddset(&mask, SIGHUP);
|
|
assert(res == 0); // invalid arguments
|
|
res = sigaddset(&mask, SIGQUIT);
|
|
assert(res == 0); // invalid arguments
|
|
_downstream_sfd = signalfd(-1, &mask, 0);
|
|
if (_downstream_sfd < 0) {
|
|
syslog(LOG_ERR, "signalfd: %s", strerror(errno));
|
|
goto start_exit_3;
|
|
}
|
|
|
|
// UDP socket that will be used by all clients to send messages to an
|
|
// stm32 downstream port
|
|
_stm32_fd = socket(AF_INET, SOCK_DGRAM, 0);
|
|
if (_stm32_fd < 0) {
|
|
syslog(LOG_ERR, "socket: %s", strerror(errno));
|
|
goto start_exit_4;
|
|
}
|
|
|
|
// bind to any IP, any port */
|
|
memset((char *)&sa, 0, sizeof(sa));
|
|
sa.sin_family = AF_INET;
|
|
sa.sin_port = htons(0);
|
|
sa.sin_addr.s_addr = htonl(INADDR_ANY);
|
|
if (bind(_stm32_fd, (struct sockaddr *)&sa, sizeof(sa)) < 0) {
|
|
syslog(LOG_ERR, "bind: %s", strerror(errno));
|
|
goto start_exit_5;
|
|
}
|
|
|
|
// start listen_thread
|
|
if (pthread_create(&_listen_id, NULL, &TcpServer::listen_entry, this) != 0) {
|
|
syslog(LOG_ERR, "pthread_create: %s", strerror(errno));
|
|
goto start_exit_5;
|
|
}
|
|
|
|
if (pthread_setname_np(_listen_id, "stm32_listen") != 0) {
|
|
syslog(LOG_ERR, "pthread_setname_np: %s", strerror(errno));
|
|
goto start_exit_6;
|
|
}
|
|
|
|
// start downstream_thread
|
|
if (pthread_create(&_downstream_id, NULL, &TcpServer::downstream_entry, this) != 0) {
|
|
syslog(LOG_ERR, "pthread_create: %s", strerror(errno));
|
|
goto start_exit_6;
|
|
}
|
|
|
|
if (pthread_setname_np(_downstream_id, "stm32_recv") != 0) {
|
|
syslog(LOG_ERR, "pthread_setname_np: %s", strerror(errno));
|
|
goto start_exit_7;
|
|
}
|
|
|
|
return 0;
|
|
|
|
// WARNING: Error conditions are not tested.
|
|
// Errors here are probably fatal to the system.
|
|
|
|
start_exit_7:
|
|
thread_kill(_downstream_id);
|
|
thread_wait(_downstream_id);
|
|
|
|
start_exit_6:
|
|
thread_kill(_listen_id);
|
|
thread_wait(_listen_id);
|
|
|
|
start_exit_5:
|
|
close(_stm32_fd);
|
|
_stm32_fd = -1;
|
|
|
|
start_exit_4:
|
|
close(_downstream_sfd);
|
|
_downstream_sfd = -1;
|
|
|
|
start_exit_3:
|
|
close(_listen_sfd);
|
|
_listen_sfd = -1;
|
|
|
|
start_exit_2:
|
|
close(_listen_fd);
|
|
_listen_fd = -1;
|
|
|
|
start_exit_1:
|
|
(void)pthread_mutex_destroy(&_clients_mutex);
|
|
|
|
start_exit_0:
|
|
return -1;
|
|
|
|
} // TcpServer::start
|
|
|
|
// WARNING: Stopping is not tested. Normal operation is to start and run
|
|
// forever, or stop by killing the entire process.
|
|
void TcpServer::stop(void)
|
|
{
|
|
|
|
thread_kill(_downstream_id);
|
|
thread_kill(_listen_id);
|
|
|
|
thread_wait(_downstream_id);
|
|
thread_wait(_listen_id);
|
|
|
|
close(_stm32_fd);
|
|
_stm32_fd = -1;
|
|
|
|
close(_downstream_sfd);
|
|
_downstream_sfd = -1;
|
|
|
|
close(_listen_sfd);
|
|
_listen_sfd = -1;
|
|
|
|
close(_listen_fd);
|
|
_listen_fd = -1;
|
|
|
|
(void)pthread_mutex_destroy(&_clients_mutex);
|
|
|
|
} // TcpServer::stop
|
|
|
|
// Tell a thread to exit. We send it SIGQUIT; it is expected to be waiting
|
|
// on a signalfd where this will arrive, and know that SIGQUIT means quit.
|
|
void TcpServer::thread_kill(pthread_t id)
|
|
{
|
|
// SIGQUIT means quit
|
|
if (pthread_kill(id, SIGQUIT) != 0)
|
|
syslog(LOG_ERR, "pthread_kill: %s", strerror(errno));
|
|
}
|
|
|
|
// Wait for a thread to exit. It should have already received the SIGQUIT, so
|
|
// should soon get it and cleanly exit. If it does not, we attempt to kill it.
|
|
void TcpServer::thread_wait(pthread_t id)
|
|
{
|
|
// The timeout to pthread_timedjoin_np() is an absolute time, which
|
|
// can break if the time is changed while waiting. Instead we poll
|
|
// pthread_tryjoin_np() few times.
|
|
|
|
// wait up to 10 msec
|
|
for (int try_num = 0; try_num < 10; try_num++) {
|
|
if (pthread_tryjoin_np(id, NULL) == 0)
|
|
return;
|
|
usleep(1000); // 1 msec
|
|
}
|
|
|
|
// the join never succeeded; try canceling
|
|
syslog(LOG_ERR, "thread_wait: canceling thread");
|
|
pthread_cancel(id);
|
|
// thread cleans up asynchronously
|
|
usleep(10000); // 10 msec
|
|
|
|
} // TcpServer::thread_wait
|
|
|
|
// Listening thread.
|
|
// Create a socket and listen for connections. When a connection is received,
|
|
// the client is added to the list of clients. The list of clients determines
|
|
// what is monitored for incoming data (select, in downstream_thread) and who
|
|
// gets all upstream messages (send_clients method).
|
|
void *TcpServer::listen_thread(void)
|
|
{
|
|
int nfds;
|
|
fd_set rfds;
|
|
|
|
syslog(LOG_INFO, "%s listen_thread: starting", _name);
|
|
|
|
while (true) {
|
|
|
|
// Select waits for either an incoming connection on _listen_fd or a
|
|
// signal on _listen_sfd.
|
|
FD_ZERO(&rfds);
|
|
nfds = 0;
|
|
|
|
FD_SET(_listen_sfd, &rfds);
|
|
if (nfds <= _listen_sfd)
|
|
nfds = _listen_sfd + 1;
|
|
|
|
FD_SET(_listen_fd, &rfds);
|
|
if (nfds <= _listen_fd)
|
|
nfds = _listen_fd + 1;
|
|
|
|
int res = select(nfds, &rfds, NULL, NULL, NULL);
|
|
|
|
if (res > 0) {
|
|
|
|
// got a signal?
|
|
if (FD_ISSET(_listen_sfd, &rfds)) {
|
|
// clear out the signal
|
|
signalfd_siginfo si;
|
|
if (read(_listen_sfd, &si, sizeof(si)) != sizeof(si)) {
|
|
syslog(LOG_ERR, "%s listen_thread: reading siginfo", _name);
|
|
break; // while (true)
|
|
} else if (si.ssi_signo == SIGQUIT) {
|
|
// SIGQUIT means quit
|
|
break; // while (true)
|
|
} else {
|
|
// mystery signal
|
|
syslog(LOG_ERR, "%s listen_thread: unexpected signal", _name);
|
|
}
|
|
}
|
|
|
|
// got a connection?
|
|
if (FD_ISSET(_listen_fd, &rfds)) {
|
|
// Incoming client - this will not block (the socket is
|
|
// nonblocking). If the client is no longer trying to
|
|
// connect, we get an invalid fd_client (immediately) and
|
|
// handle that.
|
|
struct sockaddr_in sa;
|
|
memset((char *)&sa, 0, sizeof(sa));
|
|
socklen_t sa_len = sizeof(sa);
|
|
int fd_client = accept(_listen_fd, (struct sockaddr *)&sa, &sa_len);
|
|
if (fd_client < 0) {
|
|
// client tried to connect then changed its mind
|
|
syslog(LOG_ERR, "accept: %s", strerror(errno));
|
|
usleep(10000); // 10ms
|
|
continue;
|
|
}
|
|
|
|
if (sa.sin_family == AF_INET)
|
|
syslog(LOG_INFO, "%s listen_thread: adding client %d at %s:%u", _name,
|
|
fd_client, inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
|
|
else
|
|
syslog(LOG_INFO, "%s listen_thread: adding client %d", _name, fd_client);
|
|
|
|
// Add new client to the list of clients that gets everything
|
|
// via send_clients(). Clients are removed from this list when
|
|
// send_clients() determines that the connection is dead.
|
|
TcpClient *new_client = new TcpClient(this, fd_client);
|
|
pthread_mutex_lock(&_clients_mutex);
|
|
_clients.push_back(new_client);
|
|
pthread_mutex_unlock(&_clients_mutex);
|
|
|
|
// Knock downstream_thread out of its select, so it will
|
|
// rebuild the fdset with the new client (we use SIGHUP).
|
|
if (pthread_kill(_downstream_id, SIGHUP) != 0)
|
|
syslog(LOG_ERR, "pthread_kill: %s", strerror(errno));
|
|
}
|
|
} else {
|
|
syslog(LOG_ERR, "%s listen_thread: select: %s", _name, strerror(errno));
|
|
}
|
|
|
|
} // while (true)
|
|
|
|
syslog(LOG_INFO, "%s listen_thread: exiting", _name);
|
|
|
|
return NULL;
|
|
|
|
} // TcpServer::listen_thread
|
|
|
|
// Downstream thread.
|
|
// Select on clients, waiting for data to arrive for one.
|
|
// Read data, letting each client accumulate data and send a message
|
|
// downstream when it has one.
|
|
void *TcpServer::downstream_thread(void)
|
|
{
|
|
int nfds;
|
|
fd_set rfds;
|
|
list< TcpClient * >::iterator it, next;
|
|
int res;
|
|
int fd;
|
|
|
|
syslog(LOG_INFO, "%s downstream_thread: starting", _name);
|
|
|
|
while (true) {
|
|
|
|
// build set of fds to wait on
|
|
FD_ZERO(&rfds);
|
|
nfds = 0;
|
|
|
|
// signal fd is always in the set
|
|
fd = _downstream_sfd;
|
|
FD_SET(fd, &rfds);
|
|
if (nfds <= fd)
|
|
nfds = fd + 1;
|
|
|
|
// add all the clients to the set
|
|
pthread_mutex_lock(&_clients_mutex);
|
|
for (it = _clients.begin(); it != _clients.end(); it++) {
|
|
fd = (*it)->fd_client();
|
|
FD_SET(fd, &rfds);
|
|
if (nfds <= fd)
|
|
nfds = fd + 1;
|
|
}
|
|
// unsigned num_clients = _clients.size();
|
|
pthread_mutex_unlock(&_clients_mutex);
|
|
|
|
// wait for something on any of them (or a signal)
|
|
res = select(nfds, &rfds, NULL, NULL, NULL);
|
|
|
|
if (res > 0) {
|
|
|
|
// signal?
|
|
if (FD_ISSET(_downstream_sfd, &rfds)) {
|
|
signalfd_siginfo si;
|
|
if (read(_downstream_sfd, &si, sizeof(si)) != sizeof(si)) {
|
|
syslog(LOG_ERR, "%s downstream_thread: reading siginfo", _name);
|
|
break; // while (true)
|
|
} else if (si.ssi_signo == SIGQUIT) {
|
|
// SIGQUIT means quit
|
|
break; // while (true)
|
|
} else if (si.ssi_signo == SIGHUP) {
|
|
// SIGHUP means recreate the fdset we're waiting on
|
|
} else {
|
|
// mystery signal
|
|
syslog(LOG_ERR, "%s downstream_thread: unexpected signal", _name);
|
|
}
|
|
}
|
|
|
|
// data from any client?
|
|
pthread_mutex_lock(&_clients_mutex);
|
|
for (it = _clients.begin(); it != _clients.end(); it = next) {
|
|
next = it;
|
|
next++;
|
|
TcpClient *c = *it;
|
|
if (FD_ISSET(c->fd_client(), &rfds)) {
|
|
if (!c->do_recv()) {
|
|
syslog(LOG_INFO, "%s downstream_thread: deleting client %d", _name,
|
|
c->fd_client());
|
|
_clients.erase(it);
|
|
delete c;
|
|
}
|
|
}
|
|
}
|
|
pthread_mutex_unlock(&_clients_mutex);
|
|
} else // res <= 0
|
|
{
|
|
syslog(LOG_ERR, "%s downstream_thread: select returned %d", _name, res);
|
|
}
|
|
|
|
} // while (true)
|
|
|
|
syslog(LOG_INFO, "%s downstream_thread: exiting", _name);
|
|
|
|
return NULL;
|
|
|
|
} // TcpServer::downstream_thread
|
|
|
|
// Send data to all clients.
|
|
// The supplied data is sent to all clients in the client list, i.e. all
|
|
// clients that have connected to the server.
|
|
void TcpServer::send_clients(const void *data, int data_len)
|
|
{
|
|
list< TcpClient * >::iterator it;
|
|
|
|
pthread_mutex_lock(&_clients_mutex);
|
|
|
|
for (it = _clients.begin(); it != _clients.end(); it++) {
|
|
uint64_t t0_us = clock_gettime_us(CLOCK_MONOTONIC);
|
|
int rc = (*it)->send_upstream(data, data_len);
|
|
uint64_t t1_us = clock_gettime_us(CLOCK_MONOTONIC);
|
|
if (rc != data_len) {
|
|
// The client might be gone; downstream thread detects that
|
|
// and deletes it
|
|
syslog(LOG_ERR, "%s send_clients: %s", _name, strerror(errno));
|
|
}
|
|
unsigned delay_us = t1_us - t0_us;
|
|
if (delay_us > 10000) {
|
|
// Looking for trouble - when does send_upstream block?
|
|
syslog(LOG_ERR, "%s send_clients: send_upstream took %u usec", _name, delay_us);
|
|
}
|
|
}
|
|
|
|
pthread_mutex_unlock(&_clients_mutex);
|
|
|
|
} // TcpServer::send_clients
|