diff --git a/server/src/encoder_thread.rs b/server/src/encoder_thread.rs index 347f80a..e8da40c 100644 --- a/server/src/encoder_thread.rs +++ b/server/src/encoder_thread.rs @@ -7,7 +7,6 @@ use tokio::sync::mpsc::{self, error::TryRecvError}; pub enum EncodeThreadInput { Init { size: crate::types::Size }, ForceKeyframe, - Shutdown, SendFrame, } @@ -20,7 +19,6 @@ pub enum EncodeThreadOutput { fn set_frame_flags(frame: &mut ffmpeg_the_third::Frame, force_keyframe: bool) { unsafe { if force_keyframe { - //println!("frame {frame_number} will be a keyframe"); (*frame.as_mut_ptr()).pict_type = ffmpeg_the_third::sys::AVPictureType::AV_PICTURE_TYPE_I; (*frame.as_mut_ptr()).flags = ffmpeg_the_third::sys::AV_FRAME_FLAG_KEY; @@ -49,8 +47,6 @@ fn encoder_thread_main( let mut frame_number = 0usize; let mut force_keyframe = false; - println!("encoder thread spawned"); - loop { match rx.try_recv() { Ok(msg) => match msg { @@ -82,26 +78,6 @@ fn encoder_thread_main( ); } - EncodeThreadInput::Shutdown => { - if encoder.is_some() { - let enc = encoder.as_mut().unwrap(); - - enc.send_eof(); - enc.receive_packet(&mut packet) - .expect("failed to recv eof packet"); - - unsafe { - if !packet.is_empty() { - let _ = tx.blocking_send(EncodeThreadOutput::Frame { - packet: packet.clone(), - }); - } - } - } - - break; - } - EncodeThreadInput::ForceKeyframe => { force_keyframe = true; } @@ -157,6 +133,22 @@ fn encoder_thread_main( } } } + + if encoder.is_some() { + let enc = encoder.as_mut().unwrap(); + + enc.send_eof(); + enc.receive_packet(&mut packet) + .expect("failed to recv eof packet"); + + unsafe { + if !packet.is_empty() { + let _ = tx.blocking_send(EncodeThreadOutput::Frame { + packet: packet.clone(), + }); + } + } + } } pub fn encoder_thread_spawn( diff --git a/server/src/main.rs b/server/src/main.rs index deed8eb..00c1dd2 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -4,10 +4,16 @@ mod surface; mod types; mod vnc_engine; -use std::sync::{Arc, Mutex}; +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; use rand::distributions::DistString; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::{ + broadcast, + mpsc::{self, error::TryRecvError}, +}; use vnc_engine::VncMessageOutput; use std::net::SocketAddr; @@ -15,7 +21,7 @@ use std::net::SocketAddr; use axum::{ extract::{ connect_info::ConnectInfo, - ws::{Message, WebSocket, WebSocketUpgrade}, + ws::{self, Message, WebSocket, WebSocketUpgrade}, State, }, response::IntoResponse, @@ -25,39 +31,149 @@ use axum::{ use futures::{sink::SinkExt, stream::StreamExt}; -#[derive(Clone, Debug)] -enum WsMessage { - Text(String), - Buffer(Vec), -} - struct AppState { /// Channel for sending things to the VNC engine /// should later be used for control engine_tx: mpsc::Sender, + encoder_tx: mpsc::Sender, - websocket_broadcast_tx: broadcast::Sender, + websocket_broadcast_tx: broadcast::Sender, } impl AppState { - fn new(engine_tx: mpsc::Sender) -> Self { + fn new( + engine_tx: mpsc::Sender, + encoder_tx: mpsc::Sender, + ) -> Self { let (chat_tx, _chat_rx) = broadcast::channel(10); Self { engine_tx: engine_tx, + encoder_tx: encoder_tx, websocket_broadcast_tx: chat_tx, } } } -#[tokio::main(flavor = "multi_thread", worker_threads = 4)] +#[tokio::main(flavor = "multi_thread", worker_threads = 8)] async fn main() -> anyhow::Result<()> { let surface = Arc::new(Mutex::new(surface::Surface::new())); let (engine_output_tx, mut engine_output_rx) = mpsc::channel(32); let (engine_input_tx, engine_input_rx) = mpsc::channel(16); - let state = Arc::new(AppState::new(engine_input_tx)); + // H.264 encoder related + let frame: Arc>> = Arc::new(Mutex::new(None)); + let (mut encoder_rx, encoder_tx) = encoder_thread::encoder_thread_spawn(&frame); + let state = Arc::new(AppState::new(engine_input_tx, encoder_tx)); + + // VNC related + let mut vnc_client = + vnc_engine::Client::new(engine_output_tx, engine_input_rx, surface.clone()); + + tokio::spawn(async move { + let addr = vnc_engine::Address::Tcp("10.16.0.1:5930".parse().expect("its over")); + //let addr = vnc_engine::Address::Tcp("127.0.0.1:6930".parse().expect("its over")); + vnc_client.connect_and_run(addr).await + }); + + let state_clone = state.clone(); + + let vnc_recv_handle = tokio::spawn(async move { + let frame_clone = frame.clone(); + let encoder_tx_clone = state_clone.encoder_tx.clone(); + let surface_clone = surface.clone(); + + loop { + match engine_output_rx.try_recv() { + Ok(msg) => match msg { + VncMessageOutput::Connect => { + println!("connected") + } + + VncMessageOutput::Disconnect => { + println!("disconnect") + } + + VncMessageOutput::FramebufferUpdate => { + // let's encode a frame + // First we copy the current VNC framebuffer to the shared + // frame between the encoder thread and ffmpeg + { + let mut frame_locked = frame + .lock() + .expect("Couldn't lock frame on our end. Did the VNC end panic?"); + + let mut_frame = frame_locked.as_mut().expect("it's None? why?"); + + let width = mut_frame.width(); + let height = mut_frame.height(); + + let mut surf = surface_clone.lock().expect( + "locking the surface to paint it to the ffmpeg frame failed", + ); + let surf_buf = surf.get_buffer(); + + let buf_ptr = + unsafe { (*(*mut_frame.as_mut_ptr()).buf[0]).data as *mut u32 }; + + for y in 0..height { + for x in 0..width { + unsafe { + let ofs = (y * width + x) as usize; + *buf_ptr.add(ofs) = surf_buf[ofs]; + } + } + } + } + + let _ = encoder_tx_clone + .send(crate::encoder_thread::EncodeThreadInput::SendFrame) + .await; + } + + VncMessageOutput::FramebufferResized(size) => { + // make a new frame for the encoder + { + let mut lk_frame = frame_clone.lock().expect("Couldn't lock frame"); + *lk_frame = Some(ffmpeg_the_third::frame::Video::new( + ffmpeg_the_third::format::Pixel::BGRA, + size.clone().width, + size.clone().height, + )); + } + + let _ = encoder_tx_clone + .send(crate::encoder_thread::EncodeThreadInput::Init { size: size }) + .await; + } + }, + + Err(TryRecvError::Disconnected) => break, + Err(TryRecvError::Empty) => {} + } + + match encoder_rx.try_recv() { + Ok(msg) => match msg { + encoder_thread::EncodeThreadOutput::Frame { mut packet } => { + let vec = { + let data = packet.data_mut().expect("packet is empty somehow"); + data.to_vec() + }; + let _ = state_clone + .websocket_broadcast_tx + .send(ws::Message::Binary(vec)); + } + }, + Err(TryRecvError::Empty) => {} + _ => break, + } + + tokio::time::sleep(Duration::from_millis(2)).await; + } + }); + + // Axum websocket server let app: Router<()> = Router::new() .route( "/", @@ -71,48 +187,26 @@ async fn main() -> anyhow::Result<()> { ) .with_state(state.clone()); - let mut engine = vnc_engine::Client::new(engine_output_tx, engine_input_rx, surface); - let tcp_listener = tokio::net::TcpListener::bind("0.0.0.0:4940") .await .expect("failed to listen"); - let vnc_engine_handle = tokio::spawn(async move { - let addr = vnc_engine::Address::Tcp("10.16.0.1:5930".parse().expect("its over")); - //let addr = vnc_engine::Address::Tcp("127.0.0.1:6930".parse().expect("its over")); - engine.connect_and_run(addr).await - }); - - let state_clone = Arc::clone(&state); - tokio::spawn(async move { - while let Some(msg) = engine_output_rx.recv().await { - match msg { - VncMessageOutput::Connect => { - println!("connected") - } - - VncMessageOutput::Disconnect => { - println!("disconnect") - } - - VncMessageOutput::FramebufferUpdate(vec) => { - let _ = state_clone - .websocket_broadcast_tx - .send(WsMessage::Buffer(vec)); - } - _ => {} - } - } - }); - - let (res1, res2) = tokio::join!( - axum::serve( - tcp_listener, - app.into_make_service_with_connect_info::(), - ), - vnc_engine_handle + let axum_future = axum::serve( + tcp_listener, + app.into_make_service_with_connect_info::(), ); + // If the VNC client disconnects we should exit. + tokio::select! { + _ = axum_future => { + println!("axum died"); + } + + _ = vnc_recv_handle => { + println!("VNC client disconnected, exiting"); + } + } + Ok(()) } @@ -133,8 +227,13 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc) // Force a ws connection to mean a keyframe let _ = state - .engine_tx - .send(vnc_engine::VncMessageInput::ForceKeyframe) + .encoder_tx + .send(crate::encoder_thread::EncodeThreadInput::ForceKeyframe) + .await; + + let _ = state + .encoder_tx + .send(crate::encoder_thread::EncodeThreadInput::SendFrame) .await; // random username @@ -148,14 +247,8 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc) let mut sub = send_clone.websocket_broadcast_tx.subscribe(); while let Ok(msg) = sub.recv().await { - match msg { - WsMessage::Text(c) => { - let _ = sender.send(Message::Text(c)).await; - } - - WsMessage::Buffer(buffer) => { - let _ = sender.send(Message::Binary(buffer)).await; - } + if sender.send(msg).await.is_err() { + break; } } }); @@ -186,8 +279,8 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc) state .websocket_broadcast_tx - .send(WsMessage::Text( - serde_json::to_string(&send).expect("penis"), + .send(ws::Message::Text( + serde_json::to_string(&send).expect("oh well"), )) .expect("boom"); diff --git a/server/src/vnc_engine.rs b/server/src/vnc_engine.rs index 450c208..97d7428 100644 --- a/server/src/vnc_engine.rs +++ b/server/src/vnc_engine.rs @@ -1,7 +1,5 @@ //! Native-side VNC client. This is usually run in another OS thread. -use crate::encoder_thread::{encoder_thread_spawn, EncodeThreadOutput}; - use super::surface::Surface; use super::types::*; @@ -29,24 +27,14 @@ pub enum VncMessageOutput { Connect, Disconnect, // this will contain a single annex B packet - FramebufferUpdate(Vec), + FramebufferUpdate, FramebufferResized(Size), } #[derive(Debug)] pub enum VncMessageInput { - KeyEvent { - keysym: u32, - pressed: bool, - }, - MouseEvent { - pt: Point, - buttons: u8, - }, - Disconnect, - - /// Forces a keyframe to occur in the video stream. - ForceKeyframe, + KeyEvent { keysym: u32, pressed: bool }, + MouseEvent { pt: Point, buttons: u8 }, } pub struct Client { @@ -110,10 +98,6 @@ impl Client { self.out_tx.send(VncMessageOutput::Connect).await?; - // h.264 encoder related - let frame: Arc>> = Arc::new(Mutex::new(None)); - let (mut encoder_rx, encoder_tx) = encoder_thread_spawn(&frame); - loop { // Pull a event and act on it. If none are there, it's fine and we can just move on to // advancing the vnc client, but if the channel is closed, that means we are to disconnect @@ -137,16 +121,6 @@ impl Client { })) .await?; } - VncMessageInput::Disconnect => break, - - VncMessageInput::ForceKeyframe => { - encoder_tx - .send(crate::encoder_thread::EncodeThreadInput::ForceKeyframe) - .await?; - encoder_tx - .send(crate::encoder_thread::EncodeThreadInput::SendFrame) - .await?; - } }, Err(TryRecvError::Empty) => {} @@ -176,22 +150,6 @@ impl Client { height: res.height as u32, }; - // make a new frame for the encoder - { - let mut lk_frame = frame.lock().expect("oh FUCK"); - *lk_frame = Some(ffmpeg_the_third::frame::Video::new( - ffmpeg_the_third::format::Pixel::BGRA, - cvm_size.clone().width, - cvm_size.clone().height, - )); - } - - encoder_tx - .send(crate::encoder_thread::EncodeThreadInput::Init { - size: cvm_size.clone(), - }) - .await?; - self.out_tx .send(VncMessageOutput::FramebufferResized(cvm_size)) .await?; @@ -230,44 +188,13 @@ impl Client { if !self.rects_in_frame.is_empty() { // We don't care what rects ARE there, but // if none were pressent then we probably need not bother - //println!("vnc engine frame"); - // let's encode a frame - // First we copy the current VNC framebuffer to the shared - // frame between the encoder thread and ffmpeg - { - let mut frame_locked = - frame.lock().expect("Couldn't lock the damn frame. FUCK"); - - let mut_frame = frame_locked.as_mut().expect("NOOOO"); - - let width = mut_frame.width(); - let height = mut_frame.height(); - - let mut surf = self.surf.lock().expect("frame lock fail"); - let surf_buf = surf.get_buffer(); - - let buf_ptr = - unsafe { (*(*mut_frame.as_mut_ptr()).buf[0]).data as *mut u32 }; - - for y in 0..height { - for x in 0..width { - unsafe { - let ofs = (y * width + x) as usize; - *buf_ptr.add(ofs) = surf_buf[ofs]; - } - } - } - } - - encoder_tx - .send(crate::encoder_thread::EncodeThreadInput::SendFrame) + self.out_tx + .send(VncMessageOutput::FramebufferUpdate) .await?; self.rects_in_frame.clear(); } - - // send frame to encoder thread, pull encoded data back } // TODO: we might want to pass this to js at some point @@ -276,29 +203,6 @@ impl Client { } } - match encoder_rx.try_recv() { - Ok(msg) => { - match msg { - EncodeThreadOutput::Frame { mut packet } => { - let vec = { - //f !packet.is_empty() - - let data = packet.data_mut().expect("packet is not empty"); - - data.to_vec() - }; - - self.out_tx - .send(VncMessageOutput::FramebufferUpdate(vec)) - .await?; - } - } - } - Err(TryRecvError::Empty) => {} - _ => break, - } - - // Sleep to give CPU time tokio::time::sleep(Duration::from_millis(2)).await; }