From d54f65af8b6d9e7e038ad05bf9091ca7564a2bd1 Mon Sep 17 00:00:00 2001 From: modeco80 Date: Thu, 10 Oct 2024 22:06:17 -0400 Subject: [PATCH] working libretro sex --- server/src/main.rs | 164 +++++++---------------------- server/src/retro_thread.rs | 25 ++--- server/src/video/encoder_thread.rs | 162 ++++++++++++++++++++++++++++ server/src/video/h264_encoder.rs | 7 +- server/src/video/mod.rs | 2 + 5 files changed, 217 insertions(+), 143 deletions(-) create mode 100644 server/src/video/encoder_thread.rs diff --git a/server/src/main.rs b/server/src/main.rs index a4a321f..b3d21ac 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -3,9 +3,10 @@ mod surface; mod types; mod video; -use retro_thread::{spawn_retro_thread, App, RetroEvent}; -use video::ffmpeg; +use retro_thread::{spawn_retro_thread, RetroState, RetroEvent}; +use video::encoder_thread::EncodeThreadInput; use video::h264_encoder::H264Encoder; +use video::{encoder_thread, ffmpeg}; use std::{ sync::{Arc, Mutex}, @@ -35,7 +36,7 @@ use axum::{ use futures::{sink::SinkExt, stream::StreamExt}; struct AppState { - encoder_tx: Arc>>, + encoder_tx: Arc>>, inputs: Arc>>, websocket_broadcast_tx: broadcast::Sender, @@ -43,7 +44,7 @@ struct AppState { } impl AppState { - fn new(encoder_tx: mpsc::Sender<()>) -> Self { + fn new(encoder_tx: mpsc::Sender) -> Self { let (chat_tx, _chat_rx) = broadcast::channel(10); Self { encoder_tx: Arc::new(TokioMutex::new(encoder_tx)), @@ -54,79 +55,6 @@ impl AppState { } } -struct EncoderState { - encoder: Option, - frame: Option, - packet: ffmpeg::Packet, -} - -impl EncoderState { - fn new() -> Self { - Self { - encoder: None, - frame: None, - packet: ffmpeg::Packet::empty(), - } - } - - fn init(&mut self, size: crate::types::Size) -> anyhow::Result<()> { - self.encoder = Some(H264Encoder::new_nvenc_swframe( - size.clone(), - 60, - 3 * (1024 * 1024), - )?); - - // FIXME: use create_frame() on H264Encoder - self.frame = Some(ffmpeg::frame::Video::new( - ffmpeg::format::Pixel::RGBA, - size.width, - size.height, - )); - - // replace packet - self.packet = ffmpeg::Packet::empty(); - - Ok(()) - } - - fn frame(&mut self) -> &mut ffmpeg::frame::Video { - self.frame.as_mut().unwrap() - } - - fn send_frame(&mut self, pts: u64, force_keyframe: bool) -> Option { - let frame = self.frame.as_mut().unwrap(); - let encoder = self.encoder.as_mut().unwrap(); - - // set frame metadata - unsafe { - if force_keyframe { - (*frame.as_mut_ptr()).pict_type = ffmpeg::sys::AVPictureType::AV_PICTURE_TYPE_I; - (*frame.as_mut_ptr()).flags = ffmpeg::sys::AV_FRAME_FLAG_KEY; - (*frame.as_mut_ptr()).key_frame = 1; - } else { - (*frame.as_mut_ptr()).pict_type = ffmpeg::sys::AVPictureType::AV_PICTURE_TYPE_NONE; - (*frame.as_mut_ptr()).flags = 0i32; - (*frame.as_mut_ptr()).key_frame = 0; - } - - (*frame.as_mut_ptr()).pts = pts as i64; - } - - encoder.send_frame(&*frame); - encoder - .receive_packet(&mut self.packet) - .expect("Failed to recieve packet"); - - unsafe { - if !self.packet.is_empty() { - return Some(self.packet.clone()); - } - } - - return None; - } -} - #[tokio::main(flavor = "multi_thread", worker_threads = 8)] async fn main() -> anyhow::Result<()> { // Setup a tracing subscriber @@ -139,46 +67,32 @@ async fn main() -> anyhow::Result<()> { let surface = Arc::new(Mutex::new(surface::Surface::new())); // H.264 encoder related - let encoder_state = Arc::new(TokioMutex::new(EncoderState::new())); - let (encoder_tx, mut encoder_rx) = mpsc::channel(8); + let frame: Arc>> = Arc::new(Mutex::new(None)); + let (mut encoder_rx, encoder_tx) = encoder_thread::encoder_thread_spawn(&frame); let state = Arc::new(AppState::new(encoder_tx)); let (mut event_rx, event_in_tx) = spawn_retro_thread(surface.clone()); let state_clone = state.clone(); - let encoder_state_clone = encoder_state.clone(); let vnc_recv_handle = tokio::spawn(async move { let surface_clone = surface.clone(); - - // first frame is always a key frame - let mut pts = 0u64; - let mut force_keyframe = true; - let mut frame_update = false; + let frame_clone = frame.clone(); // start the thread now that we're alive let _ = event_in_tx.send(retro_thread::RetroInEvent::Start).await; loop { - match encoder_rx.try_recv() { - Ok(()) => { - // force keyframe - force_keyframe = true; - frame_update = true; - } - - Err(TryRecvError::Disconnected) => break, - Err(TryRecvError::Empty) => {} - } - match event_rx.try_recv() { Ok(msg) => match msg { RetroEvent::Frame => { { - let mut state_locked = encoder_state_clone.lock().await; + let mut frame_locked = frame.lock().expect( + "Couldn't lock frame on our end. Did the encoder thread panic?", + ); - let mut_frame = state_locked.frame(); + let mut_frame = frame_locked.as_mut().expect("it's None? why?"); let width = mut_frame.width(); let height = mut_frame.height(); @@ -207,19 +121,32 @@ async fn main() -> anyhow::Result<()> { } } - frame_update = true; + state_clone + .encoder_tx + .lock() + .await + .send(encoder_thread::EncodeThreadInput::SendFrame) + .await; } RetroEvent::Resize { size } => { + // make a new frame for the encoder { - let mut state_locked = encoder_state_clone.lock().await; - state_locked.init(size).expect("fuck you"); + let mut lk_frame = frame_clone.lock().expect("Couldn't lock frame"); - // reset our internal state - pts = 0; - force_keyframe = true; - frame_update = false; + *lk_frame = Some(ffmpeg::frame::Video::new( + ffmpeg::format::Pixel::BGRA, + size.clone().width, + size.clone().height, + )); } + + state_clone + .encoder_tx + .lock() + .await + .send(encoder_thread::EncodeThreadInput::Init { size: size.clone() }) + .await; } RetroEvent::WantInputs { tx } => { @@ -233,32 +160,20 @@ async fn main() -> anyhow::Result<()> { Err(TryRecvError::Empty) => {} } - // send frame if we should. - if frame_update { - let mut state_locked = encoder_state_clone.lock().await; - - match state_locked.send_frame(pts, force_keyframe) { - Some(mut packet) => { + match encoder_rx.try_recv() { + Ok(msg) => match msg { + encoder_thread::EncodeThreadOutput::Frame { mut packet } => { let vec = { let data = packet.data_mut().expect("packet is empty somehow"); data.to_vec() }; - let _ = state_clone .websocket_broadcast_tx .send(ws::Message::Binary(vec)); - - pts += 1; - - if force_keyframe { - force_keyframe = false; - } } - - None => {} - } - - frame_update = false; + }, + Err(TryRecvError::Empty) => {} + _ => break, } tokio::time::sleep(Duration::from_millis(1)).await; @@ -325,7 +240,8 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc) let locked = state.encoder_tx.lock().await; // Force a ws connection to mean a keyframe - let _ = locked.send(()).await; + let _ = locked.send(EncodeThreadInput::ForceKeyframe).await; + let _ = locked.send(EncodeThreadInput::SendFrame).await; } // random username diff --git a/server/src/retro_thread.rs b/server/src/retro_thread.rs index 3a56e5a..a81daec 100644 --- a/server/src/retro_thread.rs +++ b/server/src/retro_thread.rs @@ -46,7 +46,7 @@ extern "system" fn opengl_message_callback( } } -pub struct App { +pub struct RetroState { frontend: Option>, pad: RetroPad, @@ -54,9 +54,10 @@ pub struct App { // EGL state egl_context: Option, + /// Locked framebuffer. framebuffer: Arc>, - // OpenGL object IDs + /// OpenGL FBO gl_framebuffer: gpu::GlFramebuffer, /// Cached readback buffer. @@ -65,7 +66,7 @@ pub struct App { event_tx: mpsc::Sender, } -impl App { +impl RetroState { pub fn new(framebuffer: Arc>, event_tx: mpsc::Sender) -> Box { let mut boxed = Box::new(Self { frontend: None, @@ -150,7 +151,6 @@ impl App { let step_duration = Duration::from_millis(step_ms as u64); self.get_frontend().run_frame(); - let _ = self.event_tx.blocking_send(RetroEvent::Frame); std::thread::sleep(step_duration); } @@ -188,15 +188,6 @@ impl App { dest_slice.copy_from_slice(scanlines[y as usize]); - // swap the scanline pixels to BGRA order to make minifb happy - // not the fastest code but this should do for an example - //for pix in dest_slice { - // let a = (*pix & 0xff000000) >> 24; - // let b = (*pix & 0x00ff0000) >> 16; - // let g = (*pix & 0x0000ff00) >> 8; - // let r = *pix & 0x000000ff; - // *pix = a << 24 | r << 16 | g << 8 | b; - //} } } else { for y in 0..size.height { @@ -219,7 +210,7 @@ impl App { } } -impl FrontendInterface for App { +impl FrontendInterface for RetroState { fn video_resize(&mut self, width: u32, height: u32) { tracing::info!("Resized to {width}x{height}"); @@ -246,6 +237,7 @@ impl FrontendInterface for App { fn video_update(&mut self, slice: &[u32], pitch: u32) { Self::update_impl(self.framebuffer.clone(), slice, pitch, false); + let _ = self.event_tx.blocking_send(RetroEvent::Frame); } fn video_update_gl(&mut self) { @@ -276,6 +268,7 @@ impl FrontendInterface for App { }; Self::update_impl(self.framebuffer.clone(), slice, dimensions.0, true); + let _ = self.event_tx.blocking_send(RetroEvent::Frame); } fn audio_sample(&mut self, _slice: &[i16], _size: usize) {} @@ -404,7 +397,7 @@ impl FrontendInterface for App { } } -impl Drop for App { +impl Drop for RetroState { fn drop(&mut self) { // Terminate EGL and GL resources if need be self.hw_gl_destroy(); @@ -426,7 +419,7 @@ fn retro_thread_main( event_tx: mpsc::Sender, mut event_rx: mpsc::Receiver, ) { - let mut app = App::new(surface, event_tx); + let mut app = RetroState::new(surface, event_tx); app.load_core("cores/swanstation_libretro.so") .expect("failed to load core"); diff --git a/server/src/video/encoder_thread.rs b/server/src/video/encoder_thread.rs new file mode 100644 index 0000000..369ee03 --- /dev/null +++ b/server/src/video/encoder_thread.rs @@ -0,0 +1,162 @@ +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; +use tokio::sync::mpsc::{self, error::TryRecvError}; + +use super::ffmpeg; +use super::h264_encoder::H264Encoder; + +use super::hwframe::HwFrameContext; + +pub enum EncodeThreadInput { + Init { size: crate::types::Size }, + ForceKeyframe, + SendFrame, +} + +#[derive(Clone)] +pub enum EncodeThreadOutput { + Frame { packet: ffmpeg::Packet }, +} + +struct EncoderState { + encoder: Option, + frame: Arc>>, + packet: ffmpeg::Packet, +} + +impl EncoderState { + fn new(frame: Arc>>) -> Self { + Self { + encoder: None, + frame: frame, + packet: ffmpeg::Packet::empty(), + } + } + + fn init(&mut self, size: crate::types::Size) -> anyhow::Result<()> { + self.encoder = Some(H264Encoder::new_nvenc_swframe( + size.clone(), + 60, + 3 * (1024 * 1024), + )?); + + // replace packet + self.packet = ffmpeg::Packet::empty(); + + Ok(()) + } + + fn frame(&mut self) -> Arc>> { + self.frame.clone() + } + + fn send_frame(&mut self, pts: u64, force_keyframe: bool) -> Option { + let mut lk = self.frame.lock().expect("fuck"); + let frame = lk.as_mut().unwrap(); + let encoder = self.encoder.as_mut().unwrap(); + + // set frame metadata + unsafe { + if force_keyframe { + (*frame.as_mut_ptr()).pict_type = ffmpeg::sys::AVPictureType::AV_PICTURE_TYPE_I; + (*frame.as_mut_ptr()).flags = ffmpeg::sys::AV_FRAME_FLAG_KEY; + (*frame.as_mut_ptr()).key_frame = 1; + } else { + (*frame.as_mut_ptr()).pict_type = ffmpeg::sys::AVPictureType::AV_PICTURE_TYPE_NONE; + (*frame.as_mut_ptr()).flags = 0i32; + (*frame.as_mut_ptr()).key_frame = 0; + } + + (*frame.as_mut_ptr()).pts = pts as i64; + } + + encoder.send_frame(&*frame); + encoder + .receive_packet(&mut self.packet) + .expect("Failed to recieve packet"); + + unsafe { + if !self.packet.is_empty() { + return Some(self.packet.clone()); + } + } + + return None; + } +} + +fn encoder_thread_main( + mut rx: mpsc::Receiver, + tx: mpsc::Sender, + frame: &Arc>>, +) -> anyhow::Result<()> { + // FIXME: for HW frame support + //let dev = cudarc::driver::CudaDevice::new(0)?; + + let mut frame_number = 0u64; + let mut force_keyframe = false; + + let mut encoder = EncoderState::new(frame.clone()); + + loop { + match rx.try_recv() { + Ok(msg) => match msg { + EncodeThreadInput::Init { size } => { + frame_number = 0; + + if force_keyframe { + force_keyframe = false; + } + + encoder.init(size).expect("encoder init failed"); + } + + EncodeThreadInput::ForceKeyframe => { + force_keyframe = true; + } + + EncodeThreadInput::SendFrame => { + if let Some(pkt) = encoder.send_frame(frame_number as u64, force_keyframe) { + // A bit less clear than ::empty(), but it's "Safe" + if let Some(_) = pkt.data() { + let _ = tx.blocking_send(EncodeThreadOutput::Frame { + packet: pkt.clone(), + }); + } + + frame_number += 1; + } + + if force_keyframe { + force_keyframe = false; + } + } + }, + + Err(TryRecvError::Disconnected) => break, + Err(TryRecvError::Empty) => { + std::thread::sleep(Duration::from_millis(1)); + } + } + } + + Ok(()) +} + +pub fn encoder_thread_spawn( + frame: &Arc>>, +) -> ( + mpsc::Receiver, + mpsc::Sender, +) { + let (in_tx, in_rx) = mpsc::channel(1); + let (out_tx, out_rx) = mpsc::channel(1); + + let clone = Arc::clone(frame); + + std::thread::spawn(move || encoder_thread_main(in_rx, out_tx, &clone)); + + (out_rx, in_tx) +} diff --git a/server/src/video/h264_encoder.rs b/server/src/video/h264_encoder.rs index 9316f8d..6301416 100644 --- a/server/src/video/h264_encoder.rs +++ b/server/src/video/h264_encoder.rs @@ -47,7 +47,8 @@ fn create_context_and_set_common_parameters( video_encoder_context.set_format(ffmpeg::format::Pixel::YUV420P); // The GOP here is setup to balance keyframe retransmission with bandwidth. - video_encoder_context.set_gop((max_framerate * 4) as u32); + //video_encoder_context.set_gop((max_framerate * 4) as u32); + video_encoder_context.set_gop(i32::MAX as u32); video_encoder_context.set_max_b_frames(0); unsafe { @@ -134,7 +135,7 @@ impl H264Encoder { video_encoder_context.set_format(ffmpeg::format::Pixel::ZRGB32); - video_encoder_context.set_qmin(35); + video_encoder_context.set_qmin(40); video_encoder_context.set_qmax(38); // set h264_nvenc options @@ -147,7 +148,7 @@ impl H264Encoder { // TODO: dict.set("rc", "vbr"); - //dict.set("qp", "45"); + dict.set("qp", "45"); dict.set("forced-idr", "1"); diff --git a/server/src/video/mod.rs b/server/src/video/mod.rs index 7f82e4a..1af486d 100644 --- a/server/src/video/mod.rs +++ b/server/src/video/mod.rs @@ -7,6 +7,8 @@ pub use ffmpeg as ffmpeg; pub mod hwdevice; pub mod hwframe; +pub mod encoder_thread; + // from hgaiser/moonshine pub fn check_ret(error_code: i32) -> Result<(), ffmpeg::Error> { if error_code != 0 {