@@ -1424,11 +1424,16 @@ void local_loop_unix(int argc, char* const* argv, int optind, int rcv_buf, int l
14241424 string tmp = i > 0 ? string_format (" %s-%d" , unix_socket, i) : string (unix_socket);
14251425 int fd = open_unix_socket_for_rx (tmp.c_str (), rcv_buf);
14261426
1427+ IPC_MSG (" %" PRIu64 " \t LISTEN_UNIX\t %s:%x\n " , get_time_ms (), tmp.c_str (), i);
14271428 WFB_INFO (" Listen on @%s for %s\n " , tmp.c_str (), wlan);
1429+
14281430 rx_fd.push_back (fd);
14291431 wlans.push_back (string (wlan));
14301432 }
14311433
1434+ IPC_MSG (" %" PRIu64 " \t LISTEN_UNIX_END\n " , get_time_ms ());
1435+ IPC_MSG_SEND ();
1436+
14321437 if (debug_port)
14331438 {
14341439 WFB_INFO (" Using %zu ports from %d for wlan emulation\n " , wlans.size (), debug_port);
@@ -1521,6 +1526,69 @@ void distributor_loop(int argc, char* const* argv, int optind, int rcv_buf, int
15211526 data_source (t, rx_fd, control_fd, fec_timeout, mirror, log_interval);
15221527}
15231528
1529+
1530+ void distributor_loop_unix (int argc, char * const * argv, int optind, int rcv_buf, int log_interval,
1531+ const char * unix_socket, int k, int n, const string &keypair, int fec_timeout,
1532+ uint64_t epoch, uint32_t channel_id, uint32_t fec_delay, bool use_qdisc, uint32_t fwmark,
1533+ radiotap_header_t &radiotap_header, uint8_t frame_type, int control_port, bool mirror,
1534+ int snd_buf_size)
1535+ {
1536+ vector<int > rx_fd;
1537+ vector<pair<string, vector<uint16_t >>> remote_hosts;
1538+ int port_idx = 0 ;
1539+
1540+ set<string> hosts;
1541+
1542+ for (int i = optind; i < argc; i++)
1543+ {
1544+ vector<uint16_t > remote_ports;
1545+ char *p = argv[i];
1546+ char *t = NULL ;
1547+
1548+ t = strsep (&p, " :" );
1549+ if (t == NULL ) continue ;
1550+
1551+ string remote_host = string (t);
1552+
1553+ if (hosts.count (remote_host))
1554+ {
1555+ throw runtime_error (string_format (" Duplicate host %s" , remote_host.c_str ()));
1556+ }
1557+
1558+ hosts.insert (remote_host);
1559+
1560+ for (int j=0 ; (t=strsep (&p, " ," )) != NULL ; j++, port_idx++)
1561+ {
1562+ uint16_t remote_port = atoi (t);
1563+
1564+ string tmp = port_idx > 0 ? string_format (" %s-%d" , unix_socket, port_idx) : string (unix_socket);
1565+ int fd = open_unix_socket_for_rx (tmp.c_str (), rcv_buf);
1566+
1567+ uint64_t wlan_id = (uint64_t )ntohl (inet_addr (remote_host.c_str ())) << 24 | j;
1568+
1569+ IPC_MSG (" %" PRIu64 " \t LISTEN_UNIX\t %s:%" PRIx64 " \n " , get_time_ms (), tmp.c_str (), wlan_id);
1570+ WFB_INFO (" Listen on @%s for %s:%d\n " , tmp.c_str (), remote_host.c_str (), remote_port);
1571+
1572+ rx_fd.push_back (fd);
1573+ remote_ports.push_back (remote_port);
1574+ }
1575+
1576+ remote_hosts.push_back (pair<string, vector<uint16_t >>(remote_host, remote_ports));
1577+ }
1578+
1579+ IPC_MSG (" %" PRIu64 " \t LISTEN_UNIX_END\n " , get_time_ms ());
1580+ IPC_MSG_SEND ();
1581+
1582+ vector<tags_item_t > tags;
1583+ unique_ptr<Transmitter> t = unique_ptr<RemoteTransmitter>(new RemoteTransmitter (k, n, keypair, epoch, channel_id, fec_delay, tags,
1584+ remote_hosts, radiotap_header, frame_type, use_qdisc,
1585+ fwmark, snd_buf_size));
1586+
1587+ int control_fd = open_control_fd (control_port);
1588+ data_source (t, rx_fd, control_fd, fec_timeout, mirror, log_interval);
1589+ }
1590+
1591+
15241592int main (int argc, char * const *argv)
15251593{
15261594 int opt;
@@ -1574,7 +1642,6 @@ int main(int argc, char * const *argv)
15741642 udp_port = atoi (optarg);
15751643 break ;
15761644 case ' U' :
1577- tx_mode = LOCAL_UNIX;
15781645 unix_socket = optarg;
15791646 break ;
15801647 case ' p' :
@@ -1718,27 +1785,41 @@ int main(int argc, char * const *argv)
17181785 break ;
17191786
17201787 case LOCAL:
1721- local_loop_udp (argc, argv, optind, rcv_buf, log_interval,
1722- udp_port, debug_port, k, n, keypair, fec_timeout,
1723- epoch, channel_id, fec_delay, use_qdisc, fwmark,
1724- radiotap_header, frame_type, control_port, mirror,
1725- snd_buf);
1726- break ;
1727-
1728- case LOCAL_UNIX:
1729- local_loop_unix (argc, argv, optind, rcv_buf, log_interval,
1730- unix_socket, debug_port, k, n, keypair, fec_timeout,
1731- epoch, channel_id, fec_delay, use_qdisc, fwmark,
1732- radiotap_header, frame_type, control_port, mirror,
1733- snd_buf);
1788+ if (unix_socket != NULL )
1789+ {
1790+ local_loop_unix (argc, argv, optind, rcv_buf, log_interval,
1791+ unix_socket, debug_port, k, n, keypair, fec_timeout,
1792+ epoch, channel_id, fec_delay, use_qdisc, fwmark,
1793+ radiotap_header, frame_type, control_port, mirror,
1794+ snd_buf);
1795+ }
1796+ else
1797+ {
1798+ local_loop_udp (argc, argv, optind, rcv_buf, log_interval,
1799+ udp_port, debug_port, k, n, keypair, fec_timeout,
1800+ epoch, channel_id, fec_delay, use_qdisc, fwmark,
1801+ radiotap_header, frame_type, control_port, mirror,
1802+ snd_buf);
1803+ }
17341804 break ;
17351805
17361806 case DISTRIBUTOR:
1737- distributor_loop (argc, argv, optind, rcv_buf, log_interval,
1738- udp_port, k, n, keypair, fec_timeout,
1739- epoch, channel_id, fec_delay, use_qdisc, fwmark,
1740- radiotap_header, frame_type, control_port, mirror,
1741- snd_buf);
1807+ if (unix_socket != NULL )
1808+ {
1809+ distributor_loop_unix (argc, argv, optind, rcv_buf, log_interval,
1810+ unix_socket, k, n, keypair, fec_timeout,
1811+ epoch, channel_id, fec_delay, use_qdisc, fwmark,
1812+ radiotap_header, frame_type, control_port, mirror,
1813+ snd_buf);
1814+ }
1815+ else
1816+ {
1817+ distributor_loop (argc, argv, optind, rcv_buf, log_interval,
1818+ udp_port, k, n, keypair, fec_timeout,
1819+ epoch, channel_id, fec_delay, use_qdisc, fwmark,
1820+ radiotap_header, frame_type, control_port, mirror,
1821+ snd_buf);
1822+ }
17421823 break ;
17431824
17441825 default :
0 commit comments