server/video: remove the encoder thread

Lily Tsuru 2024-10-10 03:27:16 -04:00
parent cf7c2d09d3
commit a4d791f1df
4 changed files with 168 additions and 259 deletions
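
In outline: before this commit, the server pushed `EncodeThreadInput` messages to a dedicated encoder thread; after it, the VNC receive task polls a plain `()` wake-up channel and encodes inline. A minimal, self-contained sketch of the resulting control flow follows — `DummyEncoder`, `keyframe_tx`/`keyframe_rx`, and the loop body are illustrative stand-ins, not code from the repository; the real loop encodes with ffmpeg and broadcasts over a websocket channel.

```rust
use std::time::Duration;
use tokio::sync::mpsc::{self, error::TryRecvError};

// Stands in for EncoderState / H264Encoder from the diff below.
struct DummyEncoder;

impl DummyEncoder {
    // Stands in for EncoderState::send_frame(pts, force_keyframe).
    fn send_frame(&mut self, pts: u64, keyframe: bool) -> Option<Vec<u8>> {
        Some(format!("packet pts={pts} key={keyframe}").into_bytes())
    }
}

#[tokio::main]
async fn main() {
    // A `()` on this channel means "a viewer connected; force a keyframe",
    // replacing the old ForceKeyframe / SendFrame messages.
    let (keyframe_tx, mut keyframe_rx) = mpsc::channel::<()>(8);

    tokio::spawn(async move {
        let _ = keyframe_tx.send(()).await; // a websocket client connects
    });

    let mut enc = DummyEncoder;
    let (mut pts, mut force_keyframe, mut frame_update, connected) = (0u64, true, true, true);

    for _ in 0..3 {
        // Poll for keyframe requests instead of blocking.
        match keyframe_rx.try_recv() {
            Ok(()) if connected => {
                force_keyframe = true;
                frame_update = true;
            }
            Err(TryRecvError::Disconnected) => break,
            _ => {}
        }

        // (The real loop also drains VNC engine events here, copying the
        // framebuffer into the encoder's frame and setting frame_update.)

        // Encode inline on this task; no encoder thread involved.
        if frame_update && connected {
            if let Some(packet) = enc.send_frame(pts, force_keyframe) {
                println!("broadcast {} bytes", packet.len());
                pts += 1;
                force_keyframe = false;
            }
            frame_update = false;
        }

        tokio::time::sleep(Duration::from_millis(2)).await;
    }
}
```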

@@ -3,8 +3,8 @@ mod types;
 mod video;
 mod vnc_engine;

-use video::encoder_thread;
 use video::ffmpeg;
+use video::h264_encoder::H264Encoder;

 use std::{
     sync::{Arc, Mutex},
@@ -38,7 +38,7 @@ struct AppState {
     /// Channel for sending things to the VNC engine
     /// should later be used for control
     engine_tx: mpsc::Sender<vnc_engine::VncMessageInput>,
-    encoder_tx: Arc<TokioMutex<mpsc::Sender<encoder_thread::EncodeThreadInput>>>,
+    encoder_tx: Arc<TokioMutex<mpsc::Sender<()>>>,

     websocket_broadcast_tx: broadcast::Sender<ws::Message>,
     websocket_count: TokioMutex<usize>,
@@ -47,7 +47,7 @@ struct AppState {
 impl AppState {
     fn new(
         engine_tx: mpsc::Sender<vnc_engine::VncMessageInput>,
-        encoder_tx: mpsc::Sender<encoder_thread::EncodeThreadInput>,
+        encoder_tx: mpsc::Sender<()>,
     ) -> Self {
         let (chat_tx, _chat_rx) = broadcast::channel(10);
         Self {
@@ -59,6 +59,77 @@ impl AppState {
     }
 }

+struct EncoderState {
+    encoder: Option<H264Encoder>,
+    frame: Option<ffmpeg::frame::Video>,
+    packet: ffmpeg::Packet,
+}
+
+impl EncoderState {
+    fn new() -> Self {
+        Self {
+            encoder: None,
+            frame: None,
+            packet: ffmpeg::Packet::empty(),
+        }
+    }
+
+    fn init(&mut self, size: crate::types::Size) -> anyhow::Result<()> {
+        self.encoder = Some(H264Encoder::new_nvenc_swframe(
+            size.clone(),
+            60,
+            3 * (1024 * 1024),
+        )?);
+
+        // FIXME: use create_frame() on H264Encoder
+        self.frame = Some(ffmpeg::frame::Video::new(
+            ffmpeg::format::Pixel::BGRA,
+            size.width,
+            size.height,
+        ));
+
+        // replace packet
+        self.packet = ffmpeg::Packet::empty();
+        Ok(())
+    }
+
+    fn frame(&mut self) -> &mut ffmpeg::frame::Video {
+        self.frame.as_mut().unwrap()
+    }
+
+    fn send_frame(&mut self, pts: u64, force_keyframe: bool) -> Option<ffmpeg::Packet> {
+        let frame = self.frame.as_mut().unwrap();
+        let encoder = self.encoder.as_mut().unwrap();
+
+        // set frame metadata
+        unsafe {
+            if force_keyframe {
+                (*frame.as_mut_ptr()).pict_type = ffmpeg::sys::AVPictureType::AV_PICTURE_TYPE_I;
+                (*frame.as_mut_ptr()).flags = ffmpeg::sys::AV_FRAME_FLAG_KEY;
+                (*frame.as_mut_ptr()).key_frame = 1;
+            } else {
+                (*frame.as_mut_ptr()).pict_type = ffmpeg::sys::AVPictureType::AV_PICTURE_TYPE_NONE;
+                (*frame.as_mut_ptr()).flags = 0i32;
+                (*frame.as_mut_ptr()).key_frame = 0;
+            }
+
+            (*frame.as_mut_ptr()).pts = pts as i64;
+        }
+
+        encoder.send_frame(&*frame);
+        encoder
+            .receive_packet(&mut self.packet)
+            .expect("Failed to receive packet");
+
+        unsafe {
+            if !self.packet.is_empty() {
+                return Some(self.packet.clone());
+            }
+        }
+
+        None
+    }
+}
+
 #[tokio::main(flavor = "multi_thread", worker_threads = 8)]
 async fn main() -> anyhow::Result<()> {
     let surface = Arc::new(Mutex::new(surface::Surface::new()));
@@ -66,9 +137,11 @@ async fn main() -> anyhow::Result<()> {
     let (engine_output_tx, mut engine_output_rx) = mpsc::channel(32);
     let (engine_input_tx, engine_input_rx) = mpsc::channel(16);

+    let encoder_state = Arc::new(TokioMutex::new(EncoderState::new()));
+
     // H.264 encoder related
-    let frame: Arc<Mutex<Option<ffmpeg::frame::Video>>> = Arc::new(Mutex::new(None));
-    let (mut encoder_rx, encoder_tx) = video::encoder_thread_spawn(&frame);
+    //let frame: Arc<Mutex<Option<ffmpeg::frame::Video>>> = Arc::new(Mutex::new(None));
+    let (encoder_tx, mut encoder_rx) = mpsc::channel(8);

     let state = Arc::new(AppState::new(engine_input_tx, encoder_tx));
@@ -76,28 +149,49 @@ async fn main() -> anyhow::Result<()> {
     let mut vnc_client =
         vnc_engine::Client::new(engine_output_tx, engine_input_rx, surface.clone());

+    // vnc client task
     tokio::spawn(async move {
         let addr = vnc_engine::Address::Tcp("10.16.0.1:5930".parse().expect("its over"));
         //let addr = vnc_engine::Address::Tcp("127.0.0.1:6930".parse().expect("its over"));
         vnc_client.connect_and_run(addr).await
     });

+    // vnc recv task
     let state_clone = state.clone();
+    let encoder_state_clone = encoder_state.clone();
     let vnc_recv_handle = tokio::spawn(async move {
-        let frame_clone = frame.clone();
-        let encoder_tx_clone = state_clone.encoder_tx.clone();
         let surface_clone = surface.clone();

+        // first frame is always a key frame
+        let mut pts = 0u64;
+        let mut force_keyframe = true;
+        let mut frame_update = false;
+        let mut connected = false;
+
         loop {
+            match encoder_rx.try_recv() {
+                Ok(()) => {
+                    if connected {
+                        // force keyframe
+                        force_keyframe = true;
+                        frame_update = true;
+                    }
+                }
+                Err(TryRecvError::Disconnected) => break,
+                Err(TryRecvError::Empty) => {}
+            }
+
             match engine_output_rx.try_recv() {
                 Ok(msg) => match msg {
                     VncMessageOutput::Connect => {
-                        println!("connected")
+                        println!("connected");
+                        connected = true;
                     }
                     VncMessageOutput::Disconnect => {
-                        println!("disconnect")
+                        println!("disconnect");
                     }
                     VncMessageOutput::FramebufferUpdate => {
@@ -105,21 +199,10 @@ async fn main() -> anyhow::Result<()> {
                         // First we copy the current VNC framebuffer to the shared
                         // frame between the encoder thread and ffmpeg
-                        // don't actually do anything if no one is connected at all
                         {
-                            let lk = state_clone.websocket_count.lock().await;
-                            if *lk < 1 {
-                                continue;
-                            }
-                        }
-
-                        // TODO: Do this on the encoder thread
-                        {
-                            let mut frame_locked = frame.lock().expect(
-                                "Couldn't lock frame on our end. Did the encoder thread panic?",
-                            );
-                            let mut_frame = frame_locked.as_mut().expect("it's None? why?");
+                            let mut state_locked = encoder_state_clone.lock().await;
+                            let mut_frame = state_locked.frame();

                             let width = mut_frame.width();
                             let height = mut_frame.height();
@@ -134,7 +217,6 @@ async fn main() -> anyhow::Result<()> {
                             for y in 0..height {
                                 let line_stride = (y * width) as usize;
-
                                 // Make a slice for the line
                                 // SAFETY: The allocation is guaranteed to be large enough
                                 // for this to work from y = 0..height
@@ -146,42 +228,22 @@ async fn main() -> anyhow::Result<()> {
                                 dest_line_slice.copy_from_slice(
                                     &surf_buf[line_stride..line_stride + width as usize],
                                 );
-
-                                /*
-                                for x in 0..width {
-                                    unsafe {
-                                        let ofs = (y * width + x) as usize;
-                                        *buf_ptr.add(ofs) = surf_buf[ofs];
-                                    }
-                                }
-                                */
                             }
                         }

-                        let _ = encoder_tx_clone
-                            .lock()
-                            .await
-                            .send(crate::encoder_thread::EncodeThreadInput::SendFrame)
-                            .await;
+                        frame_update = true;
                     }
                     VncMessageOutput::FramebufferResized(size) => {
-                        // make a new frame for the encoder
                         {
-                            let mut lk_frame = frame_clone.lock().expect("Couldn't lock frame");
-
-                            *lk_frame = Some(ffmpeg::frame::Video::new(
-                                ffmpeg::format::Pixel::BGRA,
-                                size.clone().width,
-                                size.clone().height,
-                            ));
+                            let mut state_locked = encoder_state_clone.lock().await;
+                            state_locked.init(size).expect("fuck you");
+
+                            // reset our internal state
+                            pts = 0;
+                            force_keyframe = true;
+                            frame_update = false;
                         }
-
-                        let _ = encoder_tx_clone
-                            .lock()
-                            .await
-                            .send(crate::encoder_thread::EncodeThreadInput::Init { size: size })
-                            .await;
                     }
                 },
@@ -189,20 +251,32 @@ async fn main() -> anyhow::Result<()> {
                 Err(TryRecvError::Empty) => {}
             }

-            match encoder_rx.try_recv() {
-                Ok(msg) => match msg {
-                    encoder_thread::EncodeThreadOutput::Frame { mut packet } => {
+            // send frame if we should.
+            if frame_update && connected {
+                let mut state_locked = encoder_state_clone.lock().await;
+                match state_locked.send_frame(pts, force_keyframe) {
+                    Some(mut packet) => {
                         let vec = {
                             let data = packet.data_mut().expect("packet is empty somehow");
                             data.to_vec()
                         };

                         let _ = state_clone
                             .websocket_broadcast_tx
                             .send(ws::Message::Binary(vec));
+
+                        pts += 1;
+                        if force_keyframe {
+                            force_keyframe = false;
+                        }
                     }
-                },
-                Err(TryRecvError::Empty) => {}
-                _ => break,
+                    None => {}
+                }
+
+                frame_update = false;
             }

             tokio::time::sleep(Duration::from_millis(2)).await;
@@ -272,13 +346,7 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc<AppState>)
         let locked = state.encoder_tx.lock().await;

         // Force a ws connection to mean a keyframe
-        let _ = locked
-            .send(crate::encoder_thread::EncodeThreadInput::ForceKeyframe)
-            .await;
-        let _ = locked
-            .send(crate::encoder_thread::EncodeThreadInput::SendFrame)
-            .await;
+        let _ = locked.send(()).await;
     }

     // random username

@ -1,162 +0,0 @@
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::sync::mpsc::{self, error::TryRecvError};
use super::ffmpeg;
use super::h264_encoder::H264Encoder;
use super::hwframe::HwFrameContext;
pub enum EncodeThreadInput {
Init { size: crate::types::Size },
ForceKeyframe,
SendFrame,
}
#[derive(Clone)]
pub enum EncodeThreadOutput {
Frame { packet: ffmpeg::Packet },
}
#[inline]
fn set_frame_flags(frame: &mut ffmpeg::Frame, force_keyframe: bool) {
unsafe {
if force_keyframe {
(*frame.as_mut_ptr()).pict_type = ffmpeg::sys::AVPictureType::AV_PICTURE_TYPE_I;
(*frame.as_mut_ptr()).flags = ffmpeg::sys::AV_FRAME_FLAG_KEY;
(*frame.as_mut_ptr()).key_frame = 1;
} else {
(*frame.as_mut_ptr()).pict_type = ffmpeg::sys::AVPictureType::AV_PICTURE_TYPE_NONE;
(*frame.as_mut_ptr()).flags = 0i32;
(*frame.as_mut_ptr()).key_frame = 0;
}
}
}
fn encoder_thread_main(
mut rx: mpsc::Receiver<EncodeThreadInput>,
tx: mpsc::Sender<EncodeThreadOutput>,
frame: &Arc<Mutex<Option<ffmpeg::frame::Video>>>,
) -> anyhow::Result<()> {
let mut packet = ffmpeg::Packet::empty();
let mut encoder: Option<H264Encoder> = None;
// FIXME: for HW frame support
//let dev = cudarc::driver::CudaDevice::new(0)?;
let mut frame_number = 0usize;
let mut force_keyframe = false;
loop {
match rx.try_recv() {
Ok(msg) => match msg {
EncodeThreadInput::Init { size } => {
frame_number = 0;
if force_keyframe {
force_keyframe = false;
}
encoder = Some(H264Encoder::new_nvenc_swframe(
size.clone(),
60,
5 * (1000 * 1000),
)?);
}
EncodeThreadInput::ForceKeyframe => {
println!("got force keyframe request");
force_keyframe = true;
}
EncodeThreadInput::SendFrame => {
let enc = encoder.as_mut().unwrap();
// let's encode a frame
let mut producer_frame_locked = frame.lock().expect("Couldn't lock producer frame");
let producer_frame = producer_frame_locked.as_mut().expect("NOOOO");
// set the right flags!!
set_frame_flags(producer_frame, force_keyframe);
unsafe {
(*producer_frame.as_mut_ptr()).pts = frame_number as i64;
}
if enc.is_hardware() {
// should always be Some if we get here on this path
// let mut mut_hw_frame = hw_frame.as_mut().unwrap();
enc.send_frame(&producer_frame);
enc.receive_packet(&mut packet)
.expect("failed to recv packet");
} else {
todo!("FIXME: re-implement SW support.");
}
// If a packet was recieved dump it
unsafe {
if !packet.is_empty() {
let _ = tx.blocking_send(EncodeThreadOutput::Frame {
packet: packet.clone(),
});
}
}
frame_number += 1;
if force_keyframe {
force_keyframe = false;
}
}
},
Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => {
std::thread::sleep(Duration::from_millis(1));
}
}
}
if encoder.is_some() {
let enc = encoder.as_mut().unwrap();
enc.send_eof();
enc.receive_packet(&mut packet)
.expect("failed to recv eof packet");
unsafe {
if !packet.is_empty() {
let _ = tx.blocking_send(EncodeThreadOutput::Frame {
packet: packet.clone(),
});
}
}
}
Ok(())
}
pub fn encoder_thread_spawn(
frame: &Arc<Mutex<Option<ffmpeg::frame::Video>>>,
) -> (
mpsc::Receiver<EncodeThreadOutput>,
mpsc::Sender<EncodeThreadInput>,
) {
let (in_tx, in_rx) = mpsc::channel(32);
let (out_tx, out_rx) = mpsc::channel(32);
let clone = Arc::clone(frame);
std::thread::spawn(move || encoder_thread_main(in_rx, out_tx, &clone));
(out_rx, in_tx)
}
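
Read against the new code in main.rs above, the removed API maps onto the inline design roughly as follows (a reading aid inferred from this commit's diffs, not code that exists in the repository):

```rust
// Old encoder-thread message            -> new inline equivalent in main.rs
// EncodeThreadInput::Init { size }      -> EncoderState::init(size), on FramebufferResized
// EncodeThreadInput::ForceKeyframe      -> a `()` on encoder_rx sets force_keyframe = true
// EncodeThreadInput::SendFrame          -> EncoderState::send_frame(pts, force_keyframe)
// EncodeThreadOutput::Frame { packet }  -> the Option<ffmpeg::Packet> send_frame returns
```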

@@ -46,11 +46,8 @@ fn create_context_and_set_common_parameters(
     video_encoder_context.set_time_base(ffmpeg::Rational(1, max_framerate as i32).invert());
     video_encoder_context.set_format(ffmpeg::format::Pixel::YUV420P);

-    // We manually control the GOP of the stream so we only need one stream
-    // that can be broadcast to many users.
-    //
-    // TODO: Either no GOP, or a fairly large one.
-    video_encoder_context.set_gop(i32::MAX as u32);
+    // The GOP here is set up to balance keyframe retransmission with bandwidth.
+    video_encoder_context.set_gop((max_framerate * 4) as u32);

     video_encoder_context.set_max_b_frames(0);

     unsafe {
@@ -68,19 +65,12 @@ pub enum H264Encoder {
         encoder: ffmpeg::encoder::video::Encoder,
     },

+    /// Hardware encoding, with frames uploaded to the GPU by ffmpeg.
     NvencSWFrame {
         encoder: ffmpeg::encoder::video::Encoder,
-
-        // FIXME: This will be needed if the user wants to encode
-        // frames always stored in GPU memory. For now we let ffmpeg upload
-        // and download frames to the GPU, but at some point
-        // it would be a good idea to have, say,
-        // new_nvenc_hwframe(dev, ...)
-        // new_nvenc_cpuframe(...) (has the same behaviour as current new_nvenc)
-        //hw_context: HwFrameContext,
     },

+    /// Hardware encoding, with frames already on the GPU.
     NvencHWFrame {
         encoder: ffmpeg::encoder::video::Encoder,
         hw_context: HwFrameContext,
@@ -142,11 +132,10 @@ impl H264Encoder {
         create_context_and_set_common_parameters("h264_nvenc", &size, max_framerate, bitrate)
             .with_context(|| "while trying to create encoder")?;

     video_encoder_context.set_format(ffmpeg::format::Pixel::ZRGB32);
-    video_encoder_context.set_qmin(38);
-    video_encoder_context.set_qmax(35);
+    video_encoder_context.set_qmin(35);
+    video_encoder_context.set_qmax(38);

     // set h264_nvenc options
     let mut dict = ffmpeg::Dictionary::new();
@@ -179,7 +168,6 @@ impl H264Encoder {
         max_framerate: u32,
         bitrate: usize,
     ) -> anyhow::Result<Self> {
-
         /*
             (See FIXMEs above)
@@ -207,7 +195,7 @@ impl H264Encoder {
         match self {
             Self::Software { .. } => false,
             Self::NvencSWFrame { .. } => true,
-            Self::NvencHWFrame { .. } => true
+            Self::NvencHWFrame { .. } => true,
         }
     }
@@ -218,11 +206,20 @@ impl H264Encoder {
     //    }
     //}

-    pub fn create_frame(
-        &mut self,
-        format: ffmpeg::format::Pixel,
-    ) -> ffmpeg::frame::Video {
-        // FIXME: This should be used to create frames in a unified manner.
+    pub fn create_frame(&mut self) -> ffmpeg::frame::Video {
+        match self {
+            Self::Software { encoder } | Self::NvencSWFrame { encoder } => {
+                return ffmpeg::frame::Video::new(encoder.format(), encoder.width(), encoder.height());
+            }
+
+            Self::NvencHWFrame {
+                encoder,
+                hw_context,
+            } => {
+                todo!("Implement H264Encoder::create_frame() for NvencHWFrame!");
+            }
+        }

         /*
             unsafe {
@@ -263,9 +260,12 @@ impl H264Encoder {
                 // Realistically this should be the same right?
                 encoder.send_frame(frame).unwrap();
                 //todo!("Requires support.");
-            },
-            Self::NvencHWFrame { encoder, hw_context } => {
+            }
+            Self::NvencHWFrame {
+                encoder,
+                hw_context,
+            } => {
                 todo!("Implement send_frame() for NvencHWFrame");
             }
         }
@@ -281,9 +281,12 @@ impl H264Encoder {
                 // Realistically this should be the same right?
                 encoder.send_eof().unwrap();
                 // todo!("Requires support.");
-            },
-            Self::NvencHWFrame { encoder, hw_context } => {
+            }
+            Self::NvencHWFrame {
+                encoder,
+                hw_context,
+            } => {
                 todo!("Implement send_eof() for NvencHWFrame");
             }
         }
@@ -295,7 +298,10 @@ impl H264Encoder {
             Self::NvencSWFrame { encoder } => encoder.receive_packet(packet),

             // this might work?
-            Self::NvencHWFrame { encoder, hw_context } => encoder.receive_packet(packet)
+            Self::NvencHWFrame {
+                encoder,
+                hw_context,
+            } => encoder.receive_packet(packet),
         };
     }
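
Two parameter changes in this file are easy to miss. The GOP drops from effectively infinite (`i32::MAX`) to four seconds' worth of frames, so periodic keyframes bound how long a client that missed one has to wait; and the quantizer bounds, which were inverted before (libavcodec expects qmin <= qmax), are swapped into the right order. Shown in isolation — this is a fragment, not a runnable program, assuming the `video_encoder_context` built earlier in this file:

```rust
let max_framerate: u32 = 60;

// One keyframe every 4 seconds (240 frames at 60 fps).
video_encoder_context.set_gop((max_framerate * 4) as u32);

// qmin must not exceed qmax; the previous values (38, 35) were swapped.
video_encoder_context.set_qmin(35);
video_encoder_context.set_qmax(38);
```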

@ -1,4 +1,3 @@
pub mod encoder_thread;
pub mod h264_encoder; pub mod h264_encoder;
//pub mod lc_muxer; //pub mod lc_muxer;
@ -8,8 +7,6 @@ pub use ffmpeg as ffmpeg;
pub mod hwdevice; pub mod hwdevice;
pub mod hwframe; pub mod hwframe;
pub use encoder_thread::*;
// from hgaiser/moonshine // from hgaiser/moonshine
pub fn check_ret(error_code: i32) -> Result<(), ffmpeg::Error> { pub fn check_ret(error_code: i32) -> Result<(), ffmpeg::Error> {
if error_code != 0 { if error_code != 0 {