diff --git a/src/blocks/sound/pulseaudio.rs b/src/blocks/sound/pulseaudio.rs index 45a7a71c22..e8a2cc7ee1 100644 --- a/src/blocks/sound/pulseaudio.rs +++ b/src/blocks/sound/pulseaudio.rs @@ -157,119 +157,95 @@ impl Connection { IterateResult::Success(_) => Ok(()), } } -} - -impl Client { - fn new() -> Result { - let (send_req, recv_req) = unbounded(); - let (send_result, recv_result) = unbounded(); - let new_connection = move || -> Option { - match Connection::new() { + /// Create connection in a new thread. + /// + /// If connection can't be created, Err is returned. + fn spawn(thread_name: &str, f: impl FnOnce(Self) + Send + 'static) -> Result<()> { + let (tx, rx) = std::sync::mpsc::sync_channel(0); + thread::Builder::new() + .name(thread_name.to_owned()) + .spawn(move || match Self::new() { Ok(conn) => { - send_result.send(Ok(())).unwrap(); - Some(conn) + tx.send(Ok(())).unwrap(); + f(conn); } Err(err) => { - send_result.send(Err(err)).unwrap(); - None + tx.send(Err(err)).unwrap(); } - } - }; - let new_connection2 = new_connection.clone(); + }) + .error("failed to spawn a thread")?; + rx.recv().error("channel closed")? + } +} - // requests - thread::Builder::new() - .name("sound_pulseaudio_req".into()) - .spawn(move || { - let Some(mut connection) = new_connection() else { return }; +impl Client { + fn new() -> Result { + let (send_req, recv_req) = unbounded(); + // requests + Connection::spawn("sound_pulseaudio_req", move |mut connection| { + loop { + // make sure mainloop dispatched everything loop { - // make sure mainloop dispatched everything - loop { - connection.iterate(false).unwrap(); - if connection.context.get_state() == PulseState::Ready { - break; - } + connection.iterate(false).unwrap(); + if connection.context.get_state() == PulseState::Ready { + break; } + } + + let Ok(req) = recv_req.recv() else { return }; - match recv_req.recv() { - Err(_) => {} - Ok(req) => { - use ClientRequest::*; - let mut introspector = connection.context.introspect(); - - match req { - GetDefaultDevice => { - introspector.get_server_info(Client::server_info_callback); - } - GetInfoByIndex(DeviceKind::Sink, index) => { - introspector - .get_sink_info_by_index(index, Client::sink_info_callback); - } - GetInfoByIndex(DeviceKind::Source, index) => { - introspector.get_source_info_by_index( - index, - Client::source_info_callback, - ); - } - GetInfoByName(DeviceKind::Sink, name) => { - introspector - .get_sink_info_by_name(&name, Client::sink_info_callback); - } - GetInfoByName(DeviceKind::Source, name) => { - introspector.get_source_info_by_name( - &name, - Client::source_info_callback, - ); - } - SetVolumeByName(DeviceKind::Sink, name, volumes) => { - introspector.set_sink_volume_by_name(&name, &volumes, None); - } - SetVolumeByName(DeviceKind::Source, name, volumes) => { - introspector.set_source_volume_by_name(&name, &volumes, None); - } - SetMuteByName(DeviceKind::Sink, name, mute) => { - introspector.set_sink_mute_by_name(&name, mute, None); - } - SetMuteByName(DeviceKind::Source, name, mute) => { - introspector.set_source_mute_by_name(&name, mute, None); - } - }; - - // send request and receive response - connection.iterate(true).unwrap(); - connection.iterate(true).unwrap(); - } + let mut introspector = connection.context.introspect(); + + use ClientRequest::*; + match req { + GetDefaultDevice => { + introspector.get_server_info(Client::server_info_callback); } - } - }) - .unwrap(); - recv_result - .recv() - .error("Failed to receive from pulseaudio thread channel")??; + GetInfoByIndex(DeviceKind::Sink, index) => { + introspector.get_sink_info_by_index(index, Client::sink_info_callback); + } + GetInfoByIndex(DeviceKind::Source, index) => { + introspector.get_source_info_by_index(index, Client::source_info_callback); + } + GetInfoByName(DeviceKind::Sink, name) => { + introspector.get_sink_info_by_name(&name, Client::sink_info_callback); + } + GetInfoByName(DeviceKind::Source, name) => { + introspector.get_source_info_by_name(&name, Client::source_info_callback); + } + SetVolumeByName(DeviceKind::Sink, name, volumes) => { + introspector.set_sink_volume_by_name(&name, &volumes, None); + } + SetVolumeByName(DeviceKind::Source, name, volumes) => { + introspector.set_source_volume_by_name(&name, &volumes, None); + } + SetMuteByName(DeviceKind::Sink, name, mute) => { + introspector.set_sink_mute_by_name(&name, mute, None); + } + SetMuteByName(DeviceKind::Source, name, mute) => { + introspector.set_source_mute_by_name(&name, mute, None); + } + }; + + // send request and receive response + connection.iterate(true).unwrap(); + connection.iterate(true).unwrap(); + } + })?; // subscribe - thread::Builder::new() - .name("sound_pulseaudio_sub".into()) - .spawn(move || { - let Some(mut connection) = new_connection2() else { return }; - - // subscribe for events - connection - .context - .set_subscribe_callback(Some(Box::new(Client::subscribe_callback))); - connection.context.subscribe( - InterestMaskSet::SERVER | InterestMaskSet::SINK | InterestMaskSet::SOURCE, - |_| {}, - ); - - connection.mainloop.run().unwrap(); - }) - .unwrap(); - recv_result - .recv() - .error("Failed to receive from pulseaudio thread channel")??; + Connection::spawn("sound_pulseaudio_sub", |mut connection| { + connection + .context + .set_subscribe_callback(Some(Box::new(Client::subscribe_callback))); + connection.context.subscribe( + InterestMaskSet::SERVER | InterestMaskSet::SINK | InterestMaskSet::SOURCE, + |_| (), + ); + connection.mainloop.run().unwrap(); + })?; Ok(Client { sender: send_req }) } @@ -332,21 +308,13 @@ impl Client { _operation: Option, index: u32, ) { - match facility { - None => {} - Some(facility) => match facility { - Facility::Server => { - Client::send(ClientRequest::GetDefaultDevice).ok(); - } - Facility::Sink => { - Client::send(ClientRequest::GetInfoByIndex(DeviceKind::Sink, index)).ok(); - } - Facility::Source => { - Client::send(ClientRequest::GetInfoByIndex(DeviceKind::Source, index)).ok(); - } - _ => {} - }, - } + let request = match facility { + Some(Facility::Server) => ClientRequest::GetDefaultDevice, + Some(Facility::Sink) => ClientRequest::GetInfoByIndex(DeviceKind::Sink, index), + Some(Facility::Source) => ClientRequest::GetInfoByIndex(DeviceKind::Source, index), + _ => return, + }; + let _ = Client::send(request); } fn send_update_event() {