diff --git a/src/configuration.rs b/src/configuration.rs index 4087599..edbf4dc 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,15 +1,10 @@ use clap::{Parser, Subcommand}; -use lazy_static::lazy_static; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; use crate::grpc; -lazy_static! { - pub static ref CONFIG: Config = Config::new(); -} - #[derive(Debug)] pub enum ConfigMode { Server, @@ -22,6 +17,12 @@ pub enum Commands { Start { #[arg(help = "Directory to scan for files", default_value = ".")] base_path: PathBuf, + #[arg( + long, + help = "The value 1.0 is the “normal” volume. Any value other than 1.0 will multiply each sample by this value.", + default_value = "1.0" + )] + volume: f32, }, /// Resume the playback Play, @@ -52,13 +53,6 @@ struct Args { #[arg(short, long, help = "the port to bind to", default_value = "50051")] port: u16, - - #[arg( - long, - help = "The value 1.0 is the “normal” volume. Any value other than 1.0 will multiply each sample by this value.", - default_value = "1.0" - )] - volume: f32, } #[derive(Debug)] @@ -85,7 +79,6 @@ impl Config { let mut config = Self::default(); config.address = SocketAddr::from_str(format!("[::1]:{}", cli.port).as_str()).unwrap(); - config.volume = cli.volume; config.command = cli.cmd; if grpc::is_socket_in_use(config.address) { diff --git a/src/file_explorer.rs b/src/file_explorer.rs index 5913a42..682a44f 100644 --- a/src/file_explorer.rs +++ b/src/file_explorer.rs @@ -1,11 +1,9 @@ use ignore::types::TypesBuilder; use ignore::WalkBuilder; -use std::env; +use std::error::Error; use std::path::PathBuf; -use crate::configuration::{Commands, CONFIG}; - -pub fn walk_dir(scan_dir: Option<&PathBuf>) -> Result, &str> { +pub fn walk_dir(path: &PathBuf) -> Result, Box> { let mut types_builder = TypesBuilder::new(); types_builder.add_defaults(); @@ -17,34 +15,25 @@ pub fn walk_dir(scan_dir: Option<&PathBuf>) -> Result, &str> { types_builder.select("sound"); - let mut base_path = env::current_dir().expect("Error accesing the enviroment"); + // let mut base_path = env::current_dir().expect("Error accesing the enviroment"); + // + // match path { + // Some(dir) => { + // search_path = base_path + // .join(dir) + // .canonicalize() + // .expect("Couldn't canonicalizice the path") + // } + // None => search_path = base_path.to_owned(), + // } + // + // // PathBuf.join() can override the hole path, this ensure we're not accessing files outside + // // base_dir + // if !search_path.starts_with(base_path) { + // return Err("Tried to access file or directory outside of server `base_path` config."); + // } - if let Commands::Start { - base_path: config_path, - } = &CONFIG.command - { - base_path = config_path.to_owned(); - }; - - let search_path; - - match scan_dir { - Some(dir) => { - search_path = base_path - .join(dir) - .canonicalize() - .expect("Couldn't canonicalizice the path") - } - None => search_path = base_path.to_owned(), - } - - // PathBuf.join() can override the hole path, this ensure we're not accessing files outside - // base_dir - if !search_path.starts_with(base_path) { - return Err("Tried to access file or directory outside of server `base_path` config."); - } - - let entries: Vec = WalkBuilder::new(search_path) + let entries: Vec = WalkBuilder::new(path) .types(types_builder.build().unwrap()) .build() .filter_map(|entry| entry.ok()) diff --git a/src/grpc.rs b/src/grpc.rs index 13eed50..57f24b3 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,11 +1,10 @@ use std::net::{SocketAddr, TcpListener}; - pub use self::client::GRPCClient; pub use self::server::GRPCServer; mod client; -mod server; +pub mod server; pub mod grpc_juno { tonic::include_proto!("juno"); diff --git a/src/grpc/client.rs b/src/grpc/client.rs index 3778f58..435c176 100644 --- a/src/grpc/client.rs +++ b/src/grpc/client.rs @@ -1,6 +1,6 @@ -use core::panic; +use std::net::SocketAddr; +use std::path::PathBuf; -use crate::configuration::{Commands, CONFIG}; use crate::grpc::grpc_juno::EmptyRequest; use super::grpc_juno; @@ -10,13 +10,19 @@ use grpc_juno::GetFilesRequest; use tonic::transport::Channel; use tonic::Request; -#[derive(Debug, Default)] -pub struct GRPCClient {} +#[derive(Debug)] +pub struct GRPCClient { + address: SocketAddr, +} impl GRPCClient { + pub fn new(address: SocketAddr) -> Self { + Self { address } + } + async fn get_client(&self) -> Result, Box> { let client = - JunoServicesClient::connect(format!("http://{}", CONFIG.address.to_string())).await?; + JunoServicesClient::connect(format!("http://{}", self.address.to_string())).await?; Ok(client) } @@ -81,21 +87,17 @@ impl GRPCClient { Ok(()) } - pub async fn get_files(&self) -> Result<(), Box> { + pub async fn get_files(&self, path: &PathBuf) -> Result<(), Box> { let mut client = self.get_client().await?; - if let Commands::GetFiles { path } = &CONFIG.command { - let request = Request::new(GetFilesRequest { - path: path.display().to_string(), - }); + let request = Request::new(GetFilesRequest { + path: path.display().to_string(), + }); - let response = client.get_files(request).await?.into_inner(); + let response = client.get_files(request).await?.into_inner(); - println!("RESPONSE={:?}", response.files); + println!("RESPONSE={:?}", response.files); - return Ok(()); - }; - - panic!("Error"); + return Ok(()); } } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 631db4c..9b96ce1 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -1,42 +1,65 @@ -use crate::configuration::{Commands, CONFIG}; -use crate::file_explorer; +use crate::configuration::Commands; use super::grpc_juno; use grpc_juno::juno_services_server::{JunoServices, JunoServicesServer}; use grpc_juno::{EmptyRequest, EmptyResponse, GetFilesRequest, GetFilesResponse, PingResponse}; use std::error::Error; +use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot; use tonic::transport::Server; use tonic::{Request, Response, Result, Status}; +type Responder = oneshot::Sender; + +#[derive(Debug)] +pub struct GrpcServerMessage { + pub command: Commands, + pub responder: Responder<()>, +} + #[derive(Debug, Default)] pub struct GRPCServer { - transmitter: Option>, + transmitter: Option>, } impl GRPCServer { - pub fn new(tx: Sender) -> Self { + pub fn new(tx: Sender) -> Self { Self { transmitter: Some(tx), } } - async fn send_message(&self, message: Commands) -> Result<(), Box> { + async fn send_message(&self, command: Commands) -> Result<(), Box> { + let (resp_tx, resp_rx) = oneshot::channel(); + + let message = GrpcServerMessage { + command, + responder: resp_tx, + }; + if let Some(tx) = &self.transmitter { tx.send(message).await?; + + let response = resp_rx.await?; + + return Ok(response); } Ok(()) } - pub async fn serve(tx: Sender) -> Result<(), Box> { - println!("Starting server on: \"{}\"", CONFIG.address.to_string()); + pub async fn serve( + address: SocketAddr, + tx: Sender, + ) -> Result<(), Box> { + println!("Starting server on: \"{}\"", address.to_string()); Server::builder() .add_service(JunoServicesServer::new(GRPCServer::new(tx))) - .serve(CONFIG.address) + .serve(address) .await?; Ok(()) @@ -63,9 +86,9 @@ impl JunoServices for GRPCServer { let path = PathBuf::from_str(request.into_inner().path.as_str()) .expect("Failed to create pathbuf"); - let files = match file_explorer::walk_dir(Some(&path)) { - Ok(files) => files, - Err(err) => return Err(Status::invalid_argument(err)), + let files: Vec = match self.send_message(Commands::GetFiles { path }).await { + Ok(()) => vec![], + Err(_err) => return Err(Status::internal("An internal error has occurred.")), }; let reply = GetFilesResponse { diff --git a/src/main.rs b/src/main.rs index 96362b0..1fbf9e3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use std::env; use std::error::Error; use std::time::Duration; use tokio::time::sleep; @@ -6,59 +7,74 @@ use tokio::sync::mpsc; use crate::player::Player; -use self::configuration::{Commands, ConfigMode, CONFIG}; +use self::configuration::{Commands, Config, ConfigMode}; mod configuration; mod file_explorer; mod grpc; mod player; -async fn init_server() -> Result<(), Box> { - let (tx, mut rx) = mpsc::channel::(32); +async fn init_server(config: Config) -> Result<(), Box> { + let (tx, mut rx) = mpsc::channel::(32); tokio::spawn(async move { - let _ = grpc::GRPCServer::serve(tx).await; + let _ = grpc::GRPCServer::serve(config.address, tx).await; }); - let mut player = Player::new().expect("Error creating player"); + let mut base_path = env::current_dir().expect("Error accesing the enviroment"); + let mut volume = 1.0; + + if let Commands::Start { + base_path: config_path, + volume: config_volume, + } = config.command + { + base_path = config_path.to_owned(); + volume = config_volume; + }; + + let mut player = Player::new(base_path, volume).expect("Error creating player"); println!("Listening for incomming messages..."); // This macro will wait on multiple futures and will return when the first one resolves - tokio::select! { - Some(msg) = rx.recv() => { - if let Err(err) = player.handle_message(msg) { - eprintln!("Error handling player action: {}", err); + // TODO: create a break system for shutdown + loop { + tokio::select! { + Some(msg) = rx.recv() => { + // TODO: receive message from player and send it back to server so it can be sent to + // the client + if let Err(err) = player.handle_message(msg.command) { + eprintln!("Error handling player action: {}", err); + } + + if let Err(_) = msg.responder.send(()) { + eprintln!("Error responding to grpc server"); + } + } + _ = async { + loop { + let _ = player.handle_idle(); + sleep(Duration::from_millis(200)).await; + } + } => {} + else => { + println!("player stopped"); } } - _ = async { - loop { - let _ = player.handle_idle(); - sleep(Duration::from_millis(200)).await; - } - } => {println!("player stopped");} } - - // this traps the main thread, it should run last. - while let Some(msg) = rx.recv().await { - if let Err(err) = player.handle_message(msg) { - eprintln!("Error handling player action: {}", err); - } - } - - Ok(()) } -async fn init_client() -> Result<(), Box> { - let client = grpc::GRPCClient::default(); +async fn init_client(config: Config) -> Result<(), Box> { + let client = grpc::GRPCClient::new(config.address); - match &CONFIG.command { + match &config.command { Commands::Play => client.play().await?, Commands::Pause => client.pause().await?, Commands::PlayPause => client.play_pause().await?, Commands::SkipSong => client.skip_song().await?, Commands::Set => todo!(), - Commands::GetFiles { path: _ } => client.get_files().await?, + Commands::GetFiles { path } => client.get_files(&path).await?, Commands::Ping => client.ping().await?, _ => { println!("This command doesn't apply to client mode") @@ -70,9 +86,11 @@ async fn init_client() -> Result<(), Box> { #[tokio::main()] async fn main() -> Result<(), Box> { - match CONFIG.mode { - ConfigMode::Server => init_server().await?, - ConfigMode::Client => init_client().await?, + let config = Config::new(); + + match config.mode { + ConfigMode::Server => init_server(config).await?, + ConfigMode::Client => init_client(config).await?, }; Ok(()) diff --git a/src/player.rs b/src/player.rs index 7d8ae75..5df0207 100644 --- a/src/player.rs +++ b/src/player.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use rodio::{OutputStream, Sink}; -use crate::configuration::{self, CONFIG}; +use crate::configuration; use crate::file_explorer::walk_dir; #[allow(dead_code)] @@ -14,6 +14,7 @@ pub struct Player { queue: VecDeque, sink: Sink, stream: OutputStream, + base_dir: PathBuf, } impl std::ops::Deref for Player { @@ -25,17 +26,18 @@ impl std::ops::Deref for Player { } impl Player { - pub fn new() -> Result> { - let queue = walk_dir(None)?; + pub fn new(base_dir: PathBuf, volume: f32) -> Result> { + let queue = walk_dir(&base_dir)?; // stream needs to exist as long as sink to work let (stream, stream_handle) = OutputStream::try_default()?; let sink = Sink::try_new(&stream_handle)?; - sink.set_volume(CONFIG.volume); + sink.set_volume(volume); Ok(Player { queue: VecDeque::from(queue), sink, stream, + base_dir, }) } @@ -80,6 +82,10 @@ impl Player { Ok(()) } + fn get_files(path: &PathBuf) -> Result, Box> { + Ok(walk_dir(&path)?) + } + fn play(&mut self) { self.sink.play(); }