Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: workspace worker #74

Merged
merged 1 commit into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct Client(Arc<ClientInner>);

#[derive(Debug)]
struct ClientInner {
user: User,
user: Arc<User>,
config: crate::api::Config,
workspaces: DashMap<String, Workspace>,
auth: AuthClient<Channel>,
Expand Down Expand Up @@ -67,7 +67,7 @@ impl Client {
SessionClient::with_interceptor(channel, network::SessionInterceptor(claims.channel()));

Ok(Client(Arc::new(ClientInner {
user: resp.user.into(),
user: Arc::new(resp.user.into()),
workspaces: DashMap::default(),
claims,
auth,
Expand Down
118 changes: 66 additions & 52 deletions src/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
},
buffer, cursor,
errors::{ConnectionResult, ControllerResult, RemoteResult},
ext::InternallyMutable,
ext::{IgnorableError, InternallyMutable},
network::Services,
};

Expand All @@ -26,8 +26,8 @@ use codemp_proto::{
};

use dashmap::{DashMap, DashSet};
use std::sync::Arc;
use tokio::sync::{mpsc, mpsc::error::TryRecvError};
use std::sync::{Arc, Weak};
use tokio::sync::{mpsc::{self, error::TryRecvError}, oneshot, watch};
use tonic::Streaming;
use uuid::Uuid;

Expand All @@ -48,17 +48,15 @@ pub struct Workspace(Arc<WorkspaceInner>);
#[derive(Debug)]
struct WorkspaceInner {
name: String,
user: User, // TODO back-reference to global user id... needed for buffer controllers
current_user: Arc<User>,
cursor: cursor::Controller,
buffers: DashMap<String, buffer::Controller>,
services: Services,
// TODO these two are Arced so that the inner worker can hold them without holding the
// WorkspaceInner itself, otherwise its impossible to drop Workspace
filetree: DashSet<String>,
users: Arc<DashMap<Uuid, User>>,
// TODO can we drop the mutex?
events: tokio::sync::Mutex<mpsc::UnboundedReceiver<crate::api::Event>>,
callback: std::sync::Mutex<Option<ControllerCallback<Workspace>>>, // TODO lmao another one
callback: watch::Sender<Option<ControllerCallback<Workspace>>>,
poll_tx: mpsc::UnboundedSender<oneshot::Sender<()>>,
}

impl AsyncReceiver<Event> for Workspace {
Expand All @@ -71,32 +69,27 @@ impl AsyncReceiver<Event> for Workspace {
}

async fn poll(&self) -> ControllerResult<()> {
loop {
if !self.0.events.lock().await.is_empty() {
break Ok(());
}
// TODO disgusting, please send help
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
let (tx, rx) = oneshot::channel();
self.0.poll_tx.send(tx)?;
Ok(rx.await?)
}

// TODO please send HELP ASAP this is hurting me emotionally
fn clear_callback(&self) {
*self.0.callback.lock().expect("mutex poisoned") = None;
self.0.callback.send_replace(None);
}

fn callback(&self, cb: impl Into<ControllerCallback<Self>>) {
*self.0.callback.lock().expect("mutex poisoned") = Some(cb.into());
self.0.callback.send_replace(Some(cb.into()));
}
}

impl Workspace {
pub(crate) async fn connect(
name: String,
user: User,
user: Arc<User>,
config: crate::api::Config,
token: Token,
claims: tokio::sync::watch::Receiver<codemp_proto::common::Token>, // TODO ughh receiving this
claims: tokio::sync::watch::Receiver<codemp_proto::common::Token>,
) -> ConnectionResult<Self> {
let workspace_claim = InternallyMutable::new(token);
let services =
Expand All @@ -105,31 +98,45 @@ impl Workspace {

let (tx, rx) = mpsc::channel(128);
let (ev_tx, ev_rx) = mpsc::unbounded_channel();
let (poll_tx, poll_rx) = mpsc::unbounded_channel();
let (cb_tx, cb_rx) = watch::channel(None);
let cur_stream = services
.cur()
.attach(tokio_stream::wrappers::ReceiverStream::new(rx))
.await?
.into_inner();

let users = Arc::new(DashMap::default());

let controller = cursor::Controller::spawn(users.clone(), tx, cur_stream, &name);

let ws = Self(Arc::new(WorkspaceInner {
name,
user,
name: name.clone(),
current_user: user,
cursor: controller,
buffers: DashMap::default(),
filetree: DashSet::default(),
users,
events: tokio::sync::Mutex::new(ev_rx),
services,
callback: std::sync::Mutex::new(None),
callback: cb_tx,
poll_tx,
}));

let weak = Arc::downgrade(&ws.0);

let worker = WorkspaceWorker {
callback: cb_rx,
pollers: Vec::new(),
poll_rx,
events: ev_tx,
};

let _t = tokio::spawn(async move {
worker.work(name, ws_stream, weak).await;
});

ws.fetch_users().await?;
ws.fetch_buffers().await?;
ws.run_actor(ws_stream, ev_tx);

Ok(ws)
}
Expand Down Expand Up @@ -175,7 +182,7 @@ impl Workspace {
);
let stream = self.0.services.buf().attach(req).await?.into_inner();

let controller = buffer::Controller::spawn(self.0.user.id, path, tx, stream, &self.0.name);
let controller = buffer::Controller::spawn(self.0.current_user.id, path, tx, stream, &self.0.name);
self.0.buffers.insert(path.to_string(), controller.clone());

Ok(controller)
Expand Down Expand Up @@ -325,29 +332,26 @@ impl Workspace {
tree.sort();
tree
}
}

pub(crate) fn run_actor(
&self,
mut stream: Streaming<WorkspaceEvent>,
tx: mpsc::UnboundedSender<crate::api::Event>,
) {
// TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..?
let weak = Arc::downgrade(&self.0);
let name = self.id();
tokio::spawn(async move {
tracing::debug!("workspace worker starting");
loop {
// TODO can we stop responsively rather than poll for Arc being dropped?
if weak.upgrade().is_none() {
break;
};
let Some(res) = tokio::select!(
x = stream.message() => Some(x),
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => None,
) else {
continue;
};
match res {
struct WorkspaceWorker {
callback: watch::Receiver<Option<ControllerCallback<Workspace>>>,
pollers: Vec<oneshot::Sender<()>>,
poll_rx: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
events: mpsc::UnboundedSender<crate::api::Event>,
}

impl WorkspaceWorker {
pub(crate) async fn work(mut self, name: String, mut stream: Streaming<WorkspaceEvent>, weak: Weak<WorkspaceInner>) {
tracing::debug!("workspace worker starting");
loop {
tokio::select! {
res = self.poll_rx.recv() => match res {
None => break tracing::debug!("pollers channel closed: workspace has been dropped"),
Some(x) => self.pollers.push(x),
},

res = stream.message() => match res {
Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e),
Ok(None) => break tracing::info!("leaving workspace {}", name),
Ok(Some(WorkspaceEvent { event: None })) => {
Expand Down Expand Up @@ -377,13 +381,23 @@ impl Workspace {
let _ = inner.buffers.remove(&path);
}
}
if tx.send(update).is_err() {
if self.events.send(update).is_err() {
tracing::warn!("no active controller to receive workspace event");
}
self.pollers.drain(..).for_each(|x| {
x.send(()).unwrap_or_warn("poller dropped before completion");
});
if let Some(cb) = self.callback.borrow().as_ref() {
if let Some(ws) = weak.upgrade() {
cb.call(Workspace(ws));
} else {
break tracing::debug!("workspace worker clean exit");
}
}
}
}
},
}
tracing::debug!("workspace worker stopping");
});
}
tracing::debug!("workspace worker stopping");
}
}