working libretro sex

This commit is contained in:
Lily Tsuru 2024-10-10 22:06:17 -04:00
parent 1c86b877ca
commit d54f65af8b
5 changed files with 217 additions and 143 deletions

View file

@ -3,9 +3,10 @@ mod surface;
mod types;
mod video;
use retro_thread::{spawn_retro_thread, App, RetroEvent};
use video::ffmpeg;
use retro_thread::{spawn_retro_thread, RetroState, RetroEvent};
use video::encoder_thread::EncodeThreadInput;
use video::h264_encoder::H264Encoder;
use video::{encoder_thread, ffmpeg};
use std::{
sync::{Arc, Mutex},
@ -35,7 +36,7 @@ use axum::{
use futures::{sink::SinkExt, stream::StreamExt};
struct AppState {
encoder_tx: Arc<TokioMutex<mpsc::Sender<()>>>,
encoder_tx: Arc<TokioMutex<mpsc::Sender<EncodeThreadInput>>>,
inputs: Arc<TokioMutex<Vec<u32>>>,
websocket_broadcast_tx: broadcast::Sender<ws::Message>,
@ -43,7 +44,7 @@ struct AppState {
}
impl AppState {
fn new(encoder_tx: mpsc::Sender<()>) -> Self {
fn new(encoder_tx: mpsc::Sender<EncodeThreadInput>) -> Self {
let (chat_tx, _chat_rx) = broadcast::channel(10);
Self {
encoder_tx: Arc::new(TokioMutex::new(encoder_tx)),
@ -54,79 +55,6 @@ impl AppState {
}
}
struct EncoderState {
encoder: Option<H264Encoder>,
frame: Option<ffmpeg::frame::Video>,
packet: ffmpeg::Packet,
}
impl EncoderState {
fn new() -> Self {
Self {
encoder: None,
frame: None,
packet: ffmpeg::Packet::empty(),
}
}
fn init(&mut self, size: crate::types::Size) -> anyhow::Result<()> {
self.encoder = Some(H264Encoder::new_nvenc_swframe(
size.clone(),
60,
3 * (1024 * 1024),
)?);
// FIXME: use create_frame() on H264Encoder
self.frame = Some(ffmpeg::frame::Video::new(
ffmpeg::format::Pixel::RGBA,
size.width,
size.height,
));
// replace packet
self.packet = ffmpeg::Packet::empty();
Ok(())
}
fn frame(&mut self) -> &mut ffmpeg::frame::Video {
self.frame.as_mut().unwrap()
}
fn send_frame(&mut self, pts: u64, force_keyframe: bool) -> Option<ffmpeg::Packet> {
let frame = self.frame.as_mut().unwrap();
let encoder = self.encoder.as_mut().unwrap();
// set frame metadata
unsafe {
if force_keyframe {
(*frame.as_mut_ptr()).pict_type = ffmpeg::sys::AVPictureType::AV_PICTURE_TYPE_I;
(*frame.as_mut_ptr()).flags = ffmpeg::sys::AV_FRAME_FLAG_KEY;
(*frame.as_mut_ptr()).key_frame = 1;
} else {
(*frame.as_mut_ptr()).pict_type = ffmpeg::sys::AVPictureType::AV_PICTURE_TYPE_NONE;
(*frame.as_mut_ptr()).flags = 0i32;
(*frame.as_mut_ptr()).key_frame = 0;
}
(*frame.as_mut_ptr()).pts = pts as i64;
}
encoder.send_frame(&*frame);
encoder
.receive_packet(&mut self.packet)
.expect("Failed to recieve packet");
unsafe {
if !self.packet.is_empty() {
return Some(self.packet.clone());
}
}
return None;
}
}
#[tokio::main(flavor = "multi_thread", worker_threads = 8)]
async fn main() -> anyhow::Result<()> {
// Setup a tracing subscriber
@ -139,46 +67,32 @@ async fn main() -> anyhow::Result<()> {
let surface = Arc::new(Mutex::new(surface::Surface::new()));
// H.264 encoder related
let encoder_state = Arc::new(TokioMutex::new(EncoderState::new()));
let (encoder_tx, mut encoder_rx) = mpsc::channel(8);
let frame: Arc<Mutex<Option<ffmpeg::frame::Video>>> = Arc::new(Mutex::new(None));
let (mut encoder_rx, encoder_tx) = encoder_thread::encoder_thread_spawn(&frame);
let state = Arc::new(AppState::new(encoder_tx));
let (mut event_rx, event_in_tx) = spawn_retro_thread(surface.clone());
let state_clone = state.clone();
let encoder_state_clone = encoder_state.clone();
let vnc_recv_handle = tokio::spawn(async move {
let surface_clone = surface.clone();
// first frame is always a key frame
let mut pts = 0u64;
let mut force_keyframe = true;
let mut frame_update = false;
let frame_clone = frame.clone();
// start the thread now that we're alive
let _ = event_in_tx.send(retro_thread::RetroInEvent::Start).await;
loop {
match encoder_rx.try_recv() {
Ok(()) => {
// force keyframe
force_keyframe = true;
frame_update = true;
}
Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => {}
}
match event_rx.try_recv() {
Ok(msg) => match msg {
RetroEvent::Frame => {
{
let mut state_locked = encoder_state_clone.lock().await;
let mut frame_locked = frame.lock().expect(
"Couldn't lock frame on our end. Did the encoder thread panic?",
);
let mut_frame = state_locked.frame();
let mut_frame = frame_locked.as_mut().expect("it's None? why?");
let width = mut_frame.width();
let height = mut_frame.height();
@ -207,19 +121,32 @@ async fn main() -> anyhow::Result<()> {
}
}
frame_update = true;
state_clone
.encoder_tx
.lock()
.await
.send(encoder_thread::EncodeThreadInput::SendFrame)
.await;
}
RetroEvent::Resize { size } => {
// make a new frame for the encoder
{
let mut state_locked = encoder_state_clone.lock().await;
state_locked.init(size).expect("fuck you");
let mut lk_frame = frame_clone.lock().expect("Couldn't lock frame");
// reset our internal state
pts = 0;
force_keyframe = true;
frame_update = false;
*lk_frame = Some(ffmpeg::frame::Video::new(
ffmpeg::format::Pixel::BGRA,
size.clone().width,
size.clone().height,
));
}
state_clone
.encoder_tx
.lock()
.await
.send(encoder_thread::EncodeThreadInput::Init { size: size.clone() })
.await;
}
RetroEvent::WantInputs { tx } => {
@ -233,32 +160,20 @@ async fn main() -> anyhow::Result<()> {
Err(TryRecvError::Empty) => {}
}
// send frame if we should.
if frame_update {
let mut state_locked = encoder_state_clone.lock().await;
match state_locked.send_frame(pts, force_keyframe) {
Some(mut packet) => {
match encoder_rx.try_recv() {
Ok(msg) => match msg {
encoder_thread::EncodeThreadOutput::Frame { mut packet } => {
let vec = {
let data = packet.data_mut().expect("packet is empty somehow");
data.to_vec()
};
let _ = state_clone
.websocket_broadcast_tx
.send(ws::Message::Binary(vec));
pts += 1;
if force_keyframe {
force_keyframe = false;
}
}
None => {}
}
frame_update = false;
},
Err(TryRecvError::Empty) => {}
_ => break,
}
tokio::time::sleep(Duration::from_millis(1)).await;
@ -325,7 +240,8 @@ async fn handle_socket(socket: WebSocket, who: SocketAddr, state: Arc<AppState>)
let locked = state.encoder_tx.lock().await;
// Force a ws connection to mean a keyframe
let _ = locked.send(()).await;
let _ = locked.send(EncodeThreadInput::ForceKeyframe).await;
let _ = locked.send(EncodeThreadInput::SendFrame).await;
}
// random username

View file

@ -46,7 +46,7 @@ extern "system" fn opengl_message_callback(
}
}
pub struct App {
pub struct RetroState {
frontend: Option<Box<Frontend>>,
pad: RetroPad,
@ -54,9 +54,10 @@ pub struct App {
// EGL state
egl_context: Option<DeviceContext>,
/// Locked framebuffer.
framebuffer: Arc<Mutex<Surface>>,
// OpenGL object IDs
/// OpenGL FBO
gl_framebuffer: gpu::GlFramebuffer,
/// Cached readback buffer.
@ -65,7 +66,7 @@ pub struct App {
event_tx: mpsc::Sender<RetroEvent>,
}
impl App {
impl RetroState {
pub fn new(framebuffer: Arc<Mutex<Surface>>, event_tx: mpsc::Sender<RetroEvent>) -> Box<Self> {
let mut boxed = Box::new(Self {
frontend: None,
@ -150,7 +151,6 @@ impl App {
let step_duration = Duration::from_millis(step_ms as u64);
self.get_frontend().run_frame();
let _ = self.event_tx.blocking_send(RetroEvent::Frame);
std::thread::sleep(step_duration);
}
@ -188,15 +188,6 @@ impl App {
dest_slice.copy_from_slice(scanlines[y as usize]);
// swap the scanline pixels to BGRA order to make minifb happy
// not the fastest code but this should do for an example
//for pix in dest_slice {
// let a = (*pix & 0xff000000) >> 24;
// let b = (*pix & 0x00ff0000) >> 16;
// let g = (*pix & 0x0000ff00) >> 8;
// let r = *pix & 0x000000ff;
// *pix = a << 24 | r << 16 | g << 8 | b;
//}
}
} else {
for y in 0..size.height {
@ -219,7 +210,7 @@ impl App {
}
}
impl FrontendInterface for App {
impl FrontendInterface for RetroState {
fn video_resize(&mut self, width: u32, height: u32) {
tracing::info!("Resized to {width}x{height}");
@ -246,6 +237,7 @@ impl FrontendInterface for App {
fn video_update(&mut self, slice: &[u32], pitch: u32) {
Self::update_impl(self.framebuffer.clone(), slice, pitch, false);
let _ = self.event_tx.blocking_send(RetroEvent::Frame);
}
fn video_update_gl(&mut self) {
@ -276,6 +268,7 @@ impl FrontendInterface for App {
};
Self::update_impl(self.framebuffer.clone(), slice, dimensions.0, true);
let _ = self.event_tx.blocking_send(RetroEvent::Frame);
}
fn audio_sample(&mut self, _slice: &[i16], _size: usize) {}
@ -404,7 +397,7 @@ impl FrontendInterface for App {
}
}
impl Drop for App {
impl Drop for RetroState {
fn drop(&mut self) {
// Terminate EGL and GL resources if need be
self.hw_gl_destroy();
@ -426,7 +419,7 @@ fn retro_thread_main(
event_tx: mpsc::Sender<RetroEvent>,
mut event_rx: mpsc::Receiver<RetroInEvent>,
) {
let mut app = App::new(surface, event_tx);
let mut app = RetroState::new(surface, event_tx);
app.load_core("cores/swanstation_libretro.so")
.expect("failed to load core");

View file

@ -0,0 +1,162 @@
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::sync::mpsc::{self, error::TryRecvError};
use super::ffmpeg;
use super::h264_encoder::H264Encoder;
use super::hwframe::HwFrameContext;
/// Commands accepted by the encoder thread.
pub enum EncodeThreadInput {
    /// (Re)initialize the encoder for a new output size.
    Init { size: crate::types::Size },
    /// Force the next encoded frame to be a keyframe.
    ForceKeyframe,
    /// Encode the current contents of the shared input frame.
    SendFrame,
}
/// Messages produced by the encoder thread.
#[derive(Clone)]
pub enum EncodeThreadOutput {
    /// An encoded H.264 packet (only sent when the packet has data).
    Frame { packet: ffmpeg::Packet },
}
/// Encoder bookkeeping owned by the encoder thread.
struct EncoderState {
    /// The H.264 encoder; `None` until `init()` has been called.
    encoder: Option<H264Encoder>,
    /// Input frame shared with the thread that produces pixel data.
    frame: Arc<Mutex<Option<ffmpeg::frame::Video>>>,
    /// Scratch packet reused across `receive_packet()` calls.
    packet: ffmpeg::Packet,
}
impl EncoderState {
    /// Creates encoder state sharing `frame` with the producer side.
    /// The encoder itself is created lazily by [`EncoderState::init`].
    fn new(frame: Arc<Mutex<Option<ffmpeg::frame::Video>>>) -> Self {
        Self {
            encoder: None,
            frame,
            packet: ffmpeg::Packet::empty(),
        }
    }

    /// (Re)creates the NVENC H.264 encoder for the given output size
    /// (60 fps, 3 Mbit/s) and resets the scratch packet.
    fn init(&mut self, size: crate::types::Size) -> anyhow::Result<()> {
        self.encoder = Some(H264Encoder::new_nvenc_swframe(
            size.clone(),
            60,
            3 * (1024 * 1024),
        )?);

        // Replace the scratch packet so no stale data survives a re-init.
        self.packet = ffmpeg::Packet::empty();
        Ok(())
    }

    /// Returns a handle to the shared input frame.
    fn frame(&mut self) -> Arc<Mutex<Option<ffmpeg::frame::Video>>> {
        self.frame.clone()
    }

    /// Encodes the shared input frame at `pts`, returning the resulting
    /// packet, or `None` if the encoder produced no (non-empty) output.
    ///
    /// # Panics
    /// Panics if [`EncoderState::init`] has not been called, if the shared
    /// frame is unset, or if the frame mutex is poisoned.
    fn send_frame(&mut self, pts: u64, force_keyframe: bool) -> Option<ffmpeg::Packet> {
        let mut lk = self.frame.lock().expect("frame mutex poisoned");
        let frame = lk.as_mut().expect("send_frame called before a frame was set");
        let encoder = self.encoder.as_mut().expect("send_frame called before init()");

        // Write frame metadata directly on the underlying AVFrame.
        // SAFETY: `frame` wraps a valid, owned AVFrame; we only store plain
        // scalar fields that FFmpeg reads during encoding.
        unsafe {
            if force_keyframe {
                (*frame.as_mut_ptr()).pict_type = ffmpeg::sys::AVPictureType::AV_PICTURE_TYPE_I;
                (*frame.as_mut_ptr()).flags = ffmpeg::sys::AV_FRAME_FLAG_KEY;
                (*frame.as_mut_ptr()).key_frame = 1;
            } else {
                (*frame.as_mut_ptr()).pict_type = ffmpeg::sys::AVPictureType::AV_PICTURE_TYPE_NONE;
                (*frame.as_mut_ptr()).flags = 0i32;
                (*frame.as_mut_ptr()).key_frame = 0;
            }

            (*frame.as_mut_ptr()).pts = pts as i64;
        }

        encoder.send_frame(&*frame);

        // NOTE(review): presumably H264Encoder::receive_packet leaves the
        // packet empty on EAGAIN rather than erroring — confirm, since we
        // treat an empty packet as "no output" below.
        encoder
            .receive_packet(&mut self.packet)
            .expect("Failed to receive packet");

        unsafe {
            if !self.packet.is_empty() {
                return Some(self.packet.clone());
            }
        }

        None
    }
}
/// Body of the encoder worker thread.
///
/// Polls `rx` for commands, encodes the shared `frame` on demand, and pushes
/// encoded packets out through `tx`. Returns once the command channel is
/// closed (all senders dropped).
fn encoder_thread_main(
    mut rx: mpsc::Receiver<EncodeThreadInput>,
    tx: mpsc::Sender<EncodeThreadOutput>,
    frame: &Arc<Mutex<Option<ffmpeg::frame::Video>>>,
) -> anyhow::Result<()> {
    // FIXME: for HW frame support
    //let dev = cudarc::driver::CudaDevice::new(0)?;

    // PTS of the next frame to encode; reset on (re)initialization.
    let mut frame_number = 0u64;
    // When set, the next encoded frame is forced to be a keyframe.
    let mut force_keyframe = false;

    let mut encoder = EncoderState::new(frame.clone());

    loop {
        match rx.try_recv() {
            Ok(msg) => match msg {
                EncodeThreadInput::Init { size } => {
                    // Restarting the stream: PTS goes back to zero and any
                    // pending keyframe request is dropped (the first frame of
                    // a fresh encoder is a keyframe anyway).
                    frame_number = 0;
                    force_keyframe = false;
                    encoder.init(size).expect("encoder init failed");
                }

                EncodeThreadInput::ForceKeyframe => {
                    force_keyframe = true;
                }

                EncodeThreadInput::SendFrame => {
                    if let Some(pkt) = encoder.send_frame(frame_number, force_keyframe) {
                        // Only forward packets that actually carry data.
                        // (A bit less clear than ::empty(), but it's "Safe")
                        if pkt.data().is_some() {
                            let _ = tx.blocking_send(EncodeThreadOutput::Frame { packet: pkt });
                        }
                        frame_number += 1;
                    }

                    // A keyframe request is consumed by a single frame.
                    force_keyframe = false;
                }
            },

            Err(TryRecvError::Disconnected) => break,
            Err(TryRecvError::Empty) => {
                // Nothing pending; back off briefly instead of spinning.
                std::thread::sleep(Duration::from_millis(1));
            }
        }
    }

    Ok(())
}
/// Spawns the dedicated encoder worker thread.
///
/// `frame` is the shared input frame the worker encodes from. Returns the
/// receiving half for encoder output and the sending half for commands, both
/// with a capacity of one message.
pub fn encoder_thread_spawn(
    frame: &Arc<Mutex<Option<ffmpeg::frame::Video>>>,
) -> (
    mpsc::Receiver<EncodeThreadOutput>,
    mpsc::Sender<EncodeThreadInput>,
) {
    let (input_tx, input_rx) = mpsc::channel(1);
    let (output_tx, output_rx) = mpsc::channel(1);

    let shared_frame = Arc::clone(frame);
    std::thread::spawn(move || encoder_thread_main(input_rx, output_tx, &shared_frame));

    (output_rx, input_tx)
}

View file

@ -47,7 +47,8 @@ fn create_context_and_set_common_parameters(
video_encoder_context.set_format(ffmpeg::format::Pixel::YUV420P);
// The GOP here is setup to balance keyframe retransmission with bandwidth.
video_encoder_context.set_gop((max_framerate * 4) as u32);
//video_encoder_context.set_gop((max_framerate * 4) as u32);
video_encoder_context.set_gop(i32::MAX as u32);
video_encoder_context.set_max_b_frames(0);
unsafe {
@ -134,7 +135,7 @@ impl H264Encoder {
video_encoder_context.set_format(ffmpeg::format::Pixel::ZRGB32);
video_encoder_context.set_qmin(35);
video_encoder_context.set_qmin(40);
video_encoder_context.set_qmax(38);
// set h264_nvenc options
@ -147,7 +148,7 @@ impl H264Encoder {
// TODO:
dict.set("rc", "vbr");
//dict.set("qp", "45");
dict.set("qp", "45");
dict.set("forced-idr", "1");

View file

@ -7,6 +7,8 @@ pub use ffmpeg as ffmpeg;
pub mod hwdevice;
pub mod hwframe;
pub mod encoder_thread;
// from hgaiser/moonshine
pub fn check_ret(error_code: i32) -> Result<(), ffmpeg::Error> {
if error_code != 0 {