use super::{Open, Sink, SinkAsBytes}; use crate::audio::AudioPacket; use crate::config::AudioFormat; use crate::player::{NUM_CHANNELS, SAMPLE_RATE}; use gst::prelude::*; use gst::*; use std::sync::mpsc::{sync_channel, SyncSender}; use std::{io, thread}; use zerocopy::AsBytes; #[allow(dead_code)] pub struct GstreamerSink { tx: SyncSender>, pipeline: gst::Pipeline, format: AudioFormat, } impl Open for GstreamerSink { fn open(device: Option, format: AudioFormat) -> GstreamerSink { info!("Using GStreamer sink with format: {:?}", format); gst::init().expect("failed to init GStreamer!"); // GStreamer calls S24 and S24_3 different from the rest of the world let gst_format = match format { AudioFormat::S24 => "S24_32".to_string(), AudioFormat::S24_3 => "S24".to_string(), _ => format!("{:?}", format), }; let pipeline_str_preamble = format!( "appsrc caps=\"audio/x-raw,format={}LE,layout=interleaved,channels={},rate={}\" block=true max-bytes=4096 name=appsrc0 ", gst_format, NUM_CHANNELS, SAMPLE_RATE ); let pipeline_str_rest = r#" ! audioconvert ! autoaudiosink"#; let pipeline_str: String = match device { Some(x) => format!("{}{}", pipeline_str_preamble, x), None => format!("{}{}", pipeline_str_preamble, pipeline_str_rest), }; info!("Pipeline: {}", pipeline_str); gst::init().unwrap(); let pipelinee = gst::parse_launch(&*pipeline_str).expect("Couldn't launch pipeline; likely a GStreamer issue or an error in the pipeline string you specified in the 'device' argument to librespot."); let pipeline = pipelinee .dynamic_cast::() .expect("couldn't cast pipeline element at runtime!"); let bus = pipeline.get_bus().expect("couldn't get bus from pipeline"); let mainloop = glib::MainLoop::new(None, false); let appsrce: gst::Element = pipeline .get_by_name("appsrc0") .expect("couldn't get appsrc from pipeline"); let appsrc: gst_app::AppSrc = appsrce .dynamic_cast::() .expect("couldn't cast AppSrc element at runtime!"); let bufferpool = gst::BufferPool::new(); let appsrc_caps = appsrc.get_caps().expect("couldn't get appsrc caps"); let mut conf = bufferpool.get_config(); conf.set_params(Some(&appsrc_caps), 2048 * format.size() as u32, 0, 0); bufferpool .set_config(conf) .expect("couldn't configure the buffer pool"); bufferpool .set_active(true) .expect("couldn't activate buffer pool"); let (tx, rx) = sync_channel::>(64 * format.size()); thread::spawn(move || { for data in rx { let buffer = bufferpool.acquire_buffer(None); if !buffer.is_err() { let mut okbuffer = buffer.unwrap(); let mutbuf = okbuffer.make_mut(); mutbuf.set_size(data.len()); mutbuf .copy_from_slice(0, data.as_bytes()) .expect("failed to copy from slice"); let _eat = appsrc.push_buffer(okbuffer); } } }); thread::spawn(move || { let thread_mainloop = mainloop; let watch_mainloop = thread_mainloop.clone(); bus.add_watch(move |_, msg| { match msg.view() { MessageView::Eos(..) => watch_mainloop.quit(), MessageView::Error(err) => { println!( "Error from {:?}: {} ({:?})", err.get_src().map(|s| s.get_path_string()), err.get_error(), err.get_debug() ); watch_mainloop.quit(); } _ => (), }; glib::Continue(true) }) .expect("failed to add bus watch"); thread_mainloop.run(); }); pipeline .set_state(gst::State::Playing) .expect("unable to set the pipeline to the `Playing` state"); Self { tx: tx, pipeline: pipeline, format: format, } } } impl Sink for GstreamerSink { start_stop_noop!(); sink_as_bytes!(); } impl SinkAsBytes for GstreamerSink { fn write_bytes(&mut self, data: &[u8]) -> io::Result<()> { // Copy expensively (in to_vec()) to avoid thread synchronization self.tx .send(data.to_vec()) .expect("tx send failed in write function"); Ok(()) } }