Skip to content

Commit

Permalink
sound: some refactoring of pa driver
Browse files Browse the repository at this point in the history
  • Loading branch information
MaxVerevkin committed Apr 29, 2023
1 parent 3d243ba commit c3a34ae
Showing 1 changed file with 83 additions and 115 deletions.
198 changes: 83 additions & 115 deletions src/blocks/sound/pulseaudio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,119 +157,95 @@ impl Connection {
IterateResult::Success(_) => Ok(()),
}
}
}

impl Client {
fn new() -> Result<Client> {
let (send_req, recv_req) = unbounded();
let (send_result, recv_result) = unbounded();

let new_connection = move || -> Option<Connection> {
match Connection::new() {
/// Create connection in a new thread.
///
/// If connection can't be created, Err is returned.
fn spawn(thread_name: &str, f: impl FnOnce(Self) + Send + 'static) -> Result<()> {
let (tx, rx) = std::sync::mpsc::sync_channel(0);
thread::Builder::new()
.name(thread_name.to_owned())
.spawn(move || match Self::new() {
Ok(conn) => {
send_result.send(Ok(())).unwrap();
Some(conn)
tx.send(Ok(())).unwrap();
f(conn);
}
Err(err) => {
send_result.send(Err(err)).unwrap();
None
tx.send(Err(err)).unwrap();
}
}
};
let new_connection2 = new_connection.clone();
})
.error("failed to spawn a thread")?;
rx.recv().error("channel closed")?
}
}

// requests
thread::Builder::new()
.name("sound_pulseaudio_req".into())
.spawn(move || {
let Some(mut connection) = new_connection() else { return };
impl Client {
fn new() -> Result<Client> {
let (send_req, recv_req) = unbounded();

// requests
Connection::spawn("sound_pulseaudio_req", move |mut connection| {
loop {
// make sure mainloop dispatched everything
loop {
// make sure mainloop dispatched everything
loop {
connection.iterate(false).unwrap();
if connection.context.get_state() == PulseState::Ready {
break;
}
connection.iterate(false).unwrap();
if connection.context.get_state() == PulseState::Ready {
break;
}
}

let Ok(req) = recv_req.recv() else { return };

match recv_req.recv() {
Err(_) => {}
Ok(req) => {
use ClientRequest::*;
let mut introspector = connection.context.introspect();

match req {
GetDefaultDevice => {
introspector.get_server_info(Client::server_info_callback);
}
GetInfoByIndex(DeviceKind::Sink, index) => {
introspector
.get_sink_info_by_index(index, Client::sink_info_callback);
}
GetInfoByIndex(DeviceKind::Source, index) => {
introspector.get_source_info_by_index(
index,
Client::source_info_callback,
);
}
GetInfoByName(DeviceKind::Sink, name) => {
introspector
.get_sink_info_by_name(&name, Client::sink_info_callback);
}
GetInfoByName(DeviceKind::Source, name) => {
introspector.get_source_info_by_name(
&name,
Client::source_info_callback,
);
}
SetVolumeByName(DeviceKind::Sink, name, volumes) => {
introspector.set_sink_volume_by_name(&name, &volumes, None);
}
SetVolumeByName(DeviceKind::Source, name, volumes) => {
introspector.set_source_volume_by_name(&name, &volumes, None);
}
SetMuteByName(DeviceKind::Sink, name, mute) => {
introspector.set_sink_mute_by_name(&name, mute, None);
}
SetMuteByName(DeviceKind::Source, name, mute) => {
introspector.set_source_mute_by_name(&name, mute, None);
}
};

// send request and receive response
connection.iterate(true).unwrap();
connection.iterate(true).unwrap();
}
let mut introspector = connection.context.introspect();

use ClientRequest::*;
match req {
GetDefaultDevice => {
introspector.get_server_info(Client::server_info_callback);
}
}
})
.unwrap();
recv_result
.recv()
.error("Failed to receive from pulseaudio thread channel")??;
GetInfoByIndex(DeviceKind::Sink, index) => {
introspector.get_sink_info_by_index(index, Client::sink_info_callback);
}
GetInfoByIndex(DeviceKind::Source, index) => {
introspector.get_source_info_by_index(index, Client::source_info_callback);
}
GetInfoByName(DeviceKind::Sink, name) => {
introspector.get_sink_info_by_name(&name, Client::sink_info_callback);
}
GetInfoByName(DeviceKind::Source, name) => {
introspector.get_source_info_by_name(&name, Client::source_info_callback);
}
SetVolumeByName(DeviceKind::Sink, name, volumes) => {
introspector.set_sink_volume_by_name(&name, &volumes, None);
}
SetVolumeByName(DeviceKind::Source, name, volumes) => {
introspector.set_source_volume_by_name(&name, &volumes, None);
}
SetMuteByName(DeviceKind::Sink, name, mute) => {
introspector.set_sink_mute_by_name(&name, mute, None);
}
SetMuteByName(DeviceKind::Source, name, mute) => {
introspector.set_source_mute_by_name(&name, mute, None);
}
};

// send request and receive response
connection.iterate(true).unwrap();
connection.iterate(true).unwrap();
}
})?;

// subscribe
thread::Builder::new()
.name("sound_pulseaudio_sub".into())
.spawn(move || {
let Some(mut connection) = new_connection2() else { return };

// subscribe for events
connection
.context
.set_subscribe_callback(Some(Box::new(Client::subscribe_callback)));
connection.context.subscribe(
InterestMaskSet::SERVER | InterestMaskSet::SINK | InterestMaskSet::SOURCE,
|_| {},
);

connection.mainloop.run().unwrap();
})
.unwrap();
recv_result
.recv()
.error("Failed to receive from pulseaudio thread channel")??;
Connection::spawn("sound_pulseaudio_sub", |mut connection| {
connection
.context
.set_subscribe_callback(Some(Box::new(Client::subscribe_callback)));
connection.context.subscribe(
InterestMaskSet::SERVER | InterestMaskSet::SINK | InterestMaskSet::SOURCE,
|_| (),
);
connection.mainloop.run().unwrap();
})?;

Ok(Client { sender: send_req })
}
Expand Down Expand Up @@ -332,21 +308,13 @@ impl Client {
_operation: Option<SubscribeOperation>,
index: u32,
) {
match facility {
None => {}
Some(facility) => match facility {
Facility::Server => {
Client::send(ClientRequest::GetDefaultDevice).ok();
}
Facility::Sink => {
Client::send(ClientRequest::GetInfoByIndex(DeviceKind::Sink, index)).ok();
}
Facility::Source => {
Client::send(ClientRequest::GetInfoByIndex(DeviceKind::Source, index)).ok();
}
_ => {}
},
}
let request = match facility {
Some(Facility::Server) => ClientRequest::GetDefaultDevice,
Some(Facility::Sink) => ClientRequest::GetInfoByIndex(DeviceKind::Sink, index),
Some(Facility::Source) => ClientRequest::GetInfoByIndex(DeviceKind::Source, index),
_ => return,
};
let _ = Client::send(request);
}

fn send_update_event() {
Expand Down

0 comments on commit c3a34ae

Please sign in to comment.