move packet copy to async threads

(probably not great but oh well. can't seem to avoid it because tungstenite blows)
This commit is contained in:
Lily Tsuru 2024-10-11 00:58:17 -04:00
parent a34f833c80
commit 7c66291479
2 changed files with 115 additions and 98 deletions

View file

@ -33,27 +33,33 @@ use axum::{
use futures::{sink::SinkExt, stream::StreamExt}; use futures::{sink::SinkExt, stream::StreamExt};
#[derive(Clone)]
enum WsMessage {
VideoPacket { packet: ffmpeg::Packet },
Json(String),
}
struct AppState { struct AppState {
encoder_tx: Arc<TokioMutex<mpsc::Sender<EncodeThreadInput>>>, encoder_tx: Arc<TokioMutex<mpsc::Sender<EncodeThreadInput>>>,
inputs: Arc<TokioMutex<Vec<u32>>>, inputs: Arc<TokioMutex<Vec<u32>>>,
websocket_broadcast_tx: broadcast::Sender<ws::Message>, websocket_broadcast_tx: broadcast::Sender<WsMessage>,
websocket_count: TokioMutex<usize>, websocket_count: TokioMutex<usize>,
} }
impl AppState { impl AppState {
fn new(encoder_tx: mpsc::Sender<EncodeThreadInput>) -> Self { fn new(encoder_tx: mpsc::Sender<EncodeThreadInput>) -> Self {
let (chat_tx, _chat_rx) = broadcast::channel(10); let (broadcast_tx, _) = broadcast::channel(10);
Self { Self {
encoder_tx: Arc::new(TokioMutex::new(encoder_tx)), encoder_tx: Arc::new(TokioMutex::new(encoder_tx)),
inputs: Arc::new(TokioMutex::new(Vec::new())), inputs: Arc::new(TokioMutex::new(Vec::new())),
websocket_broadcast_tx: chat_tx, websocket_broadcast_tx: broadcast_tx,
websocket_count: TokioMutex::const_new(0usize), websocket_count: TokioMutex::const_new(0usize),
} }
} }
} }
#[tokio::main(flavor = "multi_thread", worker_threads = 8)] #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
// Setup a tracing subscriber // Setup a tracing subscriber
let subscriber = tracing_subscriber::FmtSubscriber::builder() let subscriber = tracing_subscriber::FmtSubscriber::builder()
@ -75,109 +81,106 @@ async fn main() -> anyhow::Result<()> {
let state_clone = state.clone(); let state_clone = state.clone();
// retro event handler. drives the encoder thread too // retro event handler. drives the encoder thread too
tokio::spawn(async move { let _ = std::thread::Builder::new()
let surface_clone = surface.clone(); .name("retro_event_rx".into())
let frame_clone = frame.clone(); .spawn(move || {
let surface_clone = surface.clone();
let frame_clone = frame.clone();
// start the thread now that we're alive // start the libretro thread looping now that we're alive
let _ = event_in_tx.send(retro_thread::RetroInEvent::Start).await; let _ = event_in_tx.blocking_send(retro_thread::RetroInEvent::Start);
loop { loop {
match event_rx.try_recv() { match event_rx.try_recv() {
Ok(msg) => match msg { Ok(msg) => match msg {
RetroEvent::Frame => { RetroEvent::Frame => {
{ {
let mut frame_locked = frame.lock().expect( let mut frame_locked = frame.lock().expect(
"Couldn't lock frame on our end. Did the encoder thread panic?", "Couldn't lock frame on our end. Did the encoder thread panic?",
); );
let mut_frame = frame_locked.as_mut().expect("it's None? why?"); let mut_frame = frame_locked.as_mut().expect("it's None? why?");
let width = mut_frame.width(); let width = mut_frame.width();
let height = mut_frame.height(); let height = mut_frame.height();
let mut surf = surface_clone.lock().expect( let mut surf = surface_clone.lock().expect(
"locking the VNC surface to paint it to the ffmpeg frame failed", "locking the VNC surface to paint it to the ffmpeg frame failed",
); );
let surf_buf = surf.get_buffer(); let surf_buf = surf.get_buffer();
let buf_ptr = let buf_ptr =
unsafe { (*(*mut_frame.as_mut_ptr()).buf[0]).data as *mut u32 }; unsafe { (*(*mut_frame.as_mut_ptr()).buf[0]).data as *mut u32 };
for y in 0..height { for y in 0..height {
let line_stride = (y * width) as usize; let line_stride = (y * width) as usize;
// Make a slice for the line // Make a slice for the line
// SAFETY: The allocation is guaranteed to be large enough // SAFETY: The allocation is guaranteed to be large enough
// for this to work from y = 0..height // for this to work from y = 0..height
let dest_line_slice = unsafe { let dest_line_slice = unsafe {
let dest_line_ptr = buf_ptr.add(line_stride); let dest_line_ptr = buf_ptr.add(line_stride);
std::slice::from_raw_parts_mut(dest_line_ptr, width as usize) std::slice::from_raw_parts_mut(
}; dest_line_ptr,
width as usize,
)
};
dest_line_slice.copy_from_slice( dest_line_slice.copy_from_slice(
&surf_buf[line_stride..line_stride + width as usize], &surf_buf[line_stride..line_stride + width as usize],
); );
}
} }
let _ = state_clone
.encoder_tx
.blocking_lock()
.blocking_send(encoder_thread::EncodeThreadInput::SendFrame);
} }
let _ = state_clone RetroEvent::Resize { size } => {
.encoder_tx // make a new frame for the encoder
.lock() {
.await let mut lk_frame = frame_clone.lock().expect("Couldn't lock frame");
.send(encoder_thread::EncodeThreadInput::SendFrame)
.await;
}
RetroEvent::Resize { size } => { *lk_frame = Some(ffmpeg::frame::Video::new(
// make a new frame for the encoder ffmpeg::format::Pixel::BGRA,
{ size.clone().width,
let mut lk_frame = frame_clone.lock().expect("Couldn't lock frame"); size.clone().height,
));
}
*lk_frame = Some(ffmpeg::frame::Video::new( let _ = state_clone.encoder_tx.blocking_lock().blocking_send(
ffmpeg::format::Pixel::BGRA, encoder_thread::EncodeThreadInput::Init { size: size.clone() },
size.clone().width, );
size.clone().height,
));
} }
let _ = state_clone RetroEvent::WantInputs { tx } => {
.encoder_tx let inputs = state_clone.inputs.blocking_lock();
.lock() //tracing::info!("giving inputs {:?}", inputs);
.await tx.send(inputs.clone()).expect("FUCK");
.send(encoder_thread::EncodeThreadInput::Init { size: size.clone() }) }
.await; },
}
RetroEvent::WantInputs { tx } => { Err(TryRecvError::Disconnected) => break,
let inputs = state_clone.inputs.lock().await; Err(TryRecvError::Empty) => {}
//tracing::info!("giving inputs {:?}", inputs); }
tx.send(inputs.clone()).expect("FUCK");
}
},
Err(TryRecvError::Disconnected) => break, match encoder_rx.try_recv() {
Err(TryRecvError::Empty) => {} Ok(msg) => match msg {
encoder_thread::EncodeThreadOutput::Frame { packet } => {
let _ = state_clone
.websocket_broadcast_tx
.send(WsMessage::VideoPacket { packet });
}
},
Err(TryRecvError::Empty) => {}
_ => break,
}
std::thread::sleep(Duration::from_millis(1));
} }
})
match encoder_rx.try_recv() { .expect("failed to spawn retro RX thread, it's probably over");
Ok(msg) => match msg {
encoder_thread::EncodeThreadOutput::Frame { mut packet } => {
let vec = {
let data = packet.data_mut().expect("packet is empty somehow");
data.to_vec()
};
let _ = state_clone
.websocket_broadcast_tx
.send(ws::Message::Binary(vec));
}
},
Err(TryRecvError::Empty) => {}
_ => break,
}
tokio::time::sleep(Duration::from_millis(1)).await;
}
});
// Axum websocket server // Axum websocket server
let app: Router<()> = Router::new() let app: Router<()> = Router::new()
@ -254,8 +257,22 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc<AppState>)
let mut sub = send_clone.websocket_broadcast_tx.subscribe(); let mut sub = send_clone.websocket_broadcast_tx.subscribe();
while let Ok(msg) = sub.recv().await { while let Ok(msg) = sub.recv().await {
if sender.send(msg).await.is_err() { match msg {
break; WsMessage::VideoPacket { mut packet } => {
// :(. At least this copy doesn't occur on the driver threads anymore..
let data = packet.data_mut().expect("shouldn't be taken");
let msg = ws::Message::Binary(data.to_vec());
if sender.send(msg).await.is_err() {
break;
}
}
WsMessage::Json(s) => {
let msg = ws::Message::Text(s);
if sender.send(msg).await.is_err() {
break;
}
}
} }
} }
}); });
@ -286,12 +303,9 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc<AppState>)
"msg": json["msg"].as_str().unwrap() "msg": json["msg"].as_str().unwrap()
}); });
recv_clone recv_clone.websocket_broadcast_tx.send(WsMessage::Json(
.websocket_broadcast_tx serde_json::to_string(&send).expect("oh well"),
.send(ws::Message::Text( ));
serde_json::to_string(&send).expect("oh well"),
))
.expect("boom");
continue; continue;
} }

View file

@ -441,9 +441,12 @@ pub fn spawn_retro_thread(
let (event_in_tx, event_in_rx) = mpsc::channel(8); let (event_in_tx, event_in_rx) = mpsc::channel(8);
let fb_clone = surface.clone(); let fb_clone = surface.clone();
std::thread::spawn(move || { // discard the join handle
retro_thread_main(fb_clone, event_tx, event_in_rx); let _ = std::thread::Builder::new()
}); .name("retro_game".into())
.spawn(move || {
retro_thread_main(fb_clone, event_tx, event_in_rx);
}).expect("failed to spawn the game thread");
(event_rx, event_in_tx) (event_rx, event_in_tx)
} }