From 7382b06bdf6c027f84bc799a05a3971930a9441f Mon Sep 17 00:00:00 2001 From: aleidk Date: Tue, 28 May 2024 20:11:53 -0400 Subject: [PATCH 1/8] feat: implement basic music player and grpc server --- Cargo.toml | 2 + README.md | 18 +++++++ proto/juno.proto | 15 ++++-- src/configuration.rs | 37 ++++++++++++-- src/file_explorer.rs | 6 +-- src/grpc.rs | 24 +++------ src/grpc/client.rs | 54 +++++++++++++++----- src/grpc/server.rs | 63 +++++++++++++++++------ src/main.rs | 61 ++++++++++++++++++++-- src/player.rs | 117 +++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 336 insertions(+), 61 deletions(-) create mode 100644 src/player.rs diff --git a/Cargo.toml b/Cargo.toml index b8a5deb..4feab30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,9 +5,11 @@ edition = "2021" [dependencies] clap = { version = "4.5.4", features = ["derive"] } +futures = "0.3.30" ignore = "0.4.22" lazy_static = "1.4.0" prost = "0.12.4" +rodio = "0.17.3" tokio = { version = "1", features = ["full"] } tonic = "0.11.0" diff --git a/README.md b/README.md index 202e32b..94dff6e 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,24 @@ The project was split into 2 though: - [Fuuka](https://megamitensei.fandom.com/wiki/Fuuka_Yamagishi), the navi of SEES in Persona 3, you can ask her to change the musing in tartarus. It act as the frontend to interact with the player. - [Juno](https://megamitensei.fandom.com/wiki/Juno), the persona of Fuuka that grants her the ability to communicate telepatically to her teamates. It act as the music player. +## Contributing + +Aside from rust and cargo, you need the following dependencies: + +- alsa-lib devel package +```bash +# fedora +dnf install -y alsa-lib-devel + +``` + +- Tonic protobuf dependencies +```bash +# fedora +dnf install -y protobuf-devel + +``` + ## Similar projects - [Navidrome](https://www.navidrome.org) diff --git a/proto/juno.proto b/proto/juno.proto index aad85d6..b81cb82 100644 --- a/proto/juno.proto +++ b/proto/juno.proto @@ -2,15 +2,21 @@ syntax = "proto3"; package juno; -service JunoRequest { - rpc Ping (PingRequestMessage) returns (PingResponseMessage); +service JunoServices { + rpc Ping (EmptyRequest) returns (PingResponse); rpc GetFiles (GetFilesRequest) returns (GetFilesResponse); + rpc SkipSong (EmptyRequest) returns (StatusResponse); } -message PingRequestMessage { +message EmptyRequest { } -message PingResponseMessage { +// TODO: add an enmurator and a "message" so this act as a generic response to +// services that don't need to return valuable data +message StatusResponse { +} + +message PingResponse { string message = 1; } @@ -21,3 +27,4 @@ message GetFilesRequest { message GetFilesResponse { repeated string files = 1; } + diff --git a/src/configuration.rs b/src/configuration.rs index 73bcf2b..49feaeb 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,42 +1,73 @@ use clap::Parser; use lazy_static::lazy_static; use std::env; +use std::net::SocketAddr; use std::path::PathBuf; +use std::str::FromStr; + +use crate::grpc; lazy_static! { pub static ref CONFIG: Config = Config::new(); } +#[derive(Debug)] +pub enum ConfigMode { + Server, + Client, +} + #[derive(Parser)] #[command(version, about, long_about = None)] struct Args { #[arg(help = "Directory to scan for files")] path: Option, + #[arg(short, long, help = "the port to bind to", default_value = "50051")] + port: u16, + #[arg( + long, + help = "The value 1.0 is the “normal” volume. Any value other than 1.0 will multiply each sample by this value.", + default_value = "1.0" + )] + volume: f32, } #[derive(Debug)] pub struct Config { pub base_path: PathBuf, + pub address: SocketAddr, + pub mode: ConfigMode, + pub volume: f32, } impl Default for Config { fn default() -> Self { Config { base_path: env::current_dir().expect("Current directory is not available."), + mode: ConfigMode::Server, + address: SocketAddr::from_str("[::1]:50051").unwrap(), + volume: 1.0, } } } - impl Config { pub fn new() -> Self { - let mut config = Self::default(); - let cli = Self::get_cli_args(); + let mut config = Self::default(); + config.address = SocketAddr::from_str(format!("[::1]:{}", cli.port).as_str()).unwrap(); + config.volume = cli.volume; + if let Some(path) = cli.path { config.base_path = path; } + if grpc::is_socket_in_use(config.address) { + config.mode = ConfigMode::Client; + } else { + config.mode = ConfigMode::Server; + }; + config } diff --git a/src/file_explorer.rs b/src/file_explorer.rs index e932592..13cb57b 100644 --- a/src/file_explorer.rs +++ b/src/file_explorer.rs @@ -8,7 +8,7 @@ pub fn walk_dir(path: &PathBuf) -> Result, &str> { let mut types_builder = TypesBuilder::new(); types_builder.add_defaults(); - let accepted_filetypes = ["mp3", "flac"]; + let accepted_filetypes = ["mp3", "flac", "wav"]; for filetype in accepted_filetypes { let _ = types_builder.add("sound", format!("*.{}", filetype).as_str()); @@ -17,10 +17,6 @@ pub fn walk_dir(path: &PathBuf) -> Result, &str> { types_builder.select("sound"); let search_path = CONFIG.base_path.join(path); - eprintln!( - "DEBUGPRINT[1]: file_explorer.rs:19: search_path={:#?}", - search_path - ); // PathBuf.join() can override the hole path, this ensure we're not accessing files outside // base_dir diff --git a/src/grpc.rs b/src/grpc.rs index 77e1b60..20bf2e0 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -3,8 +3,8 @@ use std::net::{SocketAddr, TcpListener}; use tonic::async_trait; -use self::client::GRPCClient; -use self::server::GRPCServer; +pub use self::client::GRPCClient; +pub use self::server::GRPCServer; mod client; mod server; @@ -18,20 +18,10 @@ pub trait Connection { async fn connect(&self) -> Result<(), Box>; } -fn is_socket_in_use(addr: String) -> bool { - let socket: SocketAddr = addr.parse().expect("Failed to create socket"); - match TcpListener::bind(socket) { - Ok(_) => true, - Err(_) => false, - } -} - -pub fn run() -> Result, Box> { - let addr = "[::1]:50051"; - - if is_socket_in_use(addr.to_string()) { - Ok(Box::new(GRPCServer::new(addr.to_string()))) - } else { - Ok(Box::new(GRPCClient::new(addr.to_string()))) +/// Return true if the addr is already in use, false otherwise +pub fn is_socket_in_use(addr: SocketAddr) -> bool { + match TcpListener::bind(addr) { + Ok(_) => false, + Err(_) => true, } } diff --git a/src/grpc/client.rs b/src/grpc/client.rs index b5026b1..816d2ed 100644 --- a/src/grpc/client.rs +++ b/src/grpc/client.rs @@ -1,28 +1,25 @@ +use crate::configuration::CONFIG; +use crate::grpc::grpc_juno::EmptyRequest; + use super::grpc_juno; -use grpc_juno::juno_request_client::JunoRequestClient; +use grpc_juno::juno_services_client::JunoServicesClient; use grpc_juno::GetFilesRequest; use tonic::async_trait; +use tonic::transport::Channel; use tonic::Request; #[derive(Debug, Default)] -pub struct GRPCClient { - address: String, -} - -impl GRPCClient { - pub fn new(address: String) -> Self { - Self { address } - } -} +pub struct GRPCClient {} #[async_trait] impl super::Connection for GRPCClient { async fn connect(&self) -> Result<(), Box> { - let mut client = JunoRequestClient::connect(format!("http://{}", self.address)).await?; + let mut client = + JunoServicesClient::connect(format!("http://{}", CONFIG.address.to_string())).await?; let request = Request::new(GetFilesRequest { - path: "/home/aleidk/Documents/".to_string(), + path: CONFIG.base_path.display().to_string(), }); let response = client.get_files(request).await?.into_inner(); @@ -32,3 +29,36 @@ impl super::Connection for GRPCClient { Ok(()) } } + +impl GRPCClient { + async fn get_client(&self) -> Result, Box> { + let client = + JunoServicesClient::connect(format!("http://{}", CONFIG.address.to_string())).await?; + + Ok(client) + } + + pub async fn ping(&self) -> Result<(), Box> { + let mut client = self.get_client().await?; + + let request = Request::new(EmptyRequest {}); + + let response = client.ping(request).await?.into_inner(); + + println!("RESPONSE={:?}", response); + + Ok(()) + } + + pub async fn skip_song(&self) -> Result<(), Box> { + let mut client = self.get_client().await?; + + let request = Request::new(EmptyRequest {}); + + let response = client.skip_song(request).await?.into_inner(); + + println!("RESPONSE={:?}", response); + + Ok(()) + } +} diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 9b1363b..77b1aff 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -1,33 +1,55 @@ -use crate::file_explorer; +use crate::configuration::CONFIG; +use crate::{file_explorer, PlayerAction}; use super::grpc_juno; -use grpc_juno::juno_request_server::{JunoRequest, JunoRequestServer}; -use grpc_juno::{GetFilesRequest, GetFilesResponse, PingRequestMessage, PingResponseMessage}; +use grpc_juno::juno_services_server::{JunoServices, JunoServicesServer}; +use grpc_juno::{EmptyRequest, GetFilesRequest, GetFilesResponse, PingResponse, StatusResponse}; use std::error::Error; -use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; +use tokio::sync::mpsc::Sender; use tonic::transport::Server; use tonic::{async_trait, Request, Response, Result, Status}; #[derive(Debug, Default)] pub struct GRPCServer { - address: String, + transmitter: Option>, } impl GRPCServer { - pub fn new(address: String) -> Self { - Self { address } + pub fn new(tx: Sender) -> Self { + Self { + transmitter: Some(tx), + } + } + + async fn send_message(&self, message: PlayerAction) -> Result<(), Box> { + if let Some(tx) = &self.transmitter { + tx.send(message).await?; + } + + Ok(()) + } + + pub async fn serve(tx: Sender) -> Result<(), Box> { + println!("Starting server on: \"{}\"", CONFIG.address.to_string()); + + Server::builder() + .add_service(JunoServicesServer::new(GRPCServer::new(tx))) + .serve(CONFIG.address) + .await?; + + Ok(()) } } #[tonic::async_trait] -impl JunoRequest for GRPCServer { +impl JunoServices for GRPCServer { async fn ping( &self, - _request: Request, - ) -> Result, Status> { - let reply = PingResponseMessage { + _request: Request, + ) -> Result, Status> { + let reply = PingResponse { message: "pong!".to_string(), }; @@ -52,18 +74,27 @@ impl JunoRequest for GRPCServer { Ok(Response::new(reply)) } + + async fn skip_song( + &self, + _request: Request, + ) -> Result, Status> { + if let Err(_err) = self.send_message(PlayerAction::SkipSong).await { + return Err(Status::internal("An internal error has occurred.")); + } + + Ok(Response::new(StatusResponse {})) + } } #[async_trait] impl super::Connection for GRPCServer { async fn connect(&self) -> Result<(), Box> { - println!("Starting server on: \"{}\"", self.address); - - let socket: SocketAddr = self.address.parse()?; + println!("Starting server on: \"{}\"", CONFIG.address.to_string()); Server::builder() - .add_service(JunoRequestServer::new(GRPCServer::default())) - .serve(socket) + .add_service(JunoServicesServer::new(GRPCServer::default())) + .serve(CONFIG.address) .await?; Ok(()) diff --git a/src/main.rs b/src/main.rs index dee7a89..e68622e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,67 @@ use std::error::Error; +use std::time::Duration; +use tokio::time::sleep; + +use tokio::sync::mpsc; + +use crate::player::Player; + +use self::configuration::{ConfigMode, CONFIG}; +use self::player::PlayerAction; mod configuration; mod file_explorer; mod grpc; +mod player; -#[tokio::main()] -async fn main() -> Result<(), Box> { - let server = grpc::run()?; +async fn init_server() -> Result<(), Box> { + let (tx, mut rx) = mpsc::channel::(32); - server.connect().await?; + tokio::spawn(async move { + let _ = grpc::GRPCServer::serve(tx).await; + }); + + let mut player = Player::new().expect("Error creating player"); + + println!("Listening for incomming messages..."); + + // This macro will wait on multiple futures and will return when the first one resolves + tokio::select! { + Some(msg) = rx.recv() => { + if let Err(err) = player.handle_message(msg) { + eprintln!("Error handling player action: {}", err); + } + } + _ = async { + loop { + let _ = player.handle_idle(); + sleep(Duration::from_millis(200)).await; + } + } => {println!("player stopped");} + } + + // this traps the main thread, it should run last. + while let Some(msg) = rx.recv().await { + if let Err(err) = player.handle_message(msg) { + eprintln!("Error handling player action: {}", err); + } + } + + Ok(()) +} + +async fn init_client() -> Result<(), Box> { + let client = grpc::GRPCClient::default(); + let _ = client.skip_song().await; + Ok(()) +} + +#[tokio::main()] +async fn main() -> Result<(), Box> { + match CONFIG.mode { + ConfigMode::Server => init_server().await?, + ConfigMode::Client => init_client().await?, + }; Ok(()) } diff --git a/src/player.rs b/src/player.rs new file mode 100644 index 0000000..dd1c4d0 --- /dev/null +++ b/src/player.rs @@ -0,0 +1,117 @@ +use std::collections::VecDeque; +use std::error::Error; +use std::fs::File; +use std::io::BufReader; +use std::path::PathBuf; + +use rodio::{OutputStream, Sink}; + +use crate::configuration::CONFIG; +use crate::file_explorer::walk_dir; + +#[derive(Debug)] +pub enum PlayerAction { + Play, + SkipSong, + Set, +} + +pub struct Player { + queue: VecDeque, + sink: Sink, + stream: OutputStream, +} + +impl std::ops::Deref for Player { + type Target = Sink; + + fn deref(&self) -> &Self::Target { + &self.sink + } +} + +impl Player { + pub fn new() -> Result> { + let queue = walk_dir(&CONFIG.base_path)?; + let (stream, stream_handle) = OutputStream::try_default()?; + let sink = Sink::try_new(&stream_handle)?; + sink.set_volume(CONFIG.volume); + Ok(Player { + queue: VecDeque::from(queue), + sink, + stream, + }) + } + + pub fn handle_message(&mut self, message: PlayerAction) -> Result<(), Box> { + match message { + PlayerAction::Play => self.play()?, + PlayerAction::SkipSong => self.skip_song()?, + PlayerAction::Set => unimplemented!(), + } + + Ok(()) + } + + pub fn handle_idle(&mut self) -> Result<(), Box> { + if self.sink.is_paused() { + return Ok(()); + } + + if self.queue.len() == 0 { + return Ok(()); + } + + if self.sink.len() != 0 { + return Ok(()); + } + + let file_path = self + .queue + .pop_front() + .expect("There was an error with the queue"); + + self.enqueue_file(file_path)?; + + Ok(()) + } + + fn play(&mut self) -> Result<(), Box> { + self.sink.play(); + + Ok(()) + } + + fn skip_song(&mut self) -> Result<(), Box> { + println!("Skipping current song...:"); + let file_path = self.queue.pop_front().expect("foo"); + self.enqueue_file(file_path)?; + self.sink.skip_one(); + + Ok(()) + } + + fn enqueue_file(&self, file_path: PathBuf) -> Result<(), Box> { + println!("Playing file: {}", file_path.display()); + let file = File::open(file_path)?; + + self.sink.append(rodio::Decoder::new(BufReader::new(file))?); + Ok(()) + } + + fn _play_pause(&self) { + if self.sink.is_paused() { + self.sink.play(); + } else { + self.sink.pause(); + }; + } + + fn set_playback_state(&self, is_paused: bool) { + if is_paused { + self.sink.pause(); + } else { + self.sink.play(); + }; + } +} From 3fefadd5b5a5e7eb54158f796b99efb4c505eb00 Mon Sep 17 00:00:00 2001 From: aleidk Date: Wed, 17 Jul 2024 14:37:54 -0400 Subject: [PATCH 2/8] feat(cli): implements subcommands --- src/configuration.rs | 28 +++++++++++++++++++++++----- src/grpc.rs | 5 ----- src/grpc/client.rs | 32 ++++++++++++++------------------ src/grpc/server.rs | 14 -------------- src/player.rs | 8 -------- 5 files changed, 37 insertions(+), 50 deletions(-) diff --git a/src/configuration.rs b/src/configuration.rs index 49feaeb..118f2fb 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,4 +1,4 @@ -use clap::Parser; +use clap::{Parser, Subcommand}; use lazy_static::lazy_static; use std::env; use std::net::SocketAddr; @@ -17,13 +17,26 @@ pub enum ConfigMode { Client, } +#[derive(Subcommand, Debug, Clone)] +pub enum Commands { + Start { + #[arg(help = "Directory to scan for files")] + path: Option, + }, + Play, + SkipSong, + Set, +} + #[derive(Parser)] #[command(version, about, long_about = None)] struct Args { - #[arg(help = "Directory to scan for files")] - path: Option, + #[command(subcommand)] + cmd: Commands, + #[arg(short, long, help = "the port to bind to", default_value = "50051")] port: u16, + #[arg( long, help = "The value 1.0 is the “normal” volume. Any value other than 1.0 will multiply each sample by this value.", @@ -34,6 +47,7 @@ struct Args { #[derive(Debug)] pub struct Config { + pub command: Commands, pub base_path: PathBuf, pub address: SocketAddr, pub mode: ConfigMode, @@ -43,6 +57,7 @@ pub struct Config { impl Default for Config { fn default() -> Self { Config { + command: Commands::Play, base_path: env::current_dir().expect("Current directory is not available."), mode: ConfigMode::Server, address: SocketAddr::from_str("[::1]:50051").unwrap(), @@ -57,9 +72,12 @@ impl Config { let mut config = Self::default(); config.address = SocketAddr::from_str(format!("[::1]:{}", cli.port).as_str()).unwrap(); config.volume = cli.volume; + config.command = cli.cmd.to_owned(); - if let Some(path) = cli.path { - config.base_path = path; + if let Commands::Start { path } = cli.cmd { + if let Some(path) = path { + config.base_path = path; + } } if grpc::is_socket_in_use(config.address) { diff --git a/src/grpc.rs b/src/grpc.rs index 20bf2e0..495e524 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -13,11 +13,6 @@ pub mod grpc_juno { tonic::include_proto!("juno"); } -#[async_trait] -pub trait Connection { - async fn connect(&self) -> Result<(), Box>; -} - /// Return true if the addr is already in use, false otherwise pub fn is_socket_in_use(addr: SocketAddr) -> bool { match TcpListener::bind(addr) { diff --git a/src/grpc/client.rs b/src/grpc/client.rs index 816d2ed..9523785 100644 --- a/src/grpc/client.rs +++ b/src/grpc/client.rs @@ -12,24 +12,6 @@ use tonic::Request; #[derive(Debug, Default)] pub struct GRPCClient {} -#[async_trait] -impl super::Connection for GRPCClient { - async fn connect(&self) -> Result<(), Box> { - let mut client = - JunoServicesClient::connect(format!("http://{}", CONFIG.address.to_string())).await?; - - let request = Request::new(GetFilesRequest { - path: CONFIG.base_path.display().to_string(), - }); - - let response = client.get_files(request).await?.into_inner(); - - println!("RESPONSE={:?}", response.files); - - Ok(()) - } -} - impl GRPCClient { async fn get_client(&self) -> Result, Box> { let client = @@ -61,4 +43,18 @@ impl GRPCClient { Ok(()) } + + pub async fn list_files(&self) -> Result<(), Box> { + let mut client = self.get_client().await?; + + let request = Request::new(GetFilesRequest { + path: CONFIG.base_path.display().to_string(), + }); + + let response = client.get_files(request).await?.into_inner(); + + println!("RESPONSE={:?}", response.files); + + Ok(()) + } } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 77b1aff..c80641c 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -86,17 +86,3 @@ impl JunoServices for GRPCServer { Ok(Response::new(StatusResponse {})) } } - -#[async_trait] -impl super::Connection for GRPCServer { - async fn connect(&self) -> Result<(), Box> { - println!("Starting server on: \"{}\"", CONFIG.address.to_string()); - - Server::builder() - .add_service(JunoServicesServer::new(GRPCServer::default())) - .serve(CONFIG.address) - .await?; - - Ok(()) - } -} diff --git a/src/player.rs b/src/player.rs index dd1c4d0..3fe7363 100644 --- a/src/player.rs +++ b/src/player.rs @@ -106,12 +106,4 @@ impl Player { self.sink.pause(); }; } - - fn set_playback_state(&self, is_paused: bool) { - if is_paused { - self.sink.pause(); - } else { - self.sink.play(); - }; - } } From 2292035b8d12bb3dc8327114c450da378d6e1447 Mon Sep 17 00:00:00 2001 From: aleidk Date: Thu, 18 Jul 2024 13:25:37 -0400 Subject: [PATCH 3/8] refactor(cli): move paths to subcommands --- proto/juno.proto | 15 +++++++---- src/configuration.rs | 31 ++++++++++++++--------- src/file_explorer.rs | 30 ++++++++++++++++++---- src/grpc/client.rs | 59 +++++++++++++++++++++++++++++++++++++------- src/grpc/server.rs | 59 ++++++++++++++++++++++++++++++++++---------- src/main.rs | 20 ++++++++++++--- src/player.rs | 52 ++++++++++++++++++++------------------ 7 files changed, 195 insertions(+), 71 deletions(-) diff --git a/proto/juno.proto b/proto/juno.proto index b81cb82..d2870d7 100644 --- a/proto/juno.proto +++ b/proto/juno.proto @@ -5,15 +5,21 @@ package juno; service JunoServices { rpc Ping (EmptyRequest) returns (PingResponse); rpc GetFiles (GetFilesRequest) returns (GetFilesResponse); - rpc SkipSong (EmptyRequest) returns (StatusResponse); + rpc SkipSong (EmptyRequest) returns (EmptyResponse); + rpc Play (EmptyRequest) returns (EmptyResponse); + rpc Pause (EmptyRequest) returns (EmptyResponse); + rpc PlayPause (EmptyRequest) returns (EmptyResponse); +} + +enum Status { + SUCCESS = 0; + ERROR = 1; } message EmptyRequest { } -// TODO: add an enmurator and a "message" so this act as a generic response to -// services that don't need to return valuable data -message StatusResponse { +message EmptyResponse { } message PingResponse { @@ -27,4 +33,3 @@ message GetFilesRequest { message GetFilesResponse { repeated string files = 1; } - diff --git a/src/configuration.rs b/src/configuration.rs index 118f2fb..852bcc2 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -19,13 +19,30 @@ pub enum ConfigMode { #[derive(Subcommand, Debug, Clone)] pub enum Commands { + /// Start the GRPC server Start { - #[arg(help = "Directory to scan for files")] - path: Option, + #[arg(help = "Directory to scan for files", default_value = ".")] + base_path: PathBuf, }, + /// Resume the playback Play, + /// Pause the playback + Pause, + /// Resume the playback if pause, pause if is playing + PlayPause, + /// Skip the current song SkipSong, Set, + /// List the available files + GetFiles { + #[arg( + help = "Directory to scan for files, relative to server base path", + default_value = "." + )] + path: PathBuf, + }, + /// Test server connection + Ping, } #[derive(Parser)] @@ -48,7 +65,6 @@ struct Args { #[derive(Debug)] pub struct Config { pub command: Commands, - pub base_path: PathBuf, pub address: SocketAddr, pub mode: ConfigMode, pub volume: f32, @@ -58,7 +74,6 @@ impl Default for Config { fn default() -> Self { Config { command: Commands::Play, - base_path: env::current_dir().expect("Current directory is not available."), mode: ConfigMode::Server, address: SocketAddr::from_str("[::1]:50051").unwrap(), volume: 1.0, @@ -72,13 +87,7 @@ impl Config { let mut config = Self::default(); config.address = SocketAddr::from_str(format!("[::1]:{}", cli.port).as_str()).unwrap(); config.volume = cli.volume; - config.command = cli.cmd.to_owned(); - - if let Commands::Start { path } = cli.cmd { - if let Some(path) = path { - config.base_path = path; - } - } + config.command = cli.cmd; if grpc::is_socket_in_use(config.address) { config.mode = ConfigMode::Client; diff --git a/src/file_explorer.rs b/src/file_explorer.rs index 13cb57b..5913a42 100644 --- a/src/file_explorer.rs +++ b/src/file_explorer.rs @@ -1,10 +1,11 @@ use ignore::types::TypesBuilder; use ignore::WalkBuilder; +use std::env; use std::path::PathBuf; -use crate::configuration::CONFIG; +use crate::configuration::{Commands, CONFIG}; -pub fn walk_dir(path: &PathBuf) -> Result, &str> { +pub fn walk_dir(scan_dir: Option<&PathBuf>) -> Result, &str> { let mut types_builder = TypesBuilder::new(); types_builder.add_defaults(); @@ -16,12 +17,31 @@ pub fn walk_dir(path: &PathBuf) -> Result, &str> { types_builder.select("sound"); - let search_path = CONFIG.base_path.join(path); + let mut base_path = env::current_dir().expect("Error accesing the enviroment"); + + if let Commands::Start { + base_path: config_path, + } = &CONFIG.command + { + base_path = config_path.to_owned(); + }; + + let search_path; + + match scan_dir { + Some(dir) => { + search_path = base_path + .join(dir) + .canonicalize() + .expect("Couldn't canonicalizice the path") + } + None => search_path = base_path.to_owned(), + } // PathBuf.join() can override the hole path, this ensure we're not accessing files outside // base_dir - if !search_path.starts_with(&CONFIG.base_path) { - return Err("Tried to access file or directory outside of server `base_dir` config."); + if !search_path.starts_with(base_path) { + return Err("Tried to access file or directory outside of server `base_path` config."); } let entries: Vec = WalkBuilder::new(search_path) diff --git a/src/grpc/client.rs b/src/grpc/client.rs index 9523785..3778f58 100644 --- a/src/grpc/client.rs +++ b/src/grpc/client.rs @@ -1,11 +1,12 @@ -use crate::configuration::CONFIG; +use core::panic; + +use crate::configuration::{Commands, CONFIG}; use crate::grpc::grpc_juno::EmptyRequest; use super::grpc_juno; use grpc_juno::juno_services_client::JunoServicesClient; use grpc_juno::GetFilesRequest; -use tonic::async_trait; use tonic::transport::Channel; use tonic::Request; @@ -32,6 +33,42 @@ impl GRPCClient { Ok(()) } + pub async fn play(&self) -> Result<(), Box> { + let mut client = self.get_client().await?; + + let request = Request::new(EmptyRequest {}); + + let response = client.play(request).await?.into_inner(); + + println!("RESPONSE={:?}", response); + + Ok(()) + } + + pub async fn pause(&self) -> Result<(), Box> { + let mut client = self.get_client().await?; + + let request = Request::new(EmptyRequest {}); + + let response = client.pause(request).await?.into_inner(); + + println!("RESPONSE={:?}", response); + + Ok(()) + } + + pub async fn play_pause(&self) -> Result<(), Box> { + let mut client = self.get_client().await?; + + let request = Request::new(EmptyRequest {}); + + let response = client.play_pause(request).await?.into_inner(); + + println!("RESPONSE={:?}", response); + + Ok(()) + } + pub async fn skip_song(&self) -> Result<(), Box> { let mut client = self.get_client().await?; @@ -44,17 +81,21 @@ impl GRPCClient { Ok(()) } - pub async fn list_files(&self) -> Result<(), Box> { + pub async fn get_files(&self) -> Result<(), Box> { let mut client = self.get_client().await?; - let request = Request::new(GetFilesRequest { - path: CONFIG.base_path.display().to_string(), - }); + if let Commands::GetFiles { path } = &CONFIG.command { + let request = Request::new(GetFilesRequest { + path: path.display().to_string(), + }); - let response = client.get_files(request).await?.into_inner(); + let response = client.get_files(request).await?.into_inner(); - println!("RESPONSE={:?}", response.files); + println!("RESPONSE={:?}", response.files); - Ok(()) + return Ok(()); + }; + + panic!("Error"); } } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index c80641c..631db4c 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -1,29 +1,29 @@ -use crate::configuration::CONFIG; -use crate::{file_explorer, PlayerAction}; +use crate::configuration::{Commands, CONFIG}; +use crate::file_explorer; use super::grpc_juno; use grpc_juno::juno_services_server::{JunoServices, JunoServicesServer}; -use grpc_juno::{EmptyRequest, GetFilesRequest, GetFilesResponse, PingResponse, StatusResponse}; +use grpc_juno::{EmptyRequest, EmptyResponse, GetFilesRequest, GetFilesResponse, PingResponse}; use std::error::Error; use std::path::PathBuf; use std::str::FromStr; use tokio::sync::mpsc::Sender; use tonic::transport::Server; -use tonic::{async_trait, Request, Response, Result, Status}; +use tonic::{Request, Response, Result, Status}; #[derive(Debug, Default)] pub struct GRPCServer { - transmitter: Option>, + transmitter: Option>, } impl GRPCServer { - pub fn new(tx: Sender) -> Self { + pub fn new(tx: Sender) -> Self { Self { transmitter: Some(tx), } } - async fn send_message(&self, message: PlayerAction) -> Result<(), Box> { + async fn send_message(&self, message: Commands) -> Result<(), Box> { if let Some(tx) = &self.transmitter { tx.send(message).await?; } @@ -31,7 +31,7 @@ impl GRPCServer { Ok(()) } - pub async fn serve(tx: Sender) -> Result<(), Box> { + pub async fn serve(tx: Sender) -> Result<(), Box> { println!("Starting server on: \"{}\"", CONFIG.address.to_string()); Server::builder() @@ -63,7 +63,7 @@ impl JunoServices for GRPCServer { let path = PathBuf::from_str(request.into_inner().path.as_str()) .expect("Failed to create pathbuf"); - let files = match file_explorer::walk_dir(&path) { + let files = match file_explorer::walk_dir(Some(&path)) { Ok(files) => files, Err(err) => return Err(Status::invalid_argument(err)), }; @@ -75,14 +75,47 @@ impl JunoServices for GRPCServer { Ok(Response::new(reply)) } - async fn skip_song( + async fn play( &self, _request: Request, - ) -> Result, Status> { - if let Err(_err) = self.send_message(PlayerAction::SkipSong).await { + ) -> Result, Status> { + if let Err(_err) = self.send_message(Commands::Play).await { return Err(Status::internal("An internal error has occurred.")); } - Ok(Response::new(StatusResponse {})) + Ok(Response::new(EmptyResponse {})) + } + + async fn pause( + &self, + _request: Request, + ) -> Result, Status> { + if let Err(_err) = self.send_message(Commands::Pause).await { + return Err(Status::internal("An internal error has occurred.")); + } + + Ok(Response::new(EmptyResponse {})) + } + + async fn play_pause( + &self, + _request: Request, + ) -> Result, Status> { + if let Err(_err) = self.send_message(Commands::PlayPause).await { + return Err(Status::internal("An internal error has occurred.")); + } + + Ok(Response::new(EmptyResponse {})) + } + + async fn skip_song( + &self, + _request: Request, + ) -> Result, Status> { + if let Err(_err) = self.send_message(Commands::SkipSong).await { + return Err(Status::internal("An internal error has occurred.")); + } + + Ok(Response::new(EmptyResponse {})) } } diff --git a/src/main.rs b/src/main.rs index e68622e..96362b0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,8 +6,7 @@ use tokio::sync::mpsc; use crate::player::Player; -use self::configuration::{ConfigMode, CONFIG}; -use self::player::PlayerAction; +use self::configuration::{Commands, ConfigMode, CONFIG}; mod configuration; mod file_explorer; @@ -15,7 +14,7 @@ mod grpc; mod player; async fn init_server() -> Result<(), Box> { - let (tx, mut rx) = mpsc::channel::(32); + let (tx, mut rx) = mpsc::channel::(32); tokio::spawn(async move { let _ = grpc::GRPCServer::serve(tx).await; @@ -52,7 +51,20 @@ async fn init_server() -> Result<(), Box> { async fn init_client() -> Result<(), Box> { let client = grpc::GRPCClient::default(); - let _ = client.skip_song().await; + + match &CONFIG.command { + Commands::Play => client.play().await?, + Commands::Pause => client.pause().await?, + Commands::PlayPause => client.play_pause().await?, + Commands::SkipSong => client.skip_song().await?, + Commands::Set => todo!(), + Commands::GetFiles { path: _ } => client.get_files().await?, + Commands::Ping => client.ping().await?, + _ => { + println!("This command doesn't apply to client mode") + } + } + Ok(()) } diff --git a/src/player.rs b/src/player.rs index 3fe7363..557a660 100644 --- a/src/player.rs +++ b/src/player.rs @@ -6,16 +6,9 @@ use std::path::PathBuf; use rodio::{OutputStream, Sink}; -use crate::configuration::CONFIG; +use crate::configuration::{self, CONFIG}; use crate::file_explorer::walk_dir; -#[derive(Debug)] -pub enum PlayerAction { - Play, - SkipSong, - Set, -} - pub struct Player { queue: VecDeque, sink: Sink, @@ -32,10 +25,11 @@ impl std::ops::Deref for Player { impl Player { pub fn new() -> Result> { - let queue = walk_dir(&CONFIG.base_path)?; + let queue = walk_dir(None)?; let (stream, stream_handle) = OutputStream::try_default()?; let sink = Sink::try_new(&stream_handle)?; sink.set_volume(CONFIG.volume); + Ok(Player { queue: VecDeque::from(queue), sink, @@ -43,12 +37,20 @@ impl Player { }) } - pub fn handle_message(&mut self, message: PlayerAction) -> Result<(), Box> { + pub fn handle_message( + &mut self, + message: configuration::Commands, + ) -> Result<(), Box> { match message { - PlayerAction::Play => self.play()?, - PlayerAction::SkipSong => self.skip_song()?, - PlayerAction::Set => unimplemented!(), - } + configuration::Commands::Play => self.play(), + configuration::Commands::Pause => self.pause(), + configuration::Commands::PlayPause => self.play_pause(), + configuration::Commands::SkipSong => self.skip_song()?, + configuration::Commands::Set => todo!(), + _ => { + println!("This command doesn't apply to client mode") + } + }; Ok(()) } @@ -76,10 +78,20 @@ impl Player { Ok(()) } - fn play(&mut self) -> Result<(), Box> { + fn play(&mut self) { self.sink.play(); + } - Ok(()) + fn pause(&mut self) { + self.sink.pause(); + } + + fn play_pause(&self) { + if self.sink.is_paused() { + self.sink.play(); + } else { + self.sink.pause(); + }; } fn skip_song(&mut self) -> Result<(), Box> { @@ -98,12 +110,4 @@ impl Player { self.sink.append(rodio::Decoder::new(BufReader::new(file))?); Ok(()) } - - fn _play_pause(&self) { - if self.sink.is_paused() { - self.sink.play(); - } else { - self.sink.pause(); - }; - } } From 9cec453498a37c8228cac3d9d17473bb5d99a0b0 Mon Sep 17 00:00:00 2001 From: aleidk Date: Thu, 18 Jul 2024 14:25:32 -0400 Subject: [PATCH 4/8] chore: cleanup cargo warnings --- src/configuration.rs | 1 - src/grpc.rs | 2 -- src/player.rs | 2 ++ 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/configuration.rs b/src/configuration.rs index 852bcc2..4087599 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,6 +1,5 @@ use clap::{Parser, Subcommand}; use lazy_static::lazy_static; -use std::env; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; diff --git a/src/grpc.rs b/src/grpc.rs index 495e524..13eed50 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,7 +1,5 @@ -use std::error::Error; use std::net::{SocketAddr, TcpListener}; -use tonic::async_trait; pub use self::client::GRPCClient; pub use self::server::GRPCServer; diff --git a/src/player.rs b/src/player.rs index 557a660..7d8ae75 100644 --- a/src/player.rs +++ b/src/player.rs @@ -9,6 +9,7 @@ use rodio::{OutputStream, Sink}; use crate::configuration::{self, CONFIG}; use crate::file_explorer::walk_dir; +#[allow(dead_code)] pub struct Player { queue: VecDeque, sink: Sink, @@ -26,6 +27,7 @@ impl std::ops::Deref for Player { impl Player { pub fn new() -> Result> { let queue = walk_dir(None)?; + // stream needs to exist as long as sink to work let (stream, stream_handle) = OutputStream::try_default()?; let sink = Sink::try_new(&stream_handle)?; sink.set_volume(CONFIG.volume); From 940093d599e6f230724c8df2d260ce0910c37a26 Mon Sep 17 00:00:00 2001 From: aleidk Date: Fri, 19 Jul 2024 16:02:56 -0400 Subject: [PATCH 5/8] refactor(config): remove global config and pass as referenced --- src/configuration.rs | 19 ++++------- src/file_explorer.rs | 51 +++++++++++----------------- src/grpc.rs | 3 +- src/grpc/client.rs | 34 ++++++++++--------- src/grpc/server.rs | 45 +++++++++++++++++++------ src/main.rs | 80 +++++++++++++++++++++++++++----------------- src/player.rs | 14 +++++--- 7 files changed, 138 insertions(+), 108 deletions(-) diff --git a/src/configuration.rs b/src/configuration.rs index 4087599..edbf4dc 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,15 +1,10 @@ use clap::{Parser, Subcommand}; -use lazy_static::lazy_static; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; use crate::grpc; -lazy_static! { - pub static ref CONFIG: Config = Config::new(); -} - #[derive(Debug)] pub enum ConfigMode { Server, @@ -22,6 +17,12 @@ pub enum Commands { Start { #[arg(help = "Directory to scan for files", default_value = ".")] base_path: PathBuf, + #[arg( + long, + help = "The value 1.0 is the “normal” volume. Any value other than 1.0 will multiply each sample by this value.", + default_value = "1.0" + )] + volume: f32, }, /// Resume the playback Play, @@ -52,13 +53,6 @@ struct Args { #[arg(short, long, help = "the port to bind to", default_value = "50051")] port: u16, - - #[arg( - long, - help = "The value 1.0 is the “normal” volume. Any value other than 1.0 will multiply each sample by this value.", - default_value = "1.0" - )] - volume: f32, } #[derive(Debug)] @@ -85,7 +79,6 @@ impl Config { let mut config = Self::default(); config.address = SocketAddr::from_str(format!("[::1]:{}", cli.port).as_str()).unwrap(); - config.volume = cli.volume; config.command = cli.cmd; if grpc::is_socket_in_use(config.address) { diff --git a/src/file_explorer.rs b/src/file_explorer.rs index 5913a42..682a44f 100644 --- a/src/file_explorer.rs +++ b/src/file_explorer.rs @@ -1,11 +1,9 @@ use ignore::types::TypesBuilder; use ignore::WalkBuilder; -use std::env; +use std::error::Error; use std::path::PathBuf; -use crate::configuration::{Commands, CONFIG}; - -pub fn walk_dir(scan_dir: Option<&PathBuf>) -> Result, &str> { +pub fn walk_dir(path: &PathBuf) -> Result, Box> { let mut types_builder = TypesBuilder::new(); types_builder.add_defaults(); @@ -17,34 +15,25 @@ pub fn walk_dir(scan_dir: Option<&PathBuf>) -> Result, &str> { types_builder.select("sound"); - let mut base_path = env::current_dir().expect("Error accesing the enviroment"); + // let mut base_path = env::current_dir().expect("Error accesing the enviroment"); + // + // match path { + // Some(dir) => { + // search_path = base_path + // .join(dir) + // .canonicalize() + // .expect("Couldn't canonicalizice the path") + // } + // None => search_path = base_path.to_owned(), + // } + // + // // PathBuf.join() can override the hole path, this ensure we're not accessing files outside + // // base_dir + // if !search_path.starts_with(base_path) { + // return Err("Tried to access file or directory outside of server `base_path` config."); + // } - if let Commands::Start { - base_path: config_path, - } = &CONFIG.command - { - base_path = config_path.to_owned(); - }; - - let search_path; - - match scan_dir { - Some(dir) => { - search_path = base_path - .join(dir) - .canonicalize() - .expect("Couldn't canonicalizice the path") - } - None => search_path = base_path.to_owned(), - } - - // PathBuf.join() can override the hole path, this ensure we're not accessing files outside - // base_dir - if !search_path.starts_with(base_path) { - return Err("Tried to access file or directory outside of server `base_path` config."); - } - - let entries: Vec = WalkBuilder::new(search_path) + let entries: Vec = WalkBuilder::new(path) .types(types_builder.build().unwrap()) .build() .filter_map(|entry| entry.ok()) diff --git a/src/grpc.rs b/src/grpc.rs index 13eed50..57f24b3 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,11 +1,10 @@ use std::net::{SocketAddr, TcpListener}; - pub use self::client::GRPCClient; pub use self::server::GRPCServer; mod client; -mod server; +pub mod server; pub mod grpc_juno { tonic::include_proto!("juno"); diff --git a/src/grpc/client.rs b/src/grpc/client.rs index 3778f58..435c176 100644 --- a/src/grpc/client.rs +++ b/src/grpc/client.rs @@ -1,6 +1,6 @@ -use core::panic; +use std::net::SocketAddr; +use std::path::PathBuf; -use crate::configuration::{Commands, CONFIG}; use crate::grpc::grpc_juno::EmptyRequest; use super::grpc_juno; @@ -10,13 +10,19 @@ use grpc_juno::GetFilesRequest; use tonic::transport::Channel; use tonic::Request; -#[derive(Debug, Default)] -pub struct GRPCClient {} +#[derive(Debug)] +pub struct GRPCClient { + address: SocketAddr, +} impl GRPCClient { + pub fn new(address: SocketAddr) -> Self { + Self { address } + } + async fn get_client(&self) -> Result, Box> { let client = - JunoServicesClient::connect(format!("http://{}", CONFIG.address.to_string())).await?; + JunoServicesClient::connect(format!("http://{}", self.address.to_string())).await?; Ok(client) } @@ -81,21 +87,17 @@ impl GRPCClient { Ok(()) } - pub async fn get_files(&self) -> Result<(), Box> { + pub async fn get_files(&self, path: &PathBuf) -> Result<(), Box> { let mut client = self.get_client().await?; - if let Commands::GetFiles { path } = &CONFIG.command { - let request = Request::new(GetFilesRequest { - path: path.display().to_string(), - }); + let request = Request::new(GetFilesRequest { + path: path.display().to_string(), + }); - let response = client.get_files(request).await?.into_inner(); + let response = client.get_files(request).await?.into_inner(); - println!("RESPONSE={:?}", response.files); + println!("RESPONSE={:?}", response.files); - return Ok(()); - }; - - panic!("Error"); + return Ok(()); } } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 631db4c..9b96ce1 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -1,42 +1,65 @@ -use crate::configuration::{Commands, CONFIG}; -use crate::file_explorer; +use crate::configuration::Commands; use super::grpc_juno; use grpc_juno::juno_services_server::{JunoServices, JunoServicesServer}; use grpc_juno::{EmptyRequest, EmptyResponse, GetFilesRequest, GetFilesResponse, PingResponse}; use std::error::Error; +use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot; use tonic::transport::Server; use tonic::{Request, Response, Result, Status}; +type Responder = oneshot::Sender; + +#[derive(Debug)] +pub struct GrpcServerMessage { + pub command: Commands, + pub responder: Responder<()>, +} + #[derive(Debug, Default)] pub struct GRPCServer { - transmitter: Option>, + transmitter: Option>, } impl GRPCServer { - pub fn new(tx: Sender) -> Self { + pub fn new(tx: Sender) -> Self { Self { transmitter: Some(tx), } } - async fn send_message(&self, message: Commands) -> Result<(), Box> { + async fn send_message(&self, command: Commands) -> Result<(), Box> { + let (resp_tx, resp_rx) = oneshot::channel(); + + let message = GrpcServerMessage { + command, + responder: resp_tx, + }; + if let Some(tx) = &self.transmitter { tx.send(message).await?; + + let response = resp_rx.await?; + + return Ok(response); } Ok(()) } - pub async fn serve(tx: Sender) -> Result<(), Box> { - println!("Starting server on: \"{}\"", CONFIG.address.to_string()); + pub async fn serve( + address: SocketAddr, + tx: Sender, + ) -> Result<(), Box> { + println!("Starting server on: \"{}\"", address.to_string()); Server::builder() .add_service(JunoServicesServer::new(GRPCServer::new(tx))) - .serve(CONFIG.address) + .serve(address) .await?; Ok(()) @@ -63,9 +86,9 @@ impl JunoServices for GRPCServer { let path = PathBuf::from_str(request.into_inner().path.as_str()) .expect("Failed to create pathbuf"); - let files = match file_explorer::walk_dir(Some(&path)) { - Ok(files) => files, - Err(err) => return Err(Status::invalid_argument(err)), + let files: Vec = match self.send_message(Commands::GetFiles { path }).await { + Ok(()) => vec![], + Err(_err) => return Err(Status::internal("An internal error has occurred.")), }; let reply = GetFilesResponse { diff --git a/src/main.rs b/src/main.rs index 96362b0..1fbf9e3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use std::env; use std::error::Error; use std::time::Duration; use tokio::time::sleep; @@ -6,59 +7,74 @@ use tokio::sync::mpsc; use crate::player::Player; -use self::configuration::{Commands, ConfigMode, CONFIG}; +use self::configuration::{Commands, Config, ConfigMode}; mod configuration; mod file_explorer; mod grpc; mod player; -async fn init_server() -> Result<(), Box> { - let (tx, mut rx) = mpsc::channel::(32); +async fn init_server(config: Config) -> Result<(), Box> { + let (tx, mut rx) = mpsc::channel::(32); tokio::spawn(async move { - let _ = grpc::GRPCServer::serve(tx).await; + let _ = grpc::GRPCServer::serve(config.address, tx).await; }); - let mut player = Player::new().expect("Error creating player"); + let mut base_path = env::current_dir().expect("Error accesing the enviroment"); + let mut volume = 1.0; + + if let Commands::Start { + base_path: config_path, + volume: config_volume, + } = config.command + { + base_path = config_path.to_owned(); + volume = config_volume; + }; + + let mut player = Player::new(base_path, volume).expect("Error creating player"); println!("Listening for incomming messages..."); // This macro will wait on multiple futures and will return when the first one resolves - tokio::select! { - Some(msg) = rx.recv() => { - if let Err(err) = player.handle_message(msg) { - eprintln!("Error handling player action: {}", err); + // TODO: create a break system for shutdown + loop { + tokio::select! { + Some(msg) = rx.recv() => { + // TODO: receive message from player and send it back to server so it can be sent to + // the client + if let Err(err) = player.handle_message(msg.command) { + eprintln!("Error handling player action: {}", err); + } + + if let Err(_) = msg.responder.send(()) { + eprintln!("Error responding to grpc server"); + } + } + _ = async { + loop { + let _ = player.handle_idle(); + sleep(Duration::from_millis(200)).await; + } + } => {} + else => { + println!("player stopped"); } } - _ = async { - loop { - let _ = player.handle_idle(); - sleep(Duration::from_millis(200)).await; - } - } => {println!("player stopped");} } - - // this traps the main thread, it should run last. - while let Some(msg) = rx.recv().await { - if let Err(err) = player.handle_message(msg) { - eprintln!("Error handling player action: {}", err); - } - } - - Ok(()) } -async fn init_client() -> Result<(), Box> { - let client = grpc::GRPCClient::default(); +async fn init_client(config: Config) -> Result<(), Box> { + let client = grpc::GRPCClient::new(config.address); - match &CONFIG.command { + match &config.command { Commands::Play => client.play().await?, Commands::Pause => client.pause().await?, Commands::PlayPause => client.play_pause().await?, Commands::SkipSong => client.skip_song().await?, Commands::Set => todo!(), - Commands::GetFiles { path: _ } => client.get_files().await?, + Commands::GetFiles { path } => client.get_files(&path).await?, Commands::Ping => client.ping().await?, _ => { println!("This command doesn't apply to client mode") @@ -70,9 +86,11 @@ async fn init_client() -> Result<(), Box> { #[tokio::main()] async fn main() -> Result<(), Box> { - match CONFIG.mode { - ConfigMode::Server => init_server().await?, - ConfigMode::Client => init_client().await?, + let config = Config::new(); + + match config.mode { + ConfigMode::Server => init_server(config).await?, + ConfigMode::Client => init_client(config).await?, }; Ok(()) diff --git a/src/player.rs b/src/player.rs index 7d8ae75..5df0207 100644 --- a/src/player.rs +++ b/src/player.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use rodio::{OutputStream, Sink}; -use crate::configuration::{self, CONFIG}; +use crate::configuration; use crate::file_explorer::walk_dir; #[allow(dead_code)] @@ -14,6 +14,7 @@ pub struct Player { queue: VecDeque, sink: Sink, stream: OutputStream, + base_dir: PathBuf, } impl std::ops::Deref for Player { @@ -25,17 +26,18 @@ impl std::ops::Deref for Player { } impl Player { - pub fn new() -> Result> { - let queue = walk_dir(None)?; + pub fn new(base_dir: PathBuf, volume: f32) -> Result> { + let queue = walk_dir(&base_dir)?; // stream needs to exist as long as sink to work let (stream, stream_handle) = OutputStream::try_default()?; let sink = Sink::try_new(&stream_handle)?; - sink.set_volume(CONFIG.volume); + sink.set_volume(volume); Ok(Player { queue: VecDeque::from(queue), sink, stream, + base_dir, }) } @@ -80,6 +82,10 @@ impl Player { Ok(()) } + fn get_files(path: &PathBuf) -> Result, Box> { + Ok(walk_dir(&path)?) + } + fn play(&mut self) { self.sink.play(); } From 0df5ed34e54259783acdf8b4e32fad6980f55681 Mon Sep 17 00:00:00 2001 From: aleidk Date: Thu, 25 Jul 2024 16:10:49 -0400 Subject: [PATCH 6/8] refactor(server): allow to send message between player and server --- src/grpc/server.rs | 117 +++++++++++++++++++++++++++++++-------------- src/main.rs | 39 +++++++++++---- src/player.rs | 55 +++++++++++---------- 3 files changed, 143 insertions(+), 68 deletions(-) diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 9b96ce1..1dcb0b4 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -14,10 +14,26 @@ use tonic::{Request, Response, Result, Status}; type Responder = oneshot::Sender; -#[derive(Debug)] -pub struct GrpcServerMessage { - pub command: Commands, - pub responder: Responder<()>, +pub enum GrpcServerMessage { + Play { + resp: Responder>, + }, + Pause { + resp: Responder>, + }, + PlayPause { + resp: Responder>, + }, + SkipSong { + resp: Responder>, + }, + Set { + resp: Responder>, + }, + GetFiles { + path: PathBuf, + resp: Responder>, + }, } #[derive(Debug, Default)] @@ -32,25 +48,6 @@ impl GRPCServer { } } - async fn send_message(&self, command: Commands) -> Result<(), Box> { - let (resp_tx, resp_rx) = oneshot::channel(); - - let message = GrpcServerMessage { - command, - responder: resp_tx, - }; - - if let Some(tx) = &self.transmitter { - tx.send(message).await?; - - let response = resp_rx.await?; - - return Ok(response); - } - - Ok(()) - } - pub async fn serve( address: SocketAddr, tx: Sender, @@ -86,10 +83,24 @@ impl JunoServices for GRPCServer { let path = PathBuf::from_str(request.into_inner().path.as_str()) .expect("Failed to create pathbuf"); - let files: Vec = match self.send_message(Commands::GetFiles { path }).await { - Ok(()) => vec![], - Err(_err) => return Err(Status::internal("An internal error has occurred.")), - }; + let mut files: Vec = vec![]; + + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::GetFiles { + resp: resp_tx, + path, + }; + + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } + + files = match resp_rx.await { + Ok(response) => response, + Err(_err) => return Err(Status::internal("An internal error has occurred.")), + }; + } let reply = GetFilesResponse { files: files.iter().map(|x| x.display().to_string()).collect(), @@ -102,8 +113,17 @@ impl JunoServices for GRPCServer { &self, _request: Request, ) -> Result, Status> { - if let Err(_err) = self.send_message(Commands::Play).await { - return Err(Status::internal("An internal error has occurred.")); + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::Play { resp: resp_tx }; + + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } + + if let Err(_err) = resp_rx.await { + return Err(Status::internal("An internal error has occurred.")); + } } Ok(Response::new(EmptyResponse {})) @@ -113,8 +133,17 @@ impl JunoServices for GRPCServer { &self, _request: Request, ) -> Result, Status> { - if let Err(_err) = self.send_message(Commands::Pause).await { - return Err(Status::internal("An internal error has occurred.")); + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::Pause { resp: resp_tx }; + + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } + + if let Err(_err) = resp_rx.await { + return Err(Status::internal("An internal error has occurred.")); + } } Ok(Response::new(EmptyResponse {})) @@ -124,8 +153,17 @@ impl JunoServices for GRPCServer { &self, _request: Request, ) -> Result, Status> { - if let Err(_err) = self.send_message(Commands::PlayPause).await { - return Err(Status::internal("An internal error has occurred.")); + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::PlayPause { resp: resp_tx }; + + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } + + if let Err(_err) = resp_rx.await { + return Err(Status::internal("An internal error has occurred.")); + } } Ok(Response::new(EmptyResponse {})) @@ -135,8 +173,17 @@ impl JunoServices for GRPCServer { &self, _request: Request, ) -> Result, Status> { - if let Err(_err) = self.send_message(Commands::SkipSong).await { - return Err(Status::internal("An internal error has occurred.")); + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::SkipSong { resp: resp_tx }; + + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } + + if let Err(_err) = resp_rx.await { + return Err(Status::internal("An internal error has occurred.")); + } } Ok(Response::new(EmptyResponse {})) diff --git a/src/main.rs b/src/main.rs index 1fbf9e3..51ab7a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,12 +8,41 @@ use tokio::sync::mpsc; use crate::player::Player; use self::configuration::{Commands, Config, ConfigMode}; +use self::grpc::server::GrpcServerMessage; mod configuration; mod file_explorer; mod grpc; mod player; +async fn handle_message(player: &mut Player, message: GrpcServerMessage) { + match message { + GrpcServerMessage::Play { resp } => { + player.play(); + let _ = resp.send(Ok(())); + } + GrpcServerMessage::Pause { resp } => { + player.pause(); + let _ = resp.send(Ok(())); + } + GrpcServerMessage::PlayPause { resp } => { + player.play_pause(); + let _ = resp.send(Ok(())); + } + GrpcServerMessage::SkipSong { resp } => { + let _ = match player.skip_song() { + Ok(_) => resp.send(Ok(())), + Err(err) => resp.send(Err(err.to_string())), + }; + } + GrpcServerMessage::Set { resp } => todo!(), + GrpcServerMessage::GetFiles { path, resp } => { + let files = player.get_files(&path).unwrap(); + let _ = resp.send(files); + } + } +} + async fn init_server(config: Config) -> Result<(), Box> { let (tx, mut rx) = mpsc::channel::(32); @@ -42,15 +71,7 @@ async fn init_server(config: Config) -> Result<(), Box> { loop { tokio::select! { Some(msg) = rx.recv() => { - // TODO: receive message from player and send it back to server so it can be sent to - // the client - if let Err(err) = player.handle_message(msg.command) { - eprintln!("Error handling player action: {}", err); - } - - if let Err(_) = msg.responder.send(()) { - eprintln!("Error responding to grpc server"); - } + handle_message(&mut player, msg).await; } _ = async { loop { diff --git a/src/player.rs b/src/player.rs index 5df0207..86ec0ff 100644 --- a/src/player.rs +++ b/src/player.rs @@ -41,24 +41,6 @@ impl Player { }) } - pub fn handle_message( - &mut self, - message: configuration::Commands, - ) -> Result<(), Box> { - match message { - configuration::Commands::Play => self.play(), - configuration::Commands::Pause => self.pause(), - configuration::Commands::PlayPause => self.play_pause(), - configuration::Commands::SkipSong => self.skip_song()?, - configuration::Commands::Set => todo!(), - _ => { - println!("This command doesn't apply to client mode") - } - }; - - Ok(()) - } - pub fn handle_idle(&mut self) -> Result<(), Box> { if self.sink.is_paused() { return Ok(()); @@ -82,19 +64,44 @@ impl Player { Ok(()) } - fn get_files(path: &PathBuf) -> Result, Box> { - Ok(walk_dir(&path)?) + pub fn get_files(&mut self, path: &PathBuf) -> Result, Box> { + // let mut base_path = env::current_dir().expect("Error accesing the enviroment"); + // + // match path { + // Some(dir) => { + // } + // None => search_path = base_path.to_owned(), + // } + // + // // PathBuf.join() can override the hole path, this ensure we're not accessing files outside + // // base_dir + // if !search_path.starts_with(base_path) { + // return Err("Tried to access file or directory outside of server `base_path` config."); + // } + + let search_path = self + .base_dir + .join(path) + .canonicalize() + .expect("Couldn't canonicalizice the path"); + + // PathBuf.join() can override the hole path, this ensure we're not accessing files outside base_dir + if !search_path.starts_with(&self.base_dir) { + panic!("Tried to access file or directory outside of server `base_path` config.") + } + + Ok(walk_dir(&search_path)?) } - fn play(&mut self) { + pub fn play(&mut self) { self.sink.play(); } - fn pause(&mut self) { + pub fn pause(&mut self) { self.sink.pause(); } - fn play_pause(&self) { + pub fn play_pause(&self) { if self.sink.is_paused() { self.sink.play(); } else { @@ -102,7 +109,7 @@ impl Player { }; } - fn skip_song(&mut self) -> Result<(), Box> { + pub fn skip_song(&mut self) -> Result<(), Box> { println!("Skipping current song...:"); let file_path = self.queue.pop_front().expect("foo"); self.enqueue_file(file_path)?; From 462c480a45d3b7a96fbdb18aa6ecbbdb380147b3 Mon Sep 17 00:00:00 2001 From: aleidk Date: Fri, 26 Jul 2024 09:15:53 -0400 Subject: [PATCH 7/8] chore: remove cargo warnings --- src/grpc/server.rs | 1 - src/main.rs | 4 +++- src/player.rs | 1 - 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 1dcb0b4..3443dbf 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -1,4 +1,3 @@ -use crate::configuration::Commands; use super::grpc_juno; use grpc_juno::juno_services_server::{JunoServices, JunoServicesServer}; diff --git a/src/main.rs b/src/main.rs index 51ab7a3..664e6fa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,7 +35,9 @@ async fn handle_message(player: &mut Player, message: GrpcServerMessage) { Err(err) => resp.send(Err(err.to_string())), }; } - GrpcServerMessage::Set { resp } => todo!(), + GrpcServerMessage::Set { resp } => { + let _ = resp.send(Ok(())); + } GrpcServerMessage::GetFiles { path, resp } => { let files = player.get_files(&path).unwrap(); let _ = resp.send(files); diff --git a/src/player.rs b/src/player.rs index 86ec0ff..ec9cc04 100644 --- a/src/player.rs +++ b/src/player.rs @@ -6,7 +6,6 @@ use std::path::PathBuf; use rodio::{OutputStream, Sink}; -use crate::configuration; use crate::file_explorer::walk_dir; #[allow(dead_code)] From d98b645377dea0a6a0e33f87bfbe2bc9fb67101a Mon Sep 17 00:00:00 2001 From: aleidk Date: Fri, 26 Jul 2024 16:28:09 -0400 Subject: [PATCH 8/8] refactor(player): change to dependency injection Use this pattern to decouple the dependencies and allow to test te code and introduce new dependencies in the future --- src/file_explorer.rs | 45 ------------------------------------- src/file_handler.rs | 34 ++++++++++++++++++++++++++++ src/main.rs | 9 +++++--- src/player.rs | 53 ++++++++++++++++++++++++++------------------ 4 files changed, 71 insertions(+), 70 deletions(-) delete mode 100644 src/file_explorer.rs create mode 100644 src/file_handler.rs diff --git a/src/file_explorer.rs b/src/file_explorer.rs deleted file mode 100644 index 682a44f..0000000 --- a/src/file_explorer.rs +++ /dev/null @@ -1,45 +0,0 @@ -use ignore::types::TypesBuilder; -use ignore::WalkBuilder; -use std::error::Error; -use std::path::PathBuf; - -pub fn walk_dir(path: &PathBuf) -> Result, Box> { - let mut types_builder = TypesBuilder::new(); - types_builder.add_defaults(); - - let accepted_filetypes = ["mp3", "flac", "wav"]; - - for filetype in accepted_filetypes { - let _ = types_builder.add("sound", format!("*.{}", filetype).as_str()); - } - - types_builder.select("sound"); - - // let mut base_path = env::current_dir().expect("Error accesing the enviroment"); - // - // match path { - // Some(dir) => { - // search_path = base_path - // .join(dir) - // .canonicalize() - // .expect("Couldn't canonicalizice the path") - // } - // None => search_path = base_path.to_owned(), - // } - // - // // PathBuf.join() can override the hole path, this ensure we're not accessing files outside - // // base_dir - // if !search_path.starts_with(base_path) { - // return Err("Tried to access file or directory outside of server `base_path` config."); - // } - - let entries: Vec = WalkBuilder::new(path) - .types(types_builder.build().unwrap()) - .build() - .filter_map(|entry| entry.ok()) - .filter(|entry| !entry.path().is_dir()) - .map(|entry| entry.path().to_path_buf()) - .collect(); - - Ok(entries) -} diff --git a/src/file_handler.rs b/src/file_handler.rs new file mode 100644 index 0000000..ad6769b --- /dev/null +++ b/src/file_handler.rs @@ -0,0 +1,34 @@ +use ignore::types::TypesBuilder; +use ignore::WalkBuilder; +use std::path::PathBuf; + +pub trait FileExplorer { + fn get_files(path: &PathBuf) -> Vec; +} + +pub struct LocalFileSystem; + +impl FileExplorer for LocalFileSystem { + fn get_files(path: &PathBuf) -> Vec { + let mut types_builder = TypesBuilder::new(); + types_builder.add_defaults(); + + let accepted_filetypes = ["mp3", "flac", "wav"]; + + for filetype in accepted_filetypes { + let _ = types_builder.add("sound", format!("*.{}", filetype).as_str()); + } + + types_builder.select("sound"); + + let entries: Vec = WalkBuilder::new(path) + .types(types_builder.build().unwrap()) + .build() + .filter_map(|entry| entry.ok()) + .filter(|entry| !entry.path().is_dir()) + .map(|entry| entry.path().to_path_buf()) + .collect(); + + entries + } +} diff --git a/src/main.rs b/src/main.rs index 664e6fa..1cc5604 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,14 +8,15 @@ use tokio::sync::mpsc; use crate::player::Player; use self::configuration::{Commands, Config, ConfigMode}; +use self::file_handler::{FileExplorer, LocalFileSystem}; use self::grpc::server::GrpcServerMessage; mod configuration; -mod file_explorer; +mod file_handler; mod grpc; mod player; -async fn handle_message(player: &mut Player, message: GrpcServerMessage) { +async fn handle_message(player: &mut Player, message: GrpcServerMessage) { match message { GrpcServerMessage::Play { resp } => { player.play(); @@ -64,7 +65,9 @@ async fn init_server(config: Config) -> Result<(), Box> { volume = config_volume; }; - let mut player = Player::new(base_path, volume).expect("Error creating player"); + let mut player = Player::new(LocalFileSystem, base_path).expect("Error creating player"); + + player.set_volume(volume); println!("Listening for incomming messages..."); diff --git a/src/player.rs b/src/player.rs index ec9cc04..6354791 100644 --- a/src/player.rs +++ b/src/player.rs @@ -6,17 +6,18 @@ use std::path::PathBuf; use rodio::{OutputStream, Sink}; -use crate::file_explorer::walk_dir; +use crate::file_handler::FileExplorer; #[allow(dead_code)] -pub struct Player { +pub struct Player { queue: VecDeque, sink: Sink, stream: OutputStream, base_dir: PathBuf, + explorer: T, } -impl std::ops::Deref for Player { +impl std::ops::Deref for Player { type Target = Sink; fn deref(&self) -> &Self::Target { @@ -24,19 +25,19 @@ impl std::ops::Deref for Player { } } -impl Player { - pub fn new(base_dir: PathBuf, volume: f32) -> Result> { - let queue = walk_dir(&base_dir)?; +impl Player { + pub fn new(explorer: T, base_dir: PathBuf) -> Result> { + let queue = T::get_files(&base_dir); // stream needs to exist as long as sink to work let (stream, stream_handle) = OutputStream::try_default()?; let sink = Sink::try_new(&stream_handle)?; - sink.set_volume(volume); Ok(Player { queue: VecDeque::from(queue), sink, stream, base_dir, + explorer, }) } @@ -64,20 +65,6 @@ impl Player { } pub fn get_files(&mut self, path: &PathBuf) -> Result, Box> { - // let mut base_path = env::current_dir().expect("Error accesing the enviroment"); - // - // match path { - // Some(dir) => { - // } - // None => search_path = base_path.to_owned(), - // } - // - // // PathBuf.join() can override the hole path, this ensure we're not accessing files outside - // // base_dir - // if !search_path.starts_with(base_path) { - // return Err("Tried to access file or directory outside of server `base_path` config."); - // } - let search_path = self .base_dir .join(path) @@ -89,7 +76,7 @@ impl Player { panic!("Tried to access file or directory outside of server `base_path` config.") } - Ok(walk_dir(&search_path)?) + Ok(T::get_files(&search_path)) } pub fn play(&mut self) { @@ -124,4 +111,26 @@ impl Player { self.sink.append(rodio::Decoder::new(BufReader::new(file))?); Ok(()) } + + pub fn set_volume(&self, volume: f32) { + self.sink.set_volume(volume); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + struct MockFileExplorer; + + impl FileExplorer for MockFileExplorer { + fn get_files(_: &PathBuf) -> Vec { + return vec![]; + } + } + + #[test] + fn player_works() { + let _ = Player::new(MockFileExplorer, PathBuf::from(".")).expect("Error creating player"); + } }