diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 9b96ce1..1dcb0b4 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -14,10 +14,26 @@ use tonic::{Request, Response, Result, Status}; type Responder = oneshot::Sender; -#[derive(Debug)] -pub struct GrpcServerMessage { - pub command: Commands, - pub responder: Responder<()>, +pub enum GrpcServerMessage { + Play { + resp: Responder>, + }, + Pause { + resp: Responder>, + }, + PlayPause { + resp: Responder>, + }, + SkipSong { + resp: Responder>, + }, + Set { + resp: Responder>, + }, + GetFiles { + path: PathBuf, + resp: Responder>, + }, } #[derive(Debug, Default)] @@ -32,25 +48,6 @@ impl GRPCServer { } } - async fn send_message(&self, command: Commands) -> Result<(), Box> { - let (resp_tx, resp_rx) = oneshot::channel(); - - let message = GrpcServerMessage { - command, - responder: resp_tx, - }; - - if let Some(tx) = &self.transmitter { - tx.send(message).await?; - - let response = resp_rx.await?; - - return Ok(response); - } - - Ok(()) - } - pub async fn serve( address: SocketAddr, tx: Sender, @@ -86,10 +83,24 @@ impl JunoServices for GRPCServer { let path = PathBuf::from_str(request.into_inner().path.as_str()) .expect("Failed to create pathbuf"); - let files: Vec = match self.send_message(Commands::GetFiles { path }).await { - Ok(()) => vec![], - Err(_err) => return Err(Status::internal("An internal error has occurred.")), - }; + let mut files: Vec = vec![]; + + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::GetFiles { + resp: resp_tx, + path, + }; + + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } + + files = match resp_rx.await { + Ok(response) => response, + Err(_err) => return Err(Status::internal("An internal error has occurred.")), + }; + } let reply = GetFilesResponse { files: files.iter().map(|x| x.display().to_string()).collect(), @@ -102,8 +113,17 @@ impl JunoServices for GRPCServer { &self, _request: Request, ) -> Result, Status> { - if let Err(_err) = self.send_message(Commands::Play).await { - return Err(Status::internal("An internal error has occurred.")); + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::Play { resp: resp_tx }; + + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } + + if let Err(_err) = resp_rx.await { + return Err(Status::internal("An internal error has occurred.")); + } } Ok(Response::new(EmptyResponse {})) @@ -113,8 +133,17 @@ impl JunoServices for GRPCServer { &self, _request: Request, ) -> Result, Status> { - if let Err(_err) = self.send_message(Commands::Pause).await { - return Err(Status::internal("An internal error has occurred.")); + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::Pause { resp: resp_tx }; + + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } + + if let Err(_err) = resp_rx.await { + return Err(Status::internal("An internal error has occurred.")); + } } Ok(Response::new(EmptyResponse {})) @@ -124,8 +153,17 @@ impl JunoServices for GRPCServer { &self, _request: Request, ) -> Result, Status> { - if let Err(_err) = self.send_message(Commands::PlayPause).await { - return Err(Status::internal("An internal error has occurred.")); + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::PlayPause { resp: resp_tx }; + + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } + + if let Err(_err) = resp_rx.await { + return Err(Status::internal("An internal error has occurred.")); + } } Ok(Response::new(EmptyResponse {})) @@ -135,8 +173,17 @@ impl JunoServices for GRPCServer { &self, _request: Request, ) -> Result, Status> { - if let Err(_err) = self.send_message(Commands::SkipSong).await { - return Err(Status::internal("An internal error has occurred.")); + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::SkipSong { resp: resp_tx }; + + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } + + if let Err(_err) = resp_rx.await { + return Err(Status::internal("An internal error has occurred.")); + } } Ok(Response::new(EmptyResponse {})) diff --git a/src/main.rs b/src/main.rs index 1fbf9e3..51ab7a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,12 +8,41 @@ use tokio::sync::mpsc; use crate::player::Player; use self::configuration::{Commands, Config, ConfigMode}; +use self::grpc::server::GrpcServerMessage; mod configuration; mod file_explorer; mod grpc; mod player; +async fn handle_message(player: &mut Player, message: GrpcServerMessage) { + match message { + GrpcServerMessage::Play { resp } => { + player.play(); + let _ = resp.send(Ok(())); + } + GrpcServerMessage::Pause { resp } => { + player.pause(); + let _ = resp.send(Ok(())); + } + GrpcServerMessage::PlayPause { resp } => { + player.play_pause(); + let _ = resp.send(Ok(())); + } + GrpcServerMessage::SkipSong { resp } => { + let _ = match player.skip_song() { + Ok(_) => resp.send(Ok(())), + Err(err) => resp.send(Err(err.to_string())), + }; + } + GrpcServerMessage::Set { resp } => todo!(), + GrpcServerMessage::GetFiles { path, resp } => { + let files = player.get_files(&path).unwrap(); + let _ = resp.send(files); + } + } +} + async fn init_server(config: Config) -> Result<(), Box> { let (tx, mut rx) = mpsc::channel::(32); @@ -42,15 +71,7 @@ async fn init_server(config: Config) -> Result<(), Box> { loop { tokio::select! { Some(msg) = rx.recv() => { - // TODO: receive message from player and send it back to server so it can be sent to - // the client - if let Err(err) = player.handle_message(msg.command) { - eprintln!("Error handling player action: {}", err); - } - - if let Err(_) = msg.responder.send(()) { - eprintln!("Error responding to grpc server"); - } + handle_message(&mut player, msg).await; } _ = async { loop { diff --git a/src/player.rs b/src/player.rs index 5df0207..86ec0ff 100644 --- a/src/player.rs +++ b/src/player.rs @@ -41,24 +41,6 @@ impl Player { }) } - pub fn handle_message( - &mut self, - message: configuration::Commands, - ) -> Result<(), Box> { - match message { - configuration::Commands::Play => self.play(), - configuration::Commands::Pause => self.pause(), - configuration::Commands::PlayPause => self.play_pause(), - configuration::Commands::SkipSong => self.skip_song()?, - configuration::Commands::Set => todo!(), - _ => { - println!("This command doesn't apply to client mode") - } - }; - - Ok(()) - } - pub fn handle_idle(&mut self) -> Result<(), Box> { if self.sink.is_paused() { return Ok(()); @@ -82,19 +64,44 @@ impl Player { Ok(()) } - fn get_files(path: &PathBuf) -> Result, Box> { - Ok(walk_dir(&path)?) + pub fn get_files(&mut self, path: &PathBuf) -> Result, Box> { + // let mut base_path = env::current_dir().expect("Error accesing the enviroment"); + // + // match path { + // Some(dir) => { + // } + // None => search_path = base_path.to_owned(), + // } + // + // // PathBuf.join() can override the hole path, this ensure we're not accessing files outside + // // base_dir + // if !search_path.starts_with(base_path) { + // return Err("Tried to access file or directory outside of server `base_path` config."); + // } + + let search_path = self + .base_dir + .join(path) + .canonicalize() + .expect("Couldn't canonicalizice the path"); + + // PathBuf.join() can override the hole path, this ensure we're not accessing files outside base_dir + if !search_path.starts_with(&self.base_dir) { + panic!("Tried to access file or directory outside of server `base_path` config.") + } + + Ok(walk_dir(&search_path)?) } - fn play(&mut self) { + pub fn play(&mut self) { self.sink.play(); } - fn pause(&mut self) { + pub fn pause(&mut self) { self.sink.pause(); } - fn play_pause(&self) { + pub fn play_pause(&self) { if self.sink.is_paused() { self.sink.play(); } else { @@ -102,7 +109,7 @@ impl Player { }; } - fn skip_song(&mut self) -> Result<(), Box> { + pub fn skip_song(&mut self) -> Result<(), Box> { println!("Skipping current song...:"); let file_path = self.queue.pop_front().expect("foo"); self.enqueue_file(file_path)?;