server: cleanups

This commit is contained in:
Lily Tsuru 2024-10-10 00:56:21 -04:00
parent 084efc28da
commit cf7c2d09d3
4 changed files with 146 additions and 72 deletions

View file

@ -6,8 +6,9 @@ Some changes will ultimately be made before this is ever thought about being int
## Server side changes (probably) ## Server side changes (probably)
- HW encode support (with software being a fallback) - full-HW encode (via a hw frame context)
- for now nvenc is fine enough (it's also what we will have access to) - Investigate if EGL device platform contexts can be used. In theory if it works with
CUDA's OpenGL interop (.. don't see why it wouldn't) we can just share a
- Code cleanup - Code cleanup
- also maybe nal SPS rewriting (stolen from webrtc) to force 1:1 decoding, - also maybe nal SPS rewriting (stolen from webrtc) to force 1:1 decoding,

View file

@ -41,6 +41,7 @@ struct AppState {
encoder_tx: Arc<TokioMutex<mpsc::Sender<encoder_thread::EncodeThreadInput>>>, encoder_tx: Arc<TokioMutex<mpsc::Sender<encoder_thread::EncodeThreadInput>>>,
websocket_broadcast_tx: broadcast::Sender<ws::Message>, websocket_broadcast_tx: broadcast::Sender<ws::Message>,
websocket_count: TokioMutex<usize>,
} }
impl AppState { impl AppState {
@ -53,6 +54,7 @@ impl AppState {
engine_tx: engine_tx, engine_tx: engine_tx,
encoder_tx: Arc::new(TokioMutex::new(encoder_tx)), encoder_tx: Arc::new(TokioMutex::new(encoder_tx)),
websocket_broadcast_tx: chat_tx, websocket_broadcast_tx: chat_tx,
websocket_count: TokioMutex::const_new(0usize),
} }
} }
} }
@ -103,11 +105,19 @@ async fn main() -> anyhow::Result<()> {
// First we copy the current VNC framebuffer to the shared // First we copy the current VNC framebuffer to the shared
// frame between the encoder thread and ffmpeg // frame between the encoder thread and ffmpeg
// don't actually do anything if no one is connected at all
{
let lk = state_clone.websocket_count.lock().await;
if *lk < 1 {
continue;
}
}
// TODO: Do this on the encoder thread // TODO: Do this on the encoder thread
{ {
let mut frame_locked = frame let mut frame_locked = frame.lock().expect(
.lock() "Couldn't lock frame on our end. Did the encoder thread panic?",
.expect("Couldn't lock frame on our end. Did the VNC end panic?"); );
let mut_frame = frame_locked.as_mut().expect("it's None? why?"); let mut_frame = frame_locked.as_mut().expect("it's None? why?");
@ -115,7 +125,7 @@ async fn main() -> anyhow::Result<()> {
let height = mut_frame.height(); let height = mut_frame.height();
let mut surf = surface_clone.lock().expect( let mut surf = surface_clone.lock().expect(
"locking the surface to paint it to the ffmpeg frame failed", "locking the VNC surface to paint it to the ffmpeg frame failed",
); );
let surf_buf = surf.get_buffer(); let surf_buf = surf.get_buffer();
@ -123,12 +133,28 @@ async fn main() -> anyhow::Result<()> {
unsafe { (*(*mut_frame.as_mut_ptr()).buf[0]).data as *mut u32 }; unsafe { (*(*mut_frame.as_mut_ptr()).buf[0]).data as *mut u32 };
for y in 0..height { for y in 0..height {
let line_stride = (y * width) as usize;
// Make a slice for the line
// SAFETY: The allocation is guaranteed to be large enough
// for this to work from y = 0..height
let dest_line_slice = unsafe {
let dest_line_ptr = buf_ptr.add(line_stride);
std::slice::from_raw_parts_mut(dest_line_ptr, width as usize)
};
dest_line_slice.copy_from_slice(
&surf_buf[line_stride..line_stride + width as usize],
);
/*
for x in 0..width { for x in 0..width {
unsafe { unsafe {
let ofs = (y * width + x) as usize; let ofs = (y * width + x) as usize;
*buf_ptr.add(ofs) = surf_buf[ofs]; *buf_ptr.add(ofs) = surf_buf[ofs];
} }
} }
*/
} }
} }
@ -144,13 +170,11 @@ async fn main() -> anyhow::Result<()> {
{ {
let mut lk_frame = frame_clone.lock().expect("Couldn't lock frame"); let mut lk_frame = frame_clone.lock().expect("Couldn't lock frame");
*lk_frame = Some(ffmpeg::frame::Video::new( *lk_frame = Some(ffmpeg::frame::Video::new(
ffmpeg::format::Pixel::BGRA, ffmpeg::format::Pixel::BGRA,
size.clone().width, size.clone().width,
size.clone().height, size.clone().height,
)); ));
} }
let _ = encoder_tx_clone let _ = encoder_tx_clone
@ -237,6 +261,13 @@ async fn ws_handler(
async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc<AppState>) { async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc<AppState>) {
let (mut sender, mut receiver) = socket.split(); let (mut sender, mut receiver) = socket.split();
// increment connection count
let inc_clone = Arc::clone(&state);
{
let mut lk = inc_clone.websocket_count.lock().await;
*lk += 1;
}
{ {
let locked = state.encoder_tx.lock().await; let locked = state.encoder_tx.lock().await;
@ -269,6 +300,8 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc<AppState>)
let username_clone = Arc::clone(&username); let username_clone = Arc::clone(&username);
let recv_clone = Arc::clone(&state);
let mut recv_task = tokio::spawn(async move { let mut recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await { while let Some(Ok(msg)) = receiver.next().await {
match msg { match msg {
@ -291,7 +324,7 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc<AppState>)
"msg": json["msg"].as_str().unwrap() "msg": json["msg"].as_str().unwrap()
}); });
state recv_clone
.websocket_broadcast_tx .websocket_broadcast_tx
.send(ws::Message::Text( .send(ws::Message::Text(
serde_json::to_string(&send).expect("oh well"), serde_json::to_string(&send).expect("oh well"),
@ -313,7 +346,7 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc<AppState>)
let keysym = json["keysym"].as_u64().unwrap() as u32; let keysym = json["keysym"].as_u64().unwrap() as u32;
let pressed = json["pressed"].as_u64().unwrap() == 1; let pressed = json["pressed"].as_u64().unwrap() == 1;
let _ = state let _ = recv_clone
.engine_tx .engine_tx
.send(vnc_engine::VncMessageInput::KeyEvent { .send(vnc_engine::VncMessageInput::KeyEvent {
keysym: keysym, keysym: keysym,
@ -339,7 +372,7 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc<AppState>)
let y = json["y"].as_u64().unwrap() as u32; let y = json["y"].as_u64().unwrap() as u32;
let mask = json["mask"].as_u64().unwrap() as u8; let mask = json["mask"].as_u64().unwrap() as u8;
let _ = state let _ = recv_clone
.engine_tx .engine_tx
.send(vnc_engine::VncMessageInput::MouseEvent { .send(vnc_engine::VncMessageInput::MouseEvent {
pt: types::Point { x: x, y: y }, pt: types::Point { x: x, y: y },
@ -371,4 +404,10 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc<AppState>)
} }
println!("{username} ({who}) left."); println!("{username} ({who}) left.");
let dec_clone = Arc::clone(&state);
{
let mut lk = dec_clone.websocket_count.lock().await;
*lk -= 1;
}
} }

View file

@ -35,36 +35,6 @@ fn set_frame_flags(frame: &mut ffmpeg::Frame, force_keyframe: bool) {
} }
} }
fn create_hardware_frame(
width: u32,
height: u32,
pixel_format: ffmpeg::format::Pixel,
context: &mut HwFrameContext,
) -> anyhow::Result<ffmpeg::Frame> {
unsafe {
let mut frame = ffmpeg::Frame::empty();
(*frame.as_mut_ptr()).format = pixel_format as i32;
(*frame.as_mut_ptr()).width = width as i32;
(*frame.as_mut_ptr()).height = height as i32;
(*frame.as_mut_ptr()).hw_frames_ctx = context.as_raw_mut();
super::check_ret(ffmpeg::sys::av_hwframe_get_buffer(
context.as_raw_mut(),
frame.as_mut_ptr(),
0,
))?;
super::check_ret(ffmpeg::sys::av_hwframe_get_buffer(
context.as_raw_mut(),
frame.as_mut_ptr(),
0,
))?;
(*frame.as_mut_ptr()).linesize[0] = (*frame.as_ptr()).width * 4;
Ok(frame)
}
}
fn encoder_thread_main( fn encoder_thread_main(
mut rx: mpsc::Receiver<EncodeThreadInput>, mut rx: mpsc::Receiver<EncodeThreadInput>,
tx: mpsc::Sender<EncodeThreadOutput>, tx: mpsc::Sender<EncodeThreadOutput>,
@ -74,7 +44,8 @@ fn encoder_thread_main(
let mut encoder: Option<H264Encoder> = None; let mut encoder: Option<H264Encoder> = None;
let dev = cudarc::driver::CudaDevice::new(0)?; // FIXME: for HW frame support
//let dev = cudarc::driver::CudaDevice::new(0)?;
let mut frame_number = 0usize; let mut frame_number = 0usize;
let mut force_keyframe = false; let mut force_keyframe = false;
@ -90,8 +61,7 @@ fn encoder_thread_main(
} }
encoder = Some(H264Encoder::new_nvenc( encoder = Some(H264Encoder::new_nvenc_swframe(
&dev,
size.clone(), size.clone(),
60, 60,
5 * (1000 * 1000), 5 * (1000 * 1000),

View file

@ -68,7 +68,7 @@ pub enum H264Encoder {
encoder: ffmpeg::encoder::video::Encoder, encoder: ffmpeg::encoder::video::Encoder,
}, },
Nvenc { NvencSWFrame {
encoder: ffmpeg::encoder::video::Encoder, encoder: ffmpeg::encoder::video::Encoder,
// FIXME: This will be needed if the user wants to encode // FIXME: This will be needed if the user wants to encode
// frames always stored in GPU memory. For now we let ffmpeg upload // frames always stored in GPU memory. For now we let ffmpeg upload
@ -79,9 +79,16 @@ pub enum H264Encoder {
//hw_context: HwFrameContext, //hw_context: HwFrameContext,
}, },
NvencHWFrame {
encoder: ffmpeg::encoder::video::Encoder,
hw_context: HwFrameContext,
},
} }
impl H264Encoder { impl H264Encoder {
/// Creates a new software encoder.
pub fn new_software(size: Size, max_framerate: u32, bitrate: usize) -> anyhow::Result<Self> { pub fn new_software(size: Size, max_framerate: u32, bitrate: usize) -> anyhow::Result<Self> {
// Create the libx264 context // Create the libx264 context
let (encoder, mut video_encoder_context) = let (encoder, mut video_encoder_context) =
@ -124,9 +131,9 @@ impl H264Encoder {
Ok(Self::Software { encoder: encoder }) Ok(Self::Software { encoder: encoder })
} }
/// Creates a new hardware encoder. /// Creates a new hardware (NVIDIA NVENC) encoder, which encodes
pub fn new_nvenc( /// frames from software input. FFmpeg handles uploading frames to the GPU.
cuda_device: &CudaDevice, pub fn new_nvenc_swframe(
size: Size, size: Size,
max_framerate: u32, max_framerate: u32,
bitrate: usize, bitrate: usize,
@ -135,23 +142,6 @@ impl H264Encoder {
create_context_and_set_common_parameters("h264_nvenc", &size, max_framerate, bitrate) create_context_and_set_common_parameters("h264_nvenc", &size, max_framerate, bitrate)
.with_context(|| "while trying to create encoder")?; .with_context(|| "while trying to create encoder")?;
/*
(See FIXMEs above)
let cuda_device_context = super::hwdevice::CudaDeviceContextBuilder::new()?
.set_cuda_context((*cuda_device.cu_primary_ctx()) as *mut _)
.build()
.with_context(|| "while trying to create CUDA device context")?;
let mut hw_frame_context = super::hwframe::HwFrameContextBuilder::new(cuda_device_context)?
.set_width(size.width)
.set_height(size.height)
.set_sw_format(ffmpeg::format::Pixel::ZRGB32)
.set_format(ffmpeg::format::Pixel::CUDA)
.build()
.with_context(|| "while trying to create CUDA frame context")?;
*/
video_encoder_context.set_format(ffmpeg::format::Pixel::ZRGB32); video_encoder_context.set_format(ffmpeg::format::Pixel::ZRGB32);
@ -180,15 +170,44 @@ impl H264Encoder {
.open_as_with(encoder, dict) .open_as_with(encoder, dict)
.with_context(|| "While opening h264_nvenc video codec")?; .with_context(|| "While opening h264_nvenc video codec")?;
Ok(Self::Nvenc { encoder: encoder }) Ok(Self::NvencSWFrame { encoder: encoder })
} }
// FIXME: It's a bit pointless to have this have a mut borrow, pub fn new_nvenc_hwframe(
cuda_device: &CudaDevice,
size: Size,
max_framerate: u32,
bitrate: usize,
) -> anyhow::Result<Self> {
/*
(See FIXMEs above)
let cuda_device_context = super::hwdevice::CudaDeviceContextBuilder::new()?
.set_cuda_context((*cuda_device.cu_primary_ctx()) as *mut _)
.build()
.with_context(|| "while trying to create CUDA device context")?;
let mut hw_frame_context = super::hwframe::HwFrameContextBuilder::new(cuda_device_context)?
.set_width(size.width)
.set_height(size.height)
.set_sw_format(ffmpeg::format::Pixel::ZRGB32)
.set_format(ffmpeg::format::Pixel::CUDA)
.build()
.with_context(|| "while trying to create CUDA frame context")?;
*/
todo!("Implement me!");
}
// NOTE: It's a bit pointless to have this have a mut borrow,
// but you'll probably have a mutable borrow on this already.. // but you'll probably have a mutable borrow on this already..
pub fn is_hardware(&mut self) -> bool { pub fn is_hardware(&mut self) -> bool {
match self { match self {
Self::Software { .. } => false, Self::Software { .. } => false,
Self::Nvenc { .. } => true, Self::NvencSWFrame { .. } => true,
Self::NvencHWFrame { .. } => true
} }
} }
@ -199,16 +218,55 @@ impl H264Encoder {
// } // }
//} //}
pub fn create_frame(
&mut self,
format: ffmpeg::format::Pixel,
) -> ffmpeg::frame::Video {
// FIXME: This should be used to create frames in a unified manner.
/*
unsafe {
let mut frame = ffmpeg::Frame::empty();
(*frame.as_mut_ptr()).format = pixel_format as i32;
(*frame.as_mut_ptr()).width = width as i32;
(*frame.as_mut_ptr()).height = height as i32;
(*frame.as_mut_ptr()).hw_frames_ctx = context.as_raw_mut();
super::check_ret(ffmpeg::sys::av_hwframe_get_buffer(
context.as_raw_mut(),
frame.as_mut_ptr(),
0,
))?;
super::check_ret(ffmpeg::sys::av_hwframe_get_buffer(
context.as_raw_mut(),
frame.as_mut_ptr(),
0,
))?;
(*frame.as_mut_ptr()).linesize[0] = (*frame.as_ptr()).width * 4;
Ok(frame)
}
*/
todo!("FIXME");
}
pub fn send_frame(&mut self, frame: &ffmpeg::Frame) { pub fn send_frame(&mut self, frame: &ffmpeg::Frame) {
match self { match self {
Self::Software { encoder } => { Self::Software { encoder } => {
encoder.send_frame(frame).unwrap(); encoder.send_frame(frame).unwrap();
} }
Self::Nvenc { encoder } => { Self::NvencSWFrame { encoder } => {
// Realistically this should be the same right? // Realistically this should be the same right?
encoder.send_frame(frame).unwrap(); encoder.send_frame(frame).unwrap();
//todo!("Requires support."); //todo!("Requires support.");
},
Self::NvencHWFrame { encoder, hw_context } => {
todo!("Implement send_frame() for NvencHWFrame");
} }
} }
} }
@ -219,10 +277,14 @@ impl H264Encoder {
encoder.send_eof().unwrap(); encoder.send_eof().unwrap();
} }
Self::Nvenc { encoder } => { Self::NvencSWFrame { encoder } => {
// Realistically this should be the same right? // Realistically this should be the same right?
encoder.send_eof().unwrap(); encoder.send_eof().unwrap();
// todo!("Requires support."); // todo!("Requires support.");
},
Self::NvencHWFrame { encoder, hw_context } => {
todo!("Implement send_eof() for NvencHWFrame");
} }
} }
} }
@ -230,8 +292,10 @@ impl H264Encoder {
fn receive_packet_impl(&mut self, packet: &mut ffmpeg::Packet) -> Result<(), ffmpeg::Error> { fn receive_packet_impl(&mut self, packet: &mut ffmpeg::Packet) -> Result<(), ffmpeg::Error> {
return match self { return match self {
Self::Software { encoder } => encoder.receive_packet(packet), Self::Software { encoder } => encoder.receive_packet(packet),
Self::NvencSWFrame { encoder } => encoder.receive_packet(packet),
Self::Nvenc { encoder } => encoder.receive_packet(packet), // this might work?
Self::NvencHWFrame { encoder, hw_context } => encoder.receive_packet(packet)
}; };
} }