refactor(server): allow to send message between player and server

This commit is contained in:
Alexander Navarro 2024-07-25 16:10:49 -04:00
parent 940093d599
commit 0df5ed34e5
3 changed files with 143 additions and 68 deletions

View file

@ -14,10 +14,26 @@ use tonic::{Request, Response, Result, Status};
type Responder<T> = oneshot::Sender<T>; type Responder<T> = oneshot::Sender<T>;
#[derive(Debug)] pub enum GrpcServerMessage {
pub struct GrpcServerMessage { Play {
pub command: Commands, resp: Responder<Result<()>>,
pub responder: Responder<()>, },
Pause {
resp: Responder<Result<()>>,
},
PlayPause {
resp: Responder<Result<()>>,
},
SkipSong {
resp: Responder<Result<(), String>>,
},
Set {
resp: Responder<Result<()>>,
},
GetFiles {
path: PathBuf,
resp: Responder<Vec<PathBuf>>,
},
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -32,25 +48,6 @@ impl GRPCServer {
} }
} }
async fn send_message(&self, command: Commands) -> Result<(), Box<dyn Error>> {
let (resp_tx, resp_rx) = oneshot::channel();
let message = GrpcServerMessage {
command,
responder: resp_tx,
};
if let Some(tx) = &self.transmitter {
tx.send(message).await?;
let response = resp_rx.await?;
return Ok(response);
}
Ok(())
}
pub async fn serve( pub async fn serve(
address: SocketAddr, address: SocketAddr,
tx: Sender<GrpcServerMessage>, tx: Sender<GrpcServerMessage>,
@ -86,10 +83,24 @@ impl JunoServices for GRPCServer {
let path = PathBuf::from_str(request.into_inner().path.as_str()) let path = PathBuf::from_str(request.into_inner().path.as_str())
.expect("Failed to create pathbuf"); .expect("Failed to create pathbuf");
let files: Vec<PathBuf> = match self.send_message(Commands::GetFiles { path }).await { let mut files: Vec<PathBuf> = vec![];
Ok(()) => vec![],
Err(_err) => return Err(Status::internal("An internal error has occurred.")), if let Some(tx) = &self.transmitter {
}; let (resp_tx, resp_rx) = oneshot::channel();
let message = GrpcServerMessage::GetFiles {
resp: resp_tx,
path,
};
if let Err(_err) = tx.send(message).await {
return Err(Status::internal("An internal error has occurred."));
}
files = match resp_rx.await {
Ok(response) => response,
Err(_err) => return Err(Status::internal("An internal error has occurred.")),
};
}
let reply = GetFilesResponse { let reply = GetFilesResponse {
files: files.iter().map(|x| x.display().to_string()).collect(), files: files.iter().map(|x| x.display().to_string()).collect(),
@ -102,8 +113,17 @@ impl JunoServices for GRPCServer {
&self, &self,
_request: Request<EmptyRequest>, _request: Request<EmptyRequest>,
) -> Result<Response<EmptyResponse>, Status> { ) -> Result<Response<EmptyResponse>, Status> {
if let Err(_err) = self.send_message(Commands::Play).await { if let Some(tx) = &self.transmitter {
return Err(Status::internal("An internal error has occurred.")); let (resp_tx, resp_rx) = oneshot::channel();
let message = GrpcServerMessage::Play { resp: resp_tx };
if let Err(_err) = tx.send(message).await {
return Err(Status::internal("An internal error has occurred."));
}
if let Err(_err) = resp_rx.await {
return Err(Status::internal("An internal error has occurred."));
}
} }
Ok(Response::new(EmptyResponse {})) Ok(Response::new(EmptyResponse {}))
@ -113,8 +133,17 @@ impl JunoServices for GRPCServer {
&self, &self,
_request: Request<EmptyRequest>, _request: Request<EmptyRequest>,
) -> Result<Response<EmptyResponse>, Status> { ) -> Result<Response<EmptyResponse>, Status> {
if let Err(_err) = self.send_message(Commands::Pause).await { if let Some(tx) = &self.transmitter {
return Err(Status::internal("An internal error has occurred.")); let (resp_tx, resp_rx) = oneshot::channel();
let message = GrpcServerMessage::Pause { resp: resp_tx };
if let Err(_err) = tx.send(message).await {
return Err(Status::internal("An internal error has occurred."));
}
if let Err(_err) = resp_rx.await {
return Err(Status::internal("An internal error has occurred."));
}
} }
Ok(Response::new(EmptyResponse {})) Ok(Response::new(EmptyResponse {}))
@ -124,8 +153,17 @@ impl JunoServices for GRPCServer {
&self, &self,
_request: Request<EmptyRequest>, _request: Request<EmptyRequest>,
) -> Result<Response<EmptyResponse>, Status> { ) -> Result<Response<EmptyResponse>, Status> {
if let Err(_err) = self.send_message(Commands::PlayPause).await { if let Some(tx) = &self.transmitter {
return Err(Status::internal("An internal error has occurred.")); let (resp_tx, resp_rx) = oneshot::channel();
let message = GrpcServerMessage::PlayPause { resp: resp_tx };
if let Err(_err) = tx.send(message).await {
return Err(Status::internal("An internal error has occurred."));
}
if let Err(_err) = resp_rx.await {
return Err(Status::internal("An internal error has occurred."));
}
} }
Ok(Response::new(EmptyResponse {})) Ok(Response::new(EmptyResponse {}))
@ -135,8 +173,17 @@ impl JunoServices for GRPCServer {
&self, &self,
_request: Request<EmptyRequest>, _request: Request<EmptyRequest>,
) -> Result<Response<EmptyResponse>, Status> { ) -> Result<Response<EmptyResponse>, Status> {
if let Err(_err) = self.send_message(Commands::SkipSong).await { if let Some(tx) = &self.transmitter {
return Err(Status::internal("An internal error has occurred.")); let (resp_tx, resp_rx) = oneshot::channel();
let message = GrpcServerMessage::SkipSong { resp: resp_tx };
if let Err(_err) = tx.send(message).await {
return Err(Status::internal("An internal error has occurred."));
}
if let Err(_err) = resp_rx.await {
return Err(Status::internal("An internal error has occurred."));
}
} }
Ok(Response::new(EmptyResponse {})) Ok(Response::new(EmptyResponse {}))

View file

@ -8,12 +8,41 @@ use tokio::sync::mpsc;
use crate::player::Player; use crate::player::Player;
use self::configuration::{Commands, Config, ConfigMode}; use self::configuration::{Commands, Config, ConfigMode};
use self::grpc::server::GrpcServerMessage;
mod configuration; mod configuration;
mod file_explorer; mod file_explorer;
mod grpc; mod grpc;
mod player; mod player;
async fn handle_message(player: &mut Player, message: GrpcServerMessage) {
match message {
GrpcServerMessage::Play { resp } => {
player.play();
let _ = resp.send(Ok(()));
}
GrpcServerMessage::Pause { resp } => {
player.pause();
let _ = resp.send(Ok(()));
}
GrpcServerMessage::PlayPause { resp } => {
player.play_pause();
let _ = resp.send(Ok(()));
}
GrpcServerMessage::SkipSong { resp } => {
let _ = match player.skip_song() {
Ok(_) => resp.send(Ok(())),
Err(err) => resp.send(Err(err.to_string())),
};
}
GrpcServerMessage::Set { resp } => todo!(),
GrpcServerMessage::GetFiles { path, resp } => {
let files = player.get_files(&path).unwrap();
let _ = resp.send(files);
}
}
}
async fn init_server(config: Config) -> Result<(), Box<dyn Error>> { async fn init_server(config: Config) -> Result<(), Box<dyn Error>> {
let (tx, mut rx) = mpsc::channel::<grpc::server::GrpcServerMessage>(32); let (tx, mut rx) = mpsc::channel::<grpc::server::GrpcServerMessage>(32);
@ -42,15 +71,7 @@ async fn init_server(config: Config) -> Result<(), Box<dyn Error>> {
loop { loop {
tokio::select! { tokio::select! {
Some(msg) = rx.recv() => { Some(msg) = rx.recv() => {
// TODO: receive message from player and send it back to server so it can be sent to handle_message(&mut player, msg).await;
// the client
if let Err(err) = player.handle_message(msg.command) {
eprintln!("Error handling player action: {}", err);
}
if let Err(_) = msg.responder.send(()) {
eprintln!("Error responding to grpc server");
}
} }
_ = async { _ = async {
loop { loop {

View file

@ -41,24 +41,6 @@ impl Player {
}) })
} }
pub fn handle_message(
&mut self,
message: configuration::Commands,
) -> Result<(), Box<dyn Error>> {
match message {
configuration::Commands::Play => self.play(),
configuration::Commands::Pause => self.pause(),
configuration::Commands::PlayPause => self.play_pause(),
configuration::Commands::SkipSong => self.skip_song()?,
configuration::Commands::Set => todo!(),
_ => {
println!("This command doesn't apply to client mode")
}
};
Ok(())
}
pub fn handle_idle(&mut self) -> Result<(), Box<dyn Error>> { pub fn handle_idle(&mut self) -> Result<(), Box<dyn Error>> {
if self.sink.is_paused() { if self.sink.is_paused() {
return Ok(()); return Ok(());
@ -82,19 +64,44 @@ impl Player {
Ok(()) Ok(())
} }
fn get_files(path: &PathBuf) -> Result<Vec<PathBuf>, Box<dyn Error>> { pub fn get_files(&mut self, path: &PathBuf) -> Result<Vec<PathBuf>, Box<dyn Error>> {
Ok(walk_dir(&path)?) // let mut base_path = env::current_dir().expect("Error accesing the enviroment");
//
// match path {
// Some(dir) => {
// }
// None => search_path = base_path.to_owned(),
// }
//
// // PathBuf.join() can override the hole path, this ensure we're not accessing files outside
// // base_dir
// if !search_path.starts_with(base_path) {
// return Err("Tried to access file or directory outside of server `base_path` config.");
// }
let search_path = self
.base_dir
.join(path)
.canonicalize()
.expect("Couldn't canonicalizice the path");
// PathBuf.join() can override the hole path, this ensure we're not accessing files outside base_dir
if !search_path.starts_with(&self.base_dir) {
panic!("Tried to access file or directory outside of server `base_path` config.")
}
Ok(walk_dir(&search_path)?)
} }
fn play(&mut self) { pub fn play(&mut self) {
self.sink.play(); self.sink.play();
} }
fn pause(&mut self) { pub fn pause(&mut self) {
self.sink.pause(); self.sink.pause();
} }
fn play_pause(&self) { pub fn play_pause(&self) {
if self.sink.is_paused() { if self.sink.is_paused() {
self.sink.play(); self.sink.play();
} else { } else {
@ -102,7 +109,7 @@ impl Player {
}; };
} }
fn skip_song(&mut self) -> Result<(), Box<dyn Error>> { pub fn skip_song(&mut self) -> Result<(), Box<dyn Error>> {
println!("Skipping current song...:"); println!("Skipping current song...:");
let file_path = self.queue.pop_front().expect("foo"); let file_path = self.queue.pop_front().expect("foo");
self.enqueue_file(file_path)?; self.enqueue_file(file_path)?;