@ -1,7 +1,12 @@
# include "acoustic_analyzer/io/mobile_phone_source.h"
# include <ros/ros.h>
# include <cstring>
# include <cmath>
# include <chrono>
# include <thread>
# include <mutex>
# include <condition_variable>
# include <queue>
# include <netinet/in.h>
# include <sys/socket.h>
# include <unistd.h>
@ -10,92 +15,135 @@
namespace acoustic {
MobilePhoneSource : : MobilePhoneSource ( int port , int sample_rate , float timeout_sec )
: port_ ( port ) , sample_rate_ ( sample_rate ) , timeout_sec_ ( timeout_sec ) , sockfd_ ( - 1 ) { }
struct MobilePhoneSource : : Impl {
int port_ = 0 ;
int sample_rate_ = 16000 ;
float timeout_sec_ = 10.0f ;
int sockfd_ = - 1 ;
bool running_ = false ;
std : : thread recv_thread_ ;
std : : mutex mutex_ ;
std : : condition_variable cv_ ;
std : : queue < float > buffer_ ;
MobilePhoneSource : : ~ MobilePhoneSource ( ) {
close ( ) ;
}
Impl ( int port , int sample_rate , float timeout_sec )
: port_ ( port ) , sample_rate_ ( sample_rate ) , timeout_sec_ ( timeout_sec ) { }
bool MobilePhoneSource : : open ( ) {
sockfd_ = socket ( AF_INET , SOCK_DGRAM , 0 ) ;
if ( sockfd_ < 0 ) return false ;
~ Impl ( ) { Close ( ) ; }
int opt = 1 ;
setsockopt ( sockfd_ , SOL_SOCKET , SO_REUSEADDR , & opt , sizeof ( opt ) ) ;
bool Open ( ) {
sockfd_ = socket ( AF_INET , SOCK_DGRAM , 0 ) ;
if ( sockfd_ < 0 ) return false ;
sockaddr_in addr { } ;
addr . sin_family = AF_INET ;
addr . sin_port = htons ( port_ ) ;
addr . sin_addr . s_addr = INADDR_ANY ;
int opt = 1 ;
setsockopt ( sockfd_ , SOL_SOCKET , SO_REUSEADDR , & opt , sizeof ( opt ) ) ;
if ( bind ( sockfd_ , reinterpret_cast < sockaddr * > ( & addr ) , sizeof ( addr ) ) < 0 ) {
: : close ( sockfd_ ) ;
sockfd_ = - 1 ;
return false ;
}
sockaddr_in addr { } ;
addr . sin_family = AF_INET ;
addr . sin_port = htons ( port_ ) ;
addr . sin_addr . s_addr = INADDR_ANY ;
// Set non-blocking
int flags = fcntl ( sockfd_ , F_GETFL , 0 ) ;
fcntl ( sockfd_ , F_SETFL , flags | O_NONBLOCK ) ;
if ( bind ( sockfd_ , reinterpret_cast < sockaddr * > ( & addr ) , sizeof ( addr ) ) < 0 ) {
: : close ( sockfd_ ) ;
sockfd_ = - 1 ;
return false ;
}
running_ = true ;
recv_thread_ = std : : thread ( & MobilePhoneSource : : receive_loop , this ) ;
return true ;
}
int flags = fcntl ( sockfd_ , F_GETFL , 0 ) ;
fcntl ( sockfd_ , F_SETFL , flags | O_NONBLOCK ) ;
void MobilePhoneSource : : close ( ) {
running_ = false ;
if ( recv_thread_ . joinable ( ) ) recv_thread_ . join ( ) ;
if ( sockfd_ > = 0 ) {
: : close ( sockfd_ ) ;
sockfd_ = - 1 ;
running_ = true ;
recv_thread_ = std : : thread ( & Impl : : ReceiveLoop , this ) ;
return true ;
}
}
void MobilePhoneSource : : receive_loop ( ) {
std : : vector < float > packet_buf ( 2048 ) ;
sockaddr_in client_addr { } ;
socklen_t addr_len = sizeof ( client_addr ) ;
auto start = std : : chrono : : steady_clock : : now ( ) ;
while ( running_ ) {
ssize_t n = recvfrom ( sockfd_ , packet_buf . data ( ) ,
packet_buf . size ( ) * sizeof ( float ) ,
0 , reinterpret_cast < sockaddr * > ( & client_addr ) , & addr_len ) ;
if ( n > 0 ) {
size_t samples = n / sizeof ( float ) ;
std : : lock_guard < std : : mutex > lock ( mutex_ ) ;
for ( size_t i = 0 ; i < samples ; + + i ) {
buffer_ . push ( packet_buf [ i ] ) ;
}
cv_ . notify_one ( ) ;
start = std : : chrono : : steady_clock : : now ( ) ;
} else {
auto now = std : : chrono : : steady_clock : : now ( ) ;
float elapsed = std : : chrono : : duration < float > ( now - start ) . count ( ) ;
if ( elapsed > timeout_sec_ ) {
// Timeout: stop waiting
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 10 ) ) ;
}
void Close ( ) {
running_ = false ;
if ( recv_thread_ . joinable ( ) ) recv_thread_ . join ( ) ;
if ( sockfd_ > = 0 ) {
: : close ( sockfd_ ) ;
sockfd_ = - 1 ;
}
}
}
size_t MobilePhoneSource : : read ( std : : vector < std : : vector < float > > & out , size_t max_samples ) {
std : : unique_lock < std : : mutex > lock ( mutex_ ) ;
if ( buffer_ . empty ( ) ) {
cv_ . wait_for ( lock , std : : chrono : : milliseconds ( 100 ) ) ;
void ReceiveLoop ( ) {
std : : vector < float > packet_buf ( 2048 ) ;
sockaddr_in client_addr { } ;
socklen_t addr_len = sizeof ( client_addr ) ;
auto start = std : : chrono : : steady_clock : : now ( ) ;
while ( running_ ) {
ssize_t n = recvfrom ( sockfd_ , packet_buf . data ( ) ,
packet_buf . size ( ) * sizeof ( float ) ,
0 , reinterpret_cast < sockaddr * > ( & client_addr ) , & addr_len ) ;
if ( n > 0 ) {
size_t samples = static_cast < size_t > ( n ) / sizeof ( float ) ;
std : : lock_guard < std : : mutex > lock ( mutex_ ) ;
for ( size_t i = 0 ; i < samples ; + + i ) {
buffer_ . push ( packet_buf [ i ] ) ;
}
cv_ . notify_one ( ) ;
start = std : : chrono : : steady_clock : : now ( ) ;
} else {
auto now = std : : chrono : : steady_clock : : now ( ) ;
float elapsed = std : : chrono : : duration < float > ( now - start ) . count ( ) ;
if ( elapsed > timeout_sec_ ) {
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 10 ) ) ;
}
}
}
}
size_t available = std : : min ( max_samples , buffer_ . size ( ) ) ;
out . resize ( 1 ) ;
out [ 0 ] . resize ( available ) ;
for ( size_t i = 0 ; i < available ; + + i ) {
out [ 0 ] [ i ] = buffer_ . front ( ) ;
buffer_ . pop ( ) ;
std : : vector < float > Read ( std : : size_t max_samples ) {
std : : unique_lock < std : : mutex > lock ( mutex_ ) ;
if ( buffer_ . empty ( ) ) {
cv_ . wait_for ( lock , std : : chrono : : milliseconds ( 100 ) ) ;
}
std : : size_t available = std : : min ( max_samples , buffer_ . size ( ) ) ;
std : : vector < float > out ( available ) ;
for ( std : : size_t i = 0 ; i < available ; + + i ) {
out [ i ] = buffer_ . front ( ) ;
buffer_ . pop ( ) ;
}
return out ;
}
return available ;
} ;
MobilePhoneSource : : MobilePhoneSource ( ros : : NodeHandle & nh ,
const std : : string & topic ,
float timeout_sec ,
int sample_rate )
: impl_ ( std : : make_unique < Impl > ( 0 , sample_rate , timeout_sec ) ) {
( void ) nh ;
( void ) topic ;
// Topic-based ROS subscriber could be added here; currently using UDP fallback.
}
MobilePhoneSource : : ~ MobilePhoneSource ( ) = default ;
bool MobilePhoneSource : : Open ( ) {
return impl_ - > Open ( ) ;
}
std : : vector < float > MobilePhoneSource : : Read ( std : : size_t num_samples ) {
return impl_ - > Read ( num_samples ) ;
}
void MobilePhoneSource : : Close ( ) {
impl_ - > Close ( ) ;
}
std : : size_t MobilePhoneSource : : NumChannels ( ) const {
return 1 ;
}
int MobilePhoneSource : : SampleRate ( ) const {
return impl_ - > sample_rate_ ;
}
bool MobilePhoneSource : : IsOpen ( ) const {
return impl_ - > sockfd_ > = 0 ;
}
} // namespace acoustic