video: rewrite encoder thread entirely

Instead of using channels we communicate via a condvar/mutex based sytem
This commit is contained in:
Lily Tsuru 2024-12-04 17:46:44 -05:00
parent fac32bd41d
commit ab9df17b6a
5 changed files with 294 additions and 238 deletions

7
server/Cargo.lock generated
View file

@ -925,19 +925,12 @@ dependencies = [
"libc", "libc",
"libloading", "libloading",
"libretro-sys", "libretro-sys",
"rgb565",
"serde", "serde",
"thiserror", "thiserror",
"toml", "toml",
"tracing", "tracing",
] ]
[[package]]
name = "rgb565"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d43e85498d0bb728f77a88b4313eaf4ed21673f3f8a05c36e835cf6c9c0d066"
[[package]] [[package]]
name = "rustc-demangle" name = "rustc-demangle"
version = "0.1.24" version = "0.1.24"

View file

@ -13,8 +13,8 @@ use retro_thread::{spawn_retro_thread, RetroEvent};
use transport::websocket::WebsocketTransport; use transport::websocket::WebsocketTransport;
use transport::{Transport, TransportReciever}; use transport::{Transport, TransportReciever};
use video::cuda_gl::safe::GraphicsResource; use video::cuda_gl::safe::GraphicsResource;
use video::encoder_thread;
use video::encoder_thread::EncodeThreadInput; use video::encoder_thread::EncodeThreadInput;
use video::encoder_thread::{self, EncoderThreadControl};
use std::{ use std::{
sync::{Arc, Mutex}, sync::{Arc, Mutex},
@ -27,7 +27,7 @@ use tokio::sync::{
}; };
struct AppState<T: Transport> { struct AppState<T: Transport> {
encoder_tx: Arc<TokioMutex<mpsc::Sender<EncodeThreadInput>>>, encoder_control: EncoderThreadControl,
inputs: Arc<TokioMutex<Vec<u32>>>, inputs: Arc<TokioMutex<Vec<u32>>>,
transport: Arc<T>, transport: Arc<T>,
@ -38,9 +38,9 @@ impl<T> AppState<T>
where where
T: Transport + Send + Sync + 'static, T: Transport + Send + Sync + 'static,
{ {
fn new(encoder_tx: mpsc::Sender<EncodeThreadInput>, transport: Arc<T>) -> Self { fn new(encoder_control: EncoderThreadControl, transport: Arc<T>) -> Self {
Self { Self {
encoder_tx: Arc::new(TokioMutex::new(encoder_tx)), encoder_control,
inputs: Arc::new(TokioMutex::new(Vec::new())), inputs: Arc::new(TokioMutex::new(Vec::new())),
transport: transport, transport: transport,
connection_count: TokioMutex::const_new(0usize), connection_count: TokioMutex::const_new(0usize),
@ -54,7 +54,7 @@ where
T: Transport + Send + Sync + 'static, T: Transport + Send + Sync + 'static,
{ {
async fn on_connect(&self, username: &String) -> anyhow::Result<()> { async fn on_connect(&self, username: &String) -> anyhow::Result<()> {
println!("{username} joined!"); tracing::info!("{username} joined!");
{ {
let mut lk = self.connection_count.lock().await; let mut lk = self.connection_count.lock().await;
@ -62,11 +62,10 @@ where
} }
{ {
let locked = self.encoder_tx.lock().await; self.encoder_control
.send_command(EncodeThreadInput::ForceKeyframe);
// Force a ws connection to mean a keyframe //self.encoder_control
let _ = locked.send(EncodeThreadInput::ForceKeyframe).await; // .send_command(EncodeThreadInput::SendFrame);
let _ = locked.send(EncodeThreadInput::SendFrame).await;
} }
Ok(()) Ok(())
@ -156,7 +155,7 @@ where
*lk -= 1; *lk -= 1;
} }
println!("{username} left."); tracing::info!("{username} left!");
Ok(()) Ok(())
} }
} }
@ -177,17 +176,18 @@ async fn main() -> anyhow::Result<()> {
let resource = Arc::new(Mutex::new(GraphicsResource::new(&device))); let resource = Arc::new(Mutex::new(GraphicsResource::new(&device)));
let (mut encoder_rx, encoder_tx) = encoder_thread::encoder_thread_spawn_hwframe( let encoder_control = encoder_thread::encoder_thread_spawn_hwframe(
&device.clone(), &device.clone(),
&resource.clone(), &resource.clone(),
&egl_ctx.clone(), &egl_ctx.clone(),
false,
); );
let transport = Arc::new(WebsocketTransport::new()); let transport = Arc::new(WebsocketTransport::new());
let state = Arc::new(AppState::new(encoder_tx, transport.clone())); let state = Arc::new(AppState::new(encoder_control.clone(), transport.clone()));
let (mut retro_event_rx, retro_input_event_tx) = let (mut retro_event_rx, retro_input_event_tx) =
spawn_retro_thread(egl_ctx.clone(), resource.clone()); spawn_retro_thread(encoder_control.clone(), egl_ctx.clone(), resource.clone());
let state_clone = state.clone(); let state_clone = state.clone();
@ -196,21 +196,24 @@ async fn main() -> anyhow::Result<()> {
.name("retro_event_rx".into()) .name("retro_event_rx".into())
.spawn(move || { .spawn(move || {
// load game // load game
/*
let _ = retro_input_event_tx.blocking_send(retro_thread::RetroInEvent::LoadCore( let _ = retro_input_event_tx.blocking_send(retro_thread::RetroInEvent::LoadCore(
"cores/swanstation_libretro.so".into(), "cores/swanstation_libretro.so".into(),
)); ));
let _ = retro_input_event_tx.blocking_send(retro_thread::RetroInEvent::LoadGame( let _ = retro_input_event_tx.blocking_send(retro_thread::RetroInEvent::LoadGame(
"roms/merged/nmv2/jagb/nmv2jagb.cue".into(), "roms/merged/nmv2/jagb/nmv2jagb.cue".into(),
)); ));
*/
/*
let _ = retro_input_event_tx.blocking_send(retro_thread::RetroInEvent::LoadCore( let _ = retro_input_event_tx.blocking_send(retro_thread::RetroInEvent::LoadCore(
"cores/pcsx2_libretro.so".into(), "cores/pcsx2_libretro.so".into(),
)); ));
let _ = retro_input_event_tx.blocking_send(retro_thread::RetroInEvent::LoadGame( let _ = retro_input_event_tx.blocking_send(retro_thread::RetroInEvent::LoadGame(
"/data/sda/lily/ISOs/Sony PlayStation 2/ztx-hl.bin".into(), "/data/sda/lily/ISOs/Sony PlayStation 2/ztx-hl.bin".into(),
)); ));
*/
std::thread::sleep(Duration::from_millis(100)); // hack
// start the libretro thread looping now that we're alive // start the libretro thread looping now that we're alive
let _ = retro_input_event_tx.blocking_send(retro_thread::RetroInEvent::Start); let _ = retro_input_event_tx.blocking_send(retro_thread::RetroInEvent::Start);
@ -218,19 +221,6 @@ async fn main() -> anyhow::Result<()> {
loop { loop {
match retro_event_rx.blocking_recv() { match retro_event_rx.blocking_recv() {
Some(msg) => match msg { Some(msg) => match msg {
RetroEvent::Frame => {
let _ = state_clone
.encoder_tx
.blocking_lock()
.blocking_send(encoder_thread::EncodeThreadInput::SendFrame);
}
RetroEvent::Resize { size } => {
let _ = state_clone.encoder_tx.blocking_lock().blocking_send(
encoder_thread::EncodeThreadInput::Init { size: size.clone() },
);
}
RetroEvent::WantInputs { tx } => { RetroEvent::WantInputs { tx } => {
let inputs = state_clone.inputs.blocking_lock(); let inputs = state_clone.inputs.blocking_lock();
tx.send(inputs.clone()).expect("FUCK"); tx.send(inputs.clone()).expect("FUCK");
@ -240,26 +230,20 @@ async fn main() -> anyhow::Result<()> {
None => break, None => break,
} }
match encoder_rx.try_recv() { // wait for a packet
Ok(msg) => match msg { {
encoder_thread::EncodeThreadOutput::Frame { packet } => { let mut waited_lk = encoder_control.wait_for_packet(); //Duration::from_millis(2)
// :(
let packet_data = { let packet_data = {
let slice = packet.data().expect( let slice = (&mut *waited_lk)
"should NOT be empty, this invariant is checked beforehand", .data()
); .expect("should NOT be empty, this invariant is checked beforehand");
slice.to_vec() slice.to_vec()
}; };
let _ = state_clone.transport.broadcast_message( let _ = state_clone
transport::TransportMessage::Binary(packet_data), .transport
); .broadcast_message(transport::TransportMessage::Binary(packet_data));
} }
},
Err(TryRecvError::Empty) => {}
_ => break,
}
std::thread::sleep(Duration::from_millis(4));
} }
}) })
.expect("failed to spawn retro RX thread, it's probably over"); .expect("failed to spawn retro RX thread, it's probably over");

View file

@ -17,7 +17,14 @@ use retro_frontend::{
use gpu::egl_helpers::DeviceContext; use gpu::egl_helpers::DeviceContext;
use letsplay_gpu as gpu; use letsplay_gpu as gpu;
use crate::{surface::Surface, types::Size, video::cuda_gl::safe::GraphicsResource}; use crate::{
surface::Surface,
types::Size,
video::{
cuda_gl::safe::GraphicsResource,
encoder_thread::{EncodeThreadInput, EncoderThreadControl},
},
};
/// Called by OpenGL. We use this to dump errors. /// Called by OpenGL. We use this to dump errors.
extern "system" fn opengl_message_callback( extern "system" fn opengl_message_callback(
@ -61,6 +68,8 @@ pub struct RetroState {
cuda_resource: Arc<Mutex<GraphicsResource>>, cuda_resource: Arc<Mutex<GraphicsResource>>,
encoder_control: EncoderThreadControl,
event_tx: mpsc::Sender<RetroEvent>, event_tx: mpsc::Sender<RetroEvent>,
} }
@ -69,6 +78,7 @@ impl RetroState {
device_context: Arc<Mutex<DeviceContext>>, device_context: Arc<Mutex<DeviceContext>>,
resource: Arc<Mutex<GraphicsResource>>, resource: Arc<Mutex<GraphicsResource>>,
event_tx: mpsc::Sender<RetroEvent>, event_tx: mpsc::Sender<RetroEvent>,
encoder_control: EncoderThreadControl,
) -> Box<Self> { ) -> Box<Self> {
let mut boxed = Box::new(Self { let mut boxed = Box::new(Self {
frontend: None, frontend: None,
@ -81,6 +91,8 @@ impl RetroState {
cuda_resource: resource.clone(), cuda_resource: resource.clone(),
encoder_control,
event_tx, event_tx,
}); });
@ -175,6 +187,10 @@ impl RetroState {
locked_egl.release(); locked_egl.release();
} }
// Send frame.
self.encoder_control
.send_command(EncodeThreadInput::SendFrame);
std::thread::sleep(step_duration); std::thread::sleep(step_duration);
} }
@ -237,7 +253,7 @@ impl FrontendInterface for RetroState {
.expect("Failed to register OpenGL texture with CUDA Graphics resource"); .expect("Failed to register OpenGL texture with CUDA Graphics resource");
} }
let _ = self.event_tx.blocking_send(RetroEvent::Resize { self.encoder_control.send_command(EncodeThreadInput::Init {
size: Size { width, height }, size: Size { width, height },
}); });
} }
@ -266,12 +282,9 @@ impl FrontendInterface for RetroState {
gl::BindTexture(gl::TEXTURE_2D, 0); gl::BindTexture(gl::TEXTURE_2D, 0);
} }
let _ = self.event_tx.blocking_send(RetroEvent::Frame);
} }
fn video_update_gl(&mut self) { fn video_update_gl(&mut self) {
let _ = self.event_tx.blocking_send(RetroEvent::Frame);
} }
fn audio_sample(&mut self, _slice: &[i16], _size: usize) {} fn audio_sample(&mut self, _slice: &[i16], _size: usize) {}
@ -466,8 +479,6 @@ impl Drop for RetroState {
} }
pub enum RetroEvent { pub enum RetroEvent {
Frame,
Resize { size: Size },
WantInputs { tx: oneshot::Sender<Vec<u32>> }, WantInputs { tx: oneshot::Sender<Vec<u32>> },
} }
@ -478,12 +489,18 @@ pub enum RetroInEvent {
} }
fn retro_thread_main( fn retro_thread_main(
encoder_control: EncoderThreadControl,
context: &Arc<Mutex<DeviceContext>>, context: &Arc<Mutex<DeviceContext>>,
resource: &Arc<Mutex<GraphicsResource>>, resource: &Arc<Mutex<GraphicsResource>>,
event_tx: mpsc::Sender<RetroEvent>, event_tx: mpsc::Sender<RetroEvent>,
mut event_rx: mpsc::Receiver<RetroInEvent>, mut event_rx: mpsc::Receiver<RetroInEvent>,
) { ) {
let mut app = RetroState::new(context.clone(), resource.clone(), event_tx); let mut app = RetroState::new(
context.clone(),
resource.clone(),
event_tx,
encoder_control.clone(),
);
// do EGL init first // do EGL init first
app.hw_gl_egl_init(); app.hw_gl_egl_init();
@ -514,6 +531,7 @@ fn retro_thread_main(
} }
pub fn spawn_retro_thread( pub fn spawn_retro_thread(
encoder_control: EncoderThreadControl,
context: Arc<Mutex<DeviceContext>>, context: Arc<Mutex<DeviceContext>>,
resource: Arc<Mutex<GraphicsResource>>, resource: Arc<Mutex<GraphicsResource>>,
) -> (mpsc::Receiver<RetroEvent>, mpsc::Sender<RetroInEvent>) { ) -> (mpsc::Receiver<RetroEvent>, mpsc::Sender<RetroInEvent>) {
@ -523,12 +541,13 @@ pub fn spawn_retro_thread(
let cloned = resource.clone(); let cloned = resource.clone();
let ctxcloned = context.clone(); let ctxcloned = context.clone();
let encodercontrol_cloned = encoder_control.clone();
// discard the join handle // discard the join handle
let _ = std::thread::Builder::new() let _ = std::thread::Builder::new()
.name("retro_game".into()) .name("retro_game".into())
.spawn(move || { .spawn(move || {
retro_thread_main(&ctxcloned, &cloned, event_tx, event_in_rx); retro_thread_main(encodercontrol_cloned, &ctxcloned, &cloned, event_tx, event_in_rx);
}) })
.expect("failed to spawn the game thread"); .expect("failed to spawn the game thread");

View file

@ -8,7 +8,7 @@ use cudarc::{
}; };
use letsplay_gpu::egl_helpers::DeviceContext; use letsplay_gpu::egl_helpers::DeviceContext;
use std::{ use std::{
sync::{Arc, Mutex}, sync::{Arc, Condvar, Mutex, MutexGuard},
time::Duration, time::Duration,
}; };
use tokio::sync::mpsc::{self, error::TryRecvError}; use tokio::sync::mpsc::{self, error::TryRecvError};
@ -16,8 +16,11 @@ use tokio::sync::mpsc::{self, error::TryRecvError};
use super::h264_encoder::H264Encoder; use super::h264_encoder::H264Encoder;
use super::{cuda_gl::safe::GraphicsResource, ffmpeg}; use super::{cuda_gl::safe::GraphicsResource, ffmpeg};
#[derive(Debug)]
pub enum EncodeThreadInput { pub enum EncodeThreadInput {
Init { size: crate::types::Size }, Init { size: crate::types::Size },
Shutdown,
ForceKeyframe, ForceKeyframe,
SendFrame, SendFrame,
} }
@ -27,73 +30,6 @@ pub enum EncodeThreadOutput {
Frame { packet: ffmpeg::Packet }, Frame { packet: ffmpeg::Packet },
} }
struct EncoderState {
encoder: Option<H264Encoder>,
frame: Arc<Mutex<Option<ffmpeg::frame::Video>>>,
packet: ffmpeg::Packet,
}
impl EncoderState {
fn new(frame: Arc<Mutex<Option<ffmpeg::frame::Video>>>) -> Self {
Self {
encoder: None,
frame: frame,
packet: ffmpeg::Packet::empty(),
}
}
fn init(&mut self, size: crate::types::Size) -> anyhow::Result<()> {
self.encoder = Some(H264Encoder::new_nvenc_swframe(
size.clone(),
60,
2 * (1024 * 1024),
)?);
// replace packet
self.packet = ffmpeg::Packet::empty();
Ok(())
}
//fn frame(&mut self) -> Arc<Mutex<Option<ffmpeg::frame::Video>>> {
// self.frame.clone()
//}
fn send_frame(&mut self, pts: u64, force_keyframe: bool) -> Option<ffmpeg::Packet> {
let mut lk = self.frame.lock().expect("fuck");
let frame = lk.as_mut().unwrap();
let encoder = self.encoder.as_mut().unwrap();
// set frame metadata
unsafe {
if force_keyframe {
(*frame.as_mut_ptr()).pict_type = ffmpeg::sys::AVPictureType::AV_PICTURE_TYPE_I;
(*frame.as_mut_ptr()).flags = ffmpeg::sys::AV_FRAME_FLAG_KEY;
(*frame.as_mut_ptr()).key_frame = 1;
} else {
(*frame.as_mut_ptr()).pict_type = ffmpeg::sys::AVPictureType::AV_PICTURE_TYPE_NONE;
(*frame.as_mut_ptr()).flags = 0i32;
(*frame.as_mut_ptr()).key_frame = 0;
}
(*frame.as_mut_ptr()).pts = pts as i64;
}
encoder.send_frame(&*frame);
encoder
.receive_packet(&mut self.packet)
.expect("Failed to recieve packet");
unsafe {
if !self.packet.is_empty() {
return Some(self.packet.clone());
}
}
return None;
}
}
struct EncoderStateHW { struct EncoderStateHW {
encoder: Option<H264Encoder>, encoder: Option<H264Encoder>,
frame: ffmpeg::frame::Video, frame: ffmpeg::frame::Video,
@ -163,80 +99,6 @@ impl EncoderStateHW {
} }
} }
fn encoder_thread_swframe_main(
mut rx: mpsc::Receiver<EncodeThreadInput>,
tx: mpsc::Sender<EncodeThreadOutput>,
frame: &Arc<Mutex<Option<ffmpeg::frame::Video>>>,
) -> anyhow::Result<()> {
// FIXME: for HW frame support
//let dev = cudarc::driver::CudaDevice::new(0)?;
let mut frame_number = 0u64;
let mut force_keyframe = false;
let mut encoder = EncoderState::new(frame.clone());
loop {
match rx.try_recv() {
Ok(msg) => match msg {
EncodeThreadInput::Init { size } => {
frame_number = 0;
if force_keyframe {
force_keyframe = false;
}
encoder.init(size).expect("encoder init failed");
}
EncodeThreadInput::ForceKeyframe => {
force_keyframe = true;
}
EncodeThreadInput::SendFrame => {
if let Some(pkt) = encoder.send_frame(frame_number as u64, force_keyframe) {
// A bit less clear than ::empty(), but it's "Safe"
if let Some(_) = pkt.data() {
let _ = tx.blocking_send(EncodeThreadOutput::Frame {
packet: pkt.clone(),
});
}
frame_number += 1;
}
if force_keyframe {
force_keyframe = false;
}
}
},
Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => {
std::thread::sleep(Duration::from_millis(1));
}
}
}
Ok(())
}
pub fn encoder_thread_spawn_swframe(
frame: &Arc<Mutex<Option<ffmpeg::frame::Video>>>,
) -> (
mpsc::Receiver<EncodeThreadOutput>,
mpsc::Sender<EncodeThreadInput>,
) {
let (in_tx, in_rx) = mpsc::channel(1);
let (out_tx, out_rx) = mpsc::channel(1);
let clone = Arc::clone(frame);
std::thread::spawn(move || encoder_thread_swframe_main(in_rx, out_tx, &clone));
(out_rx, in_tx)
}
/// Source for the kernel used to flip OpenGL framebuffers right-side up. /// Source for the kernel used to flip OpenGL framebuffers right-side up.
const OPENGL_FLIP_KERNEL_SRC: &str = " const OPENGL_FLIP_KERNEL_SRC: &str = "
extern \"C\" __global__ void flip_opengl( extern \"C\" __global__ void flip_opengl(
@ -254,9 +116,46 @@ extern \"C\" __global__ void flip_opengl(
} }
}"; }";
/// Version which also replaces channel order
const OPENGL_FLIP_KERNEL_BGRA_SRC: &str = "
extern \"C\" __global__ void flip_opengl(
const unsigned* pSrc,
unsigned* pDest,
int width,
int height
) {
const unsigned x = blockIdx.x * blockDim.x + threadIdx.x;
const unsigned y = blockIdx.y * blockDim.y + threadIdx.y;
// byte pointers for channel swap
const unsigned char* pSrcData = (const unsigned char*)pSrc;
unsigned char* pBufferData = (unsigned char*)pDest;
if (x < width && y < height) {
//unsigned reversed_y = (height - 1) - y;
// aaa
unsigned long long srcStart = (y * width);
unsigned long long dstStart = (y * width);
// swap. probably should do this in a way that does more than 1 pixel at a time but oh well
pBufferData[((dstStart + x) * 4) + 0] = pSrcData[((srcStart + x) * 4) + 2]; // B
pBufferData[((dstStart + x) * 4) + 1] = pSrcData[((srcStart + x) * 4) + 1]; // G
pBufferData[((dstStart + x) * 4) + 2] = pSrcData[((srcStart + x) * 4) + 0]; // R
pBufferData[((dstStart + x) * 4) + 3] = 0xff; // A
}
}";
fn encoder_thread_hwframe_main( fn encoder_thread_hwframe_main(
mut rx: mpsc::Receiver<EncodeThreadInput>, input_msg_notify: Arc<Condvar>,
tx: mpsc::Sender<EncodeThreadOutput>, input_msg: Arc<Mutex<EncodeThreadInput>>,
processed_notify: Arc<Condvar>,
processed: Arc<Mutex<bool>>,
packet_update: Arc<Condvar>,
packet: Arc<Mutex<ffmpeg::Packet>>,
rgba_to_bgra_kernel: bool,
cuda_device: &Arc<CudaDevice>, cuda_device: &Arc<CudaDevice>,
cuda_resource: &Arc<Mutex<GraphicsResource>>, cuda_resource: &Arc<Mutex<GraphicsResource>>,
@ -270,9 +169,13 @@ fn encoder_thread_hwframe_main(
// :) // :)
cuda_device.bind_to_thread()?; cuda_device.bind_to_thread()?;
// Compile the support kernel // Compile the given support kernel.
let ptx = cudarc::nvrtc::compile_ptx_with_opts( let ptx = cudarc::nvrtc::compile_ptx_with_opts(
&OPENGL_FLIP_KERNEL_SRC, if rgba_to_bgra_kernel {
&OPENGL_FLIP_KERNEL_BGRA_SRC
} else {
&OPENGL_FLIP_KERNEL_SRC
},
CompileOptions { CompileOptions {
//options: vec!["--gpu-architecture=compute_50".into()], //options: vec!["--gpu-architecture=compute_50".into()],
..Default::default() ..Default::default()
@ -302,24 +205,58 @@ fn encoder_thread_hwframe_main(
// allocated. // allocated.
let mut temp_buffer: CudaSlice<u32> = cuda_device.alloc_zeros::<u32>(48).expect("over"); let mut temp_buffer: CudaSlice<u32> = cuda_device.alloc_zeros::<u32>(48).expect("over");
let mut last_size: Option<crate::types::Size> = None;
loop { loop {
match rx.blocking_recv() { // wait for a message
Some(msg) => match msg { {
let lk = input_msg.lock().expect("FUCK");
let waited_lk = input_msg_notify.wait(lk).expect("you bone");
//println!("got here with {:?}", &*waited_lk);
match &*waited_lk {
EncodeThreadInput::Init { size } => { EncodeThreadInput::Init { size } => {
frame_number = 0; frame_number = 0;
force_keyframe = true;
if force_keyframe { // Allocate the flip buffer
force_keyframe = false;
}
temp_buffer = cuda_device temp_buffer = cuda_device
.alloc_zeros::<u32>((size.width * size.height) as usize) .alloc_zeros::<u32>((size.width * size.height) as usize)
.expect("oh youre fucked anyways"); .expect("oh youre fucked anyways");
encoder encoder
.init(cuda_device, size) .init(cuda_device, size.clone())
.expect("encoder init failed"); .expect("encoder init failed");
tracing::info!("Encoder initalized for {}x{}", size.width, size.height);
/*
if last_size.is_some() {
let lss = last_size.as_ref().unwrap();
if size.width != lss.width && size.height != lss.height {
last_size = Some(size.clone());
} }
} else {
// Allocate the flip buffer
temp_buffer = cuda_device
.alloc_zeros::<u32>((size.width * size.height) as usize)
.expect("oh youre fucked anyways");
encoder
.init(cuda_device, size.clone())
.expect("encoder init failed");
tracing::info!("Encoder initalized for {}x{}", size.width, size.height);
last_size = Some(size.clone());
}
*/
}
// Simply shutdown.
EncodeThreadInput::Shutdown => break,
EncodeThreadInput::ForceKeyframe => { EncodeThreadInput::ForceKeyframe => {
force_keyframe = true; force_keyframe = true;
@ -356,7 +293,7 @@ fn encoder_thread_hwframe_main(
unsafe { unsafe {
let frame_ptr = frame.as_mut_ptr(); let frame_ptr = frame.as_mut_ptr();
memcpy.dstDevice = temp_buffer.device_ptr().clone(); memcpy.dstDevice = *temp_buffer.device_ptr();
memcpy.dstPitch = (*frame_ptr).linesize[0] as usize; memcpy.dstPitch = (*frame_ptr).linesize[0] as usize;
memcpy.WidthInBytes = ((*frame_ptr).width * 4) as usize; memcpy.WidthInBytes = ((*frame_ptr).width * 4) as usize;
memcpy.Height = (*frame_ptr).height as usize; memcpy.Height = (*frame_ptr).height as usize;
@ -420,15 +357,19 @@ fn encoder_thread_hwframe_main(
gl_ctx.release(); gl_ctx.release();
} }
if let Some(pkt) = encoder.send_frame(frame_number as u64, force_keyframe) { frame_number += 1;
if let Some(mut pkt) = encoder.send_frame(frame_number as u64, force_keyframe) {
// A bit less clear than ::empty(), but it's "Safe" // A bit less clear than ::empty(), but it's "Safe"
if let Some(_) = pkt.data() { if let Some(_) = pkt.data() {
let _ = tx.blocking_send(EncodeThreadOutput::Frame { {
packet: pkt.clone(), // Swap the packet and notify a waiting thread that we produced a packet
}); let mut locked_packet =
packet.lock().expect("failed to lock packet");
std::mem::swap(&mut *locked_packet, &mut pkt);
packet_update.notify_one();
}
} }
frame_number += 1;
} }
if force_keyframe { if force_keyframe {
@ -437,35 +378,154 @@ fn encoder_thread_hwframe_main(
//tracing::info!("encoding frame {frame_number} took {:2?}", start.elapsed()); //tracing::info!("encoding frame {frame_number} took {:2?}", start.elapsed());
} }
}, }
None => break, // processed message
{
//println!("gggggg?");
{
let mut process_lock = processed.lock().expect("gggg");
*process_lock = true;
processed_notify.notify_one();
}
}
} }
} }
//std::thread::sleep(Duration::from_millis(1)); {
let mut process_lock = processed.lock().expect("gggg");
*process_lock = true;
processed_notify.notify_one();
}
Ok(()) Ok(())
} }
#[derive(Clone)]
pub struct EncoderThreadControl {
// input
/// NOTE: Only signal. Do not wait
input_updated_cv: Arc<Condvar>,
input: Arc<Mutex<EncodeThreadInput>>,
processed: Arc<Mutex<bool>>,
processed_cv: Arc<Condvar>,
/// Only wait: do not signal, or I will be a very sad foxgirl.
packet_updated_cv: Arc<Condvar>,
packet: Arc<Mutex<ffmpeg::Packet>>,
}
impl EncoderThreadControl {
pub fn send_command(&self, cmd: EncodeThreadInput) {
{
let mut lk = self.input.lock().expect("failed to lock input");
//println!("Sent {:?}", cmd);
*lk = cmd;
self.input_updated_cv.notify_one();
}
// Wait for the encoder thread to notify completion.
{
let mut lklk = self.processed.lock().expect("failed to lock processed flag");
*lklk = false;
while *lklk == false {
lklk = self.processed_cv.wait(lklk).expect("failed to wait for encoder thread to signal completion");
}
}
}
/// Shorthand to shutdown the encoder
pub fn shutdown(&self) {
self.send_command(EncodeThreadInput::Shutdown);
}
pub fn wait_for_packet(&self) -> MutexGuard<'_, ffmpeg::Packet> {
let mut lk = self.packet.lock().expect("failed to lock packet");
let mut waited_lk = self
.packet_updated_cv
.wait(lk)
.expect("failed to wait for encoder thread to update packet");
waited_lk
}
pub fn wait_for_packet_timeout(
&self,
timeout: Duration,
) -> Option<MutexGuard<'_, ffmpeg::Packet>> {
let mut lk = self.packet.lock().expect("failed to lock packet");
let mut wait_result = self
.packet_updated_cv
.wait_timeout(lk, timeout)
.expect("failed to wait");
if wait_result.1.timed_out() {
None
} else {
Some(wait_result.0)
}
}
}
pub fn encoder_thread_spawn_hwframe( pub fn encoder_thread_spawn_hwframe(
cuda_device: &Arc<CudaDevice>, cuda_device: &Arc<CudaDevice>,
cuda_resource: &Arc<Mutex<GraphicsResource>>, cuda_resource: &Arc<Mutex<GraphicsResource>>,
gl_context: &Arc<Mutex<DeviceContext>>, gl_context: &Arc<Mutex<DeviceContext>>,
) -> (
mpsc::Receiver<EncodeThreadOutput>, rgba_to_bgra_kernel: bool,
mpsc::Sender<EncodeThreadInput>, ) -> EncoderThreadControl {
) { let processed = Arc::new(Mutex::new(false));
let (in_tx, in_rx) = mpsc::channel(1); let processed_cv = Arc::new(Condvar::new());
let (out_tx, out_rx) = mpsc::channel(1);
let input_updated = Arc::new(Condvar::new());
let input = Arc::new(Mutex::new(EncodeThreadInput::ForceKeyframe)); // something dummy
// packet
let pkt_update_cv = Arc::new(Condvar::new());
let pkt = Arc::new(Mutex::new(ffmpeg::Packet::empty()));
// clones for the thread
let processed_clone = processed.clone();
let processed_cv_clone = processed_cv.clone();
let input_updated_clone = input_updated.clone();
let input_clone = input.clone();
let pkt_update_cv_clone = pkt_update_cv.clone();
let pkt_clone = pkt.clone();
let dev_clone = Arc::clone(cuda_device); let dev_clone = Arc::clone(cuda_device);
let rsrc_clone = Arc::clone(cuda_resource); let rsrc_clone = Arc::clone(cuda_resource);
let gl_clone = Arc::clone(gl_context); let gl_clone = Arc::clone(gl_context);
std::thread::spawn(move || { std::thread::spawn(move || {
encoder_thread_hwframe_main(in_rx, out_tx, &dev_clone, &rsrc_clone, &gl_clone) match encoder_thread_hwframe_main(
input_updated_clone,
input_clone,
processed_cv_clone,
processed_clone,
pkt_update_cv_clone,
pkt_clone,
rgba_to_bgra_kernel,
&dev_clone,
&rsrc_clone,
&gl_clone,
) {
Ok(_) => {}
Err(err) => {
tracing::error!("encoder thread error: {err}");
}
}
}); });
(out_rx, in_tx) EncoderThreadControl {
input_updated_cv: input_updated.clone(),
input: input.clone(),
processed: processed.clone(),
processed_cv: processed_cv.clone(),
packet_updated_cv: pkt_update_cv.clone(),
packet: pkt.clone(),
}
} }

View file

@ -192,8 +192,8 @@ impl H264Encoder {
video_encoder_context.set_format(ffmpeg::format::Pixel::CUDA); video_encoder_context.set_format(ffmpeg::format::Pixel::CUDA);
video_encoder_context.set_qmin(35); video_encoder_context.set_qmin(33);
video_encoder_context.set_qmax(38); video_encoder_context.set_qmax(35);
unsafe { unsafe {
// FIXME: this currently breaks the avbufferref system a bit // FIXME: this currently breaks the avbufferref system a bit
@ -209,7 +209,7 @@ impl H264Encoder {
dict.set("tune", "ull"); dict.set("tune", "ull");
dict.set("preset", "p1"); dict.set("preset", "p1");
dict.set("profile", "main"); dict.set("profile", "baseline");
// TODO: // TODO:
dict.set("rc", "vbr"); dict.set("rc", "vbr");