From 7c662914797788e9e8193a79ac2ff73ae70439f6 Mon Sep 17 00:00:00 2001 From: modeco80 Date: Fri, 11 Oct 2024 00:58:17 -0400 Subject: [PATCH] move packet copy to async threads (probably not great but oh well. can't seem to avoid it because tungstenite blows) --- server/src/main.rs | 204 ++++++++++++++++++++----------------- server/src/retro_thread.rs | 9 +- 2 files changed, 115 insertions(+), 98 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 196c7be..e3432d8 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -33,27 +33,33 @@ use axum::{ use futures::{sink::SinkExt, stream::StreamExt}; +#[derive(Clone)] +enum WsMessage { + VideoPacket { packet: ffmpeg::Packet }, + Json(String), +} + struct AppState { encoder_tx: Arc>>, inputs: Arc>>, - websocket_broadcast_tx: broadcast::Sender, + websocket_broadcast_tx: broadcast::Sender, websocket_count: TokioMutex, } impl AppState { fn new(encoder_tx: mpsc::Sender) -> Self { - let (chat_tx, _chat_rx) = broadcast::channel(10); + let (broadcast_tx, _) = broadcast::channel(10); Self { encoder_tx: Arc::new(TokioMutex::new(encoder_tx)), inputs: Arc::new(TokioMutex::new(Vec::new())), - websocket_broadcast_tx: chat_tx, + websocket_broadcast_tx: broadcast_tx, websocket_count: TokioMutex::const_new(0usize), } } } -#[tokio::main(flavor = "multi_thread", worker_threads = 8)] +#[tokio::main(flavor = "multi_thread", worker_threads = 2)] async fn main() -> anyhow::Result<()> { // Setup a tracing subscriber let subscriber = tracing_subscriber::FmtSubscriber::builder() @@ -75,109 +81,106 @@ async fn main() -> anyhow::Result<()> { let state_clone = state.clone(); // retro event handler. drives the encoder thread too - tokio::spawn(async move { - let surface_clone = surface.clone(); - let frame_clone = frame.clone(); + let _ = std::thread::Builder::new() + .name("retro_event_rx".into()) + .spawn(move || { + let surface_clone = surface.clone(); + let frame_clone = frame.clone(); - // start the thread now that we're alive - let _ = event_in_tx.send(retro_thread::RetroInEvent::Start).await; + // start the libretro thread looping now that we're alive + let _ = event_in_tx.blocking_send(retro_thread::RetroInEvent::Start); - loop { - match event_rx.try_recv() { - Ok(msg) => match msg { - RetroEvent::Frame => { - { - let mut frame_locked = frame.lock().expect( - "Couldn't lock frame on our end. Did the encoder thread panic?", - ); + loop { + match event_rx.try_recv() { + Ok(msg) => match msg { + RetroEvent::Frame => { + { + let mut frame_locked = frame.lock().expect( + "Couldn't lock frame on our end. Did the encoder thread panic?", + ); - let mut_frame = frame_locked.as_mut().expect("it's None? why?"); + let mut_frame = frame_locked.as_mut().expect("it's None? why?"); - let width = mut_frame.width(); - let height = mut_frame.height(); + let width = mut_frame.width(); + let height = mut_frame.height(); - let mut surf = surface_clone.lock().expect( + let mut surf = surface_clone.lock().expect( "locking the VNC surface to paint it to the ffmpeg frame failed", ); - let surf_buf = surf.get_buffer(); + let surf_buf = surf.get_buffer(); - let buf_ptr = - unsafe { (*(*mut_frame.as_mut_ptr()).buf[0]).data as *mut u32 }; + let buf_ptr = + unsafe { (*(*mut_frame.as_mut_ptr()).buf[0]).data as *mut u32 }; - for y in 0..height { - let line_stride = (y * width) as usize; - // Make a slice for the line - // SAFETY: The allocation is guaranteed to be large enough - // for this to work from y = 0..height - let dest_line_slice = unsafe { - let dest_line_ptr = buf_ptr.add(line_stride); - std::slice::from_raw_parts_mut(dest_line_ptr, width as usize) - }; + for y in 0..height { + let line_stride = (y * width) as usize; + // Make a slice for the line + // SAFETY: The allocation is guaranteed to be large enough + // for this to work from y = 0..height + let dest_line_slice = unsafe { + let dest_line_ptr = buf_ptr.add(line_stride); + std::slice::from_raw_parts_mut( + dest_line_ptr, + width as usize, + ) + }; - dest_line_slice.copy_from_slice( - &surf_buf[line_stride..line_stride + width as usize], - ); + dest_line_slice.copy_from_slice( + &surf_buf[line_stride..line_stride + width as usize], + ); + } } + + let _ = state_clone + .encoder_tx + .blocking_lock() + .blocking_send(encoder_thread::EncodeThreadInput::SendFrame); } - let _ = state_clone - .encoder_tx - .lock() - .await - .send(encoder_thread::EncodeThreadInput::SendFrame) - .await; - } + RetroEvent::Resize { size } => { + // make a new frame for the encoder + { + let mut lk_frame = frame_clone.lock().expect("Couldn't lock frame"); - RetroEvent::Resize { size } => { - // make a new frame for the encoder - { - let mut lk_frame = frame_clone.lock().expect("Couldn't lock frame"); + *lk_frame = Some(ffmpeg::frame::Video::new( + ffmpeg::format::Pixel::BGRA, + size.clone().width, + size.clone().height, + )); + } - *lk_frame = Some(ffmpeg::frame::Video::new( - ffmpeg::format::Pixel::BGRA, - size.clone().width, - size.clone().height, - )); + let _ = state_clone.encoder_tx.blocking_lock().blocking_send( + encoder_thread::EncodeThreadInput::Init { size: size.clone() }, + ); } - let _ = state_clone - .encoder_tx - .lock() - .await - .send(encoder_thread::EncodeThreadInput::Init { size: size.clone() }) - .await; - } + RetroEvent::WantInputs { tx } => { + let inputs = state_clone.inputs.blocking_lock(); + //tracing::info!("giving inputs {:?}", inputs); + tx.send(inputs.clone()).expect("FUCK"); + } + }, - RetroEvent::WantInputs { tx } => { - let inputs = state_clone.inputs.lock().await; - //tracing::info!("giving inputs {:?}", inputs); - tx.send(inputs.clone()).expect("FUCK"); - } - }, + Err(TryRecvError::Disconnected) => break, + Err(TryRecvError::Empty) => {} + } - Err(TryRecvError::Disconnected) => break, - Err(TryRecvError::Empty) => {} + match encoder_rx.try_recv() { + Ok(msg) => match msg { + encoder_thread::EncodeThreadOutput::Frame { packet } => { + let _ = state_clone + .websocket_broadcast_tx + .send(WsMessage::VideoPacket { packet }); + } + }, + Err(TryRecvError::Empty) => {} + _ => break, + } + + std::thread::sleep(Duration::from_millis(1)); } - - match encoder_rx.try_recv() { - Ok(msg) => match msg { - encoder_thread::EncodeThreadOutput::Frame { mut packet } => { - let vec = { - let data = packet.data_mut().expect("packet is empty somehow"); - data.to_vec() - }; - let _ = state_clone - .websocket_broadcast_tx - .send(ws::Message::Binary(vec)); - } - }, - Err(TryRecvError::Empty) => {} - _ => break, - } - - tokio::time::sleep(Duration::from_millis(1)).await; - } - }); + }) + .expect("failed to spawn retro RX thread, it's probably over"); // Axum websocket server let app: Router<()> = Router::new() @@ -254,8 +257,22 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc) let mut sub = send_clone.websocket_broadcast_tx.subscribe(); while let Ok(msg) = sub.recv().await { - if sender.send(msg).await.is_err() { - break; + match msg { + WsMessage::VideoPacket { mut packet } => { + // :(. At least this copy doesn't occur on the driver threads anymore.. + let data = packet.data_mut().expect("shouldn't be taken"); + let msg = ws::Message::Binary(data.to_vec()); + if sender.send(msg).await.is_err() { + break; + } + } + + WsMessage::Json(s) => { + let msg = ws::Message::Text(s); + if sender.send(msg).await.is_err() { + break; + } + } } } }); @@ -286,12 +303,9 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc) "msg": json["msg"].as_str().unwrap() }); - recv_clone - .websocket_broadcast_tx - .send(ws::Message::Text( - serde_json::to_string(&send).expect("oh well"), - )) - .expect("boom"); + recv_clone.websocket_broadcast_tx.send(WsMessage::Json( + serde_json::to_string(&send).expect("oh well"), + )); continue; } diff --git a/server/src/retro_thread.rs b/server/src/retro_thread.rs index 0496a70..795d814 100644 --- a/server/src/retro_thread.rs +++ b/server/src/retro_thread.rs @@ -441,9 +441,12 @@ pub fn spawn_retro_thread( let (event_in_tx, event_in_rx) = mpsc::channel(8); let fb_clone = surface.clone(); - std::thread::spawn(move || { - retro_thread_main(fb_clone, event_tx, event_in_rx); - }); + // discard the join handle + let _ = std::thread::Builder::new() + .name("retro_game".into()) + .spawn(move || { + retro_thread_main(fb_clone, event_tx, event_in_rx); + }).expect("failed to spawn the game thread"); (event_rx, event_in_tx) }