diff --git a/Cargo.toml b/Cargo.toml index 4feab30..b8a5deb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,11 +5,9 @@ edition = "2021" [dependencies] clap = { version = "4.5.4", features = ["derive"] } -futures = "0.3.30" ignore = "0.4.22" lazy_static = "1.4.0" prost = "0.12.4" -rodio = "0.17.3" tokio = { version = "1", features = ["full"] } tonic = "0.11.0" diff --git a/README.md b/README.md index 94dff6e..202e32b 100644 --- a/README.md +++ b/README.md @@ -48,24 +48,6 @@ The project was split into 2 though: - [Fuuka](https://megamitensei.fandom.com/wiki/Fuuka_Yamagishi), the navi of SEES in Persona 3, you can ask her to change the musing in tartarus. It act as the frontend to interact with the player. - [Juno](https://megamitensei.fandom.com/wiki/Juno), the persona of Fuuka that grants her the ability to communicate telepatically to her teamates. It act as the music player. -## Contributing - -Aside from rust and cargo, you need the following dependencies: - -- alsa-lib devel package -```bash -# fedora -dnf install -y alsa-lib-devel - -``` - -- Tonic protobuf dependencies -```bash -# fedora -dnf install -y protobuf-devel - -``` - ## Similar projects - [Navidrome](https://www.navidrome.org) diff --git a/proto/juno.proto b/proto/juno.proto index d2870d7..aad85d6 100644 --- a/proto/juno.proto +++ b/proto/juno.proto @@ -2,27 +2,15 @@ syntax = "proto3"; package juno; -service JunoServices { - rpc Ping (EmptyRequest) returns (PingResponse); +service JunoRequest { + rpc Ping (PingRequestMessage) returns (PingResponseMessage); rpc GetFiles (GetFilesRequest) returns (GetFilesResponse); - rpc SkipSong (EmptyRequest) returns (EmptyResponse); - rpc Play (EmptyRequest) returns (EmptyResponse); - rpc Pause (EmptyRequest) returns (EmptyResponse); - rpc PlayPause (EmptyRequest) returns (EmptyResponse); } -enum Status { - SUCCESS = 0; - ERROR = 1; +message PingRequestMessage { } -message EmptyRequest { -} - -message EmptyResponse { -} - -message PingResponse { +message PingResponseMessage { string message = 1; } diff --git a/src/configuration.rs b/src/configuration.rs index edbf4dc..73bcf2b 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,91 +1,41 @@ -use clap::{Parser, Subcommand}; -use std::net::SocketAddr; +use clap::Parser; +use lazy_static::lazy_static; +use std::env; use std::path::PathBuf; -use std::str::FromStr; -use crate::grpc; - -#[derive(Debug)] -pub enum ConfigMode { - Server, - Client, -} - -#[derive(Subcommand, Debug, Clone)] -pub enum Commands { - /// Start the GRPC server - Start { - #[arg(help = "Directory to scan for files", default_value = ".")] - base_path: PathBuf, - #[arg( - long, - help = "The value 1.0 is the “normal” volume. Any value other than 1.0 will multiply each sample by this value.", - default_value = "1.0" - )] - volume: f32, - }, - /// Resume the playback - Play, - /// Pause the playback - Pause, - /// Resume the playback if pause, pause if is playing - PlayPause, - /// Skip the current song - SkipSong, - Set, - /// List the available files - GetFiles { - #[arg( - help = "Directory to scan for files, relative to server base path", - default_value = "." - )] - path: PathBuf, - }, - /// Test server connection - Ping, +lazy_static! { + pub static ref CONFIG: Config = Config::new(); } #[derive(Parser)] #[command(version, about, long_about = None)] struct Args { - #[command(subcommand)] - cmd: Commands, - - #[arg(short, long, help = "the port to bind to", default_value = "50051")] - port: u16, + #[arg(help = "Directory to scan for files")] + path: Option, } #[derive(Debug)] pub struct Config { - pub command: Commands, - pub address: SocketAddr, - pub mode: ConfigMode, - pub volume: f32, + pub base_path: PathBuf, } impl Default for Config { fn default() -> Self { Config { - command: Commands::Play, - mode: ConfigMode::Server, - address: SocketAddr::from_str("[::1]:50051").unwrap(), - volume: 1.0, + base_path: env::current_dir().expect("Current directory is not available."), } } } + impl Config { pub fn new() -> Self { + let mut config = Self::default(); + let cli = Self::get_cli_args(); - let mut config = Self::default(); - config.address = SocketAddr::from_str(format!("[::1]:{}", cli.port).as_str()).unwrap(); - config.command = cli.cmd; - - if grpc::is_socket_in_use(config.address) { - config.mode = ConfigMode::Client; - } else { - config.mode = ConfigMode::Server; - }; + if let Some(path) = cli.path { + config.base_path = path; + } config } diff --git a/src/file_explorer.rs b/src/file_explorer.rs new file mode 100644 index 0000000..e932592 --- /dev/null +++ b/src/file_explorer.rs @@ -0,0 +1,40 @@ +use ignore::types::TypesBuilder; +use ignore::WalkBuilder; +use std::path::PathBuf; + +use crate::configuration::CONFIG; + +pub fn walk_dir(path: &PathBuf) -> Result, &str> { + let mut types_builder = TypesBuilder::new(); + types_builder.add_defaults(); + + let accepted_filetypes = ["mp3", "flac"]; + + for filetype in accepted_filetypes { + let _ = types_builder.add("sound", format!("*.{}", filetype).as_str()); + } + + types_builder.select("sound"); + + let search_path = CONFIG.base_path.join(path); + eprintln!( + "DEBUGPRINT[1]: file_explorer.rs:19: search_path={:#?}", + search_path + ); + + // PathBuf.join() can override the hole path, this ensure we're not accessing files outside + // base_dir + if !search_path.starts_with(&CONFIG.base_path) { + return Err("Tried to access file or directory outside of server `base_dir` config."); + } + + let entries: Vec = WalkBuilder::new(search_path) + .types(types_builder.build().unwrap()) + .build() + .filter_map(|entry| entry.ok()) + .filter(|entry| !entry.path().is_dir()) + .map(|entry| entry.path().to_path_buf()) + .collect(); + + Ok(entries) +} diff --git a/src/file_handler.rs b/src/file_handler.rs deleted file mode 100644 index ad6769b..0000000 --- a/src/file_handler.rs +++ /dev/null @@ -1,34 +0,0 @@ -use ignore::types::TypesBuilder; -use ignore::WalkBuilder; -use std::path::PathBuf; - -pub trait FileExplorer { - fn get_files(path: &PathBuf) -> Vec; -} - -pub struct LocalFileSystem; - -impl FileExplorer for LocalFileSystem { - fn get_files(path: &PathBuf) -> Vec { - let mut types_builder = TypesBuilder::new(); - types_builder.add_defaults(); - - let accepted_filetypes = ["mp3", "flac", "wav"]; - - for filetype in accepted_filetypes { - let _ = types_builder.add("sound", format!("*.{}", filetype).as_str()); - } - - types_builder.select("sound"); - - let entries: Vec = WalkBuilder::new(path) - .types(types_builder.build().unwrap()) - .build() - .filter_map(|entry| entry.ok()) - .filter(|entry| !entry.path().is_dir()) - .map(|entry| entry.path().to_path_buf()) - .collect(); - - entries - } -} diff --git a/src/grpc.rs b/src/grpc.rs index 57f24b3..77e1b60 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,19 +1,37 @@ +use std::error::Error; use std::net::{SocketAddr, TcpListener}; -pub use self::client::GRPCClient; -pub use self::server::GRPCServer; +use tonic::async_trait; + +use self::client::GRPCClient; +use self::server::GRPCServer; mod client; -pub mod server; +mod server; pub mod grpc_juno { tonic::include_proto!("juno"); } -/// Return true if the addr is already in use, false otherwise -pub fn is_socket_in_use(addr: SocketAddr) -> bool { - match TcpListener::bind(addr) { - Ok(_) => false, - Err(_) => true, +#[async_trait] +pub trait Connection { + async fn connect(&self) -> Result<(), Box>; +} + +fn is_socket_in_use(addr: String) -> bool { + let socket: SocketAddr = addr.parse().expect("Failed to create socket"); + match TcpListener::bind(socket) { + Ok(_) => true, + Err(_) => false, + } +} + +pub fn run() -> Result, Box> { + let addr = "[::1]:50051"; + + if is_socket_in_use(addr.to_string()) { + Ok(Box::new(GRPCServer::new(addr.to_string()))) + } else { + Ok(Box::new(GRPCClient::new(addr.to_string()))) } } diff --git a/src/grpc/client.rs b/src/grpc/client.rs index 435c176..b5026b1 100644 --- a/src/grpc/client.rs +++ b/src/grpc/client.rs @@ -1,103 +1,34 @@ -use std::net::SocketAddr; -use std::path::PathBuf; - -use crate::grpc::grpc_juno::EmptyRequest; - use super::grpc_juno; -use grpc_juno::juno_services_client::JunoServicesClient; +use grpc_juno::juno_request_client::JunoRequestClient; use grpc_juno::GetFilesRequest; -use tonic::transport::Channel; +use tonic::async_trait; use tonic::Request; -#[derive(Debug)] +#[derive(Debug, Default)] pub struct GRPCClient { - address: SocketAddr, + address: String, } impl GRPCClient { - pub fn new(address: SocketAddr) -> Self { + pub fn new(address: String) -> Self { Self { address } } +} - async fn get_client(&self) -> Result, Box> { - let client = - JunoServicesClient::connect(format!("http://{}", self.address.to_string())).await?; - - Ok(client) - } - - pub async fn ping(&self) -> Result<(), Box> { - let mut client = self.get_client().await?; - - let request = Request::new(EmptyRequest {}); - - let response = client.ping(request).await?.into_inner(); - - println!("RESPONSE={:?}", response); - - Ok(()) - } - - pub async fn play(&self) -> Result<(), Box> { - let mut client = self.get_client().await?; - - let request = Request::new(EmptyRequest {}); - - let response = client.play(request).await?.into_inner(); - - println!("RESPONSE={:?}", response); - - Ok(()) - } - - pub async fn pause(&self) -> Result<(), Box> { - let mut client = self.get_client().await?; - - let request = Request::new(EmptyRequest {}); - - let response = client.pause(request).await?.into_inner(); - - println!("RESPONSE={:?}", response); - - Ok(()) - } - - pub async fn play_pause(&self) -> Result<(), Box> { - let mut client = self.get_client().await?; - - let request = Request::new(EmptyRequest {}); - - let response = client.play_pause(request).await?.into_inner(); - - println!("RESPONSE={:?}", response); - - Ok(()) - } - - pub async fn skip_song(&self) -> Result<(), Box> { - let mut client = self.get_client().await?; - - let request = Request::new(EmptyRequest {}); - - let response = client.skip_song(request).await?.into_inner(); - - println!("RESPONSE={:?}", response); - - Ok(()) - } - - pub async fn get_files(&self, path: &PathBuf) -> Result<(), Box> { - let mut client = self.get_client().await?; +#[async_trait] +impl super::Connection for GRPCClient { + async fn connect(&self) -> Result<(), Box> { + let mut client = JunoRequestClient::connect(format!("http://{}", self.address)).await?; let request = Request::new(GetFilesRequest { - path: path.display().to_string(), + path: "/home/aleidk/Documents/".to_string(), }); let response = client.get_files(request).await?.into_inner(); println!("RESPONSE={:?}", response.files); - return Ok(()); + Ok(()) } } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 3443dbf..9b1363b 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -1,74 +1,33 @@ +use crate::file_explorer; use super::grpc_juno; -use grpc_juno::juno_services_server::{JunoServices, JunoServicesServer}; -use grpc_juno::{EmptyRequest, EmptyResponse, GetFilesRequest, GetFilesResponse, PingResponse}; +use grpc_juno::juno_request_server::{JunoRequest, JunoRequestServer}; +use grpc_juno::{GetFilesRequest, GetFilesResponse, PingRequestMessage, PingResponseMessage}; use std::error::Error; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; -use tokio::sync::mpsc::Sender; -use tokio::sync::oneshot; use tonic::transport::Server; -use tonic::{Request, Response, Result, Status}; - -type Responder = oneshot::Sender; - -pub enum GrpcServerMessage { - Play { - resp: Responder>, - }, - Pause { - resp: Responder>, - }, - PlayPause { - resp: Responder>, - }, - SkipSong { - resp: Responder>, - }, - Set { - resp: Responder>, - }, - GetFiles { - path: PathBuf, - resp: Responder>, - }, -} +use tonic::{async_trait, Request, Response, Result, Status}; #[derive(Debug, Default)] pub struct GRPCServer { - transmitter: Option>, + address: String, } impl GRPCServer { - pub fn new(tx: Sender) -> Self { - Self { - transmitter: Some(tx), - } - } - - pub async fn serve( - address: SocketAddr, - tx: Sender, - ) -> Result<(), Box> { - println!("Starting server on: \"{}\"", address.to_string()); - - Server::builder() - .add_service(JunoServicesServer::new(GRPCServer::new(tx))) - .serve(address) - .await?; - - Ok(()) + pub fn new(address: String) -> Self { + Self { address } } } #[tonic::async_trait] -impl JunoServices for GRPCServer { +impl JunoRequest for GRPCServer { async fn ping( &self, - _request: Request, - ) -> Result, Status> { - let reply = PingResponse { + _request: Request, + ) -> Result, Status> { + let reply = PingResponseMessage { message: "pong!".to_string(), }; @@ -82,24 +41,10 @@ impl JunoServices for GRPCServer { let path = PathBuf::from_str(request.into_inner().path.as_str()) .expect("Failed to create pathbuf"); - let mut files: Vec = vec![]; - - if let Some(tx) = &self.transmitter { - let (resp_tx, resp_rx) = oneshot::channel(); - let message = GrpcServerMessage::GetFiles { - resp: resp_tx, - path, - }; - - if let Err(_err) = tx.send(message).await { - return Err(Status::internal("An internal error has occurred.")); - } - - files = match resp_rx.await { - Ok(response) => response, - Err(_err) => return Err(Status::internal("An internal error has occurred.")), - }; - } + let files = match file_explorer::walk_dir(&path) { + Ok(files) => files, + Err(err) => return Err(Status::invalid_argument(err)), + }; let reply = GetFilesResponse { files: files.iter().map(|x| x.display().to_string()).collect(), @@ -107,84 +52,20 @@ impl JunoServices for GRPCServer { Ok(Response::new(reply)) } +} - async fn play( - &self, - _request: Request, - ) -> Result, Status> { - if let Some(tx) = &self.transmitter { - let (resp_tx, resp_rx) = oneshot::channel(); - let message = GrpcServerMessage::Play { resp: resp_tx }; +#[async_trait] +impl super::Connection for GRPCServer { + async fn connect(&self) -> Result<(), Box> { + println!("Starting server on: \"{}\"", self.address); - if let Err(_err) = tx.send(message).await { - return Err(Status::internal("An internal error has occurred.")); - } + let socket: SocketAddr = self.address.parse()?; - if let Err(_err) = resp_rx.await { - return Err(Status::internal("An internal error has occurred.")); - } - } + Server::builder() + .add_service(JunoRequestServer::new(GRPCServer::default())) + .serve(socket) + .await?; - Ok(Response::new(EmptyResponse {})) - } - - async fn pause( - &self, - _request: Request, - ) -> Result, Status> { - if let Some(tx) = &self.transmitter { - let (resp_tx, resp_rx) = oneshot::channel(); - let message = GrpcServerMessage::Pause { resp: resp_tx }; - - if let Err(_err) = tx.send(message).await { - return Err(Status::internal("An internal error has occurred.")); - } - - if let Err(_err) = resp_rx.await { - return Err(Status::internal("An internal error has occurred.")); - } - } - - Ok(Response::new(EmptyResponse {})) - } - - async fn play_pause( - &self, - _request: Request, - ) -> Result, Status> { - if let Some(tx) = &self.transmitter { - let (resp_tx, resp_rx) = oneshot::channel(); - let message = GrpcServerMessage::PlayPause { resp: resp_tx }; - - if let Err(_err) = tx.send(message).await { - return Err(Status::internal("An internal error has occurred.")); - } - - if let Err(_err) = resp_rx.await { - return Err(Status::internal("An internal error has occurred.")); - } - } - - Ok(Response::new(EmptyResponse {})) - } - - async fn skip_song( - &self, - _request: Request, - ) -> Result, Status> { - if let Some(tx) = &self.transmitter { - let (resp_tx, resp_rx) = oneshot::channel(); - let message = GrpcServerMessage::SkipSong { resp: resp_tx }; - - if let Err(_err) = tx.send(message).await { - return Err(Status::internal("An internal error has occurred.")); - } - - if let Err(_err) = resp_rx.await { - return Err(Status::internal("An internal error has occurred.")); - } - } - - Ok(Response::new(EmptyResponse {})) + Ok(()) } } diff --git a/src/main.rs b/src/main.rs index 1cc5604..dee7a89 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,123 +1,14 @@ -use std::env; use std::error::Error; -use std::time::Duration; -use tokio::time::sleep; - -use tokio::sync::mpsc; - -use crate::player::Player; - -use self::configuration::{Commands, Config, ConfigMode}; -use self::file_handler::{FileExplorer, LocalFileSystem}; -use self::grpc::server::GrpcServerMessage; mod configuration; -mod file_handler; +mod file_explorer; mod grpc; -mod player; - -async fn handle_message(player: &mut Player, message: GrpcServerMessage) { - match message { - GrpcServerMessage::Play { resp } => { - player.play(); - let _ = resp.send(Ok(())); - } - GrpcServerMessage::Pause { resp } => { - player.pause(); - let _ = resp.send(Ok(())); - } - GrpcServerMessage::PlayPause { resp } => { - player.play_pause(); - let _ = resp.send(Ok(())); - } - GrpcServerMessage::SkipSong { resp } => { - let _ = match player.skip_song() { - Ok(_) => resp.send(Ok(())), - Err(err) => resp.send(Err(err.to_string())), - }; - } - GrpcServerMessage::Set { resp } => { - let _ = resp.send(Ok(())); - } - GrpcServerMessage::GetFiles { path, resp } => { - let files = player.get_files(&path).unwrap(); - let _ = resp.send(files); - } - } -} - -async fn init_server(config: Config) -> Result<(), Box> { - let (tx, mut rx) = mpsc::channel::(32); - - tokio::spawn(async move { - let _ = grpc::GRPCServer::serve(config.address, tx).await; - }); - - let mut base_path = env::current_dir().expect("Error accesing the enviroment"); - let mut volume = 1.0; - - if let Commands::Start { - base_path: config_path, - volume: config_volume, - } = config.command - { - base_path = config_path.to_owned(); - volume = config_volume; - }; - - let mut player = Player::new(LocalFileSystem, base_path).expect("Error creating player"); - - player.set_volume(volume); - - println!("Listening for incomming messages..."); - - // This macro will wait on multiple futures and will return when the first one resolves - // TODO: create a break system for shutdown - loop { - tokio::select! { - Some(msg) = rx.recv() => { - handle_message(&mut player, msg).await; - } - _ = async { - loop { - let _ = player.handle_idle(); - sleep(Duration::from_millis(200)).await; - } - } => {} - else => { - println!("player stopped"); - } - } - } -} - -async fn init_client(config: Config) -> Result<(), Box> { - let client = grpc::GRPCClient::new(config.address); - - match &config.command { - Commands::Play => client.play().await?, - Commands::Pause => client.pause().await?, - Commands::PlayPause => client.play_pause().await?, - Commands::SkipSong => client.skip_song().await?, - Commands::Set => todo!(), - Commands::GetFiles { path } => client.get_files(&path).await?, - Commands::Ping => client.ping().await?, - _ => { - println!("This command doesn't apply to client mode") - } - } - - Ok(()) -} #[tokio::main()] async fn main() -> Result<(), Box> { - let config = Config::new(); + let server = grpc::run()?; - match config.mode { - ConfigMode::Server => init_server(config).await?, - ConfigMode::Client => init_client(config).await?, - }; + server.connect().await?; Ok(()) } diff --git a/src/player.rs b/src/player.rs deleted file mode 100644 index 6354791..0000000 --- a/src/player.rs +++ /dev/null @@ -1,136 +0,0 @@ -use std::collections::VecDeque; -use std::error::Error; -use std::fs::File; -use std::io::BufReader; -use std::path::PathBuf; - -use rodio::{OutputStream, Sink}; - -use crate::file_handler::FileExplorer; - -#[allow(dead_code)] -pub struct Player { - queue: VecDeque, - sink: Sink, - stream: OutputStream, - base_dir: PathBuf, - explorer: T, -} - -impl std::ops::Deref for Player { - type Target = Sink; - - fn deref(&self) -> &Self::Target { - &self.sink - } -} - -impl Player { - pub fn new(explorer: T, base_dir: PathBuf) -> Result> { - let queue = T::get_files(&base_dir); - // stream needs to exist as long as sink to work - let (stream, stream_handle) = OutputStream::try_default()?; - let sink = Sink::try_new(&stream_handle)?; - - Ok(Player { - queue: VecDeque::from(queue), - sink, - stream, - base_dir, - explorer, - }) - } - - pub fn handle_idle(&mut self) -> Result<(), Box> { - if self.sink.is_paused() { - return Ok(()); - } - - if self.queue.len() == 0 { - return Ok(()); - } - - if self.sink.len() != 0 { - return Ok(()); - } - - let file_path = self - .queue - .pop_front() - .expect("There was an error with the queue"); - - self.enqueue_file(file_path)?; - - Ok(()) - } - - pub fn get_files(&mut self, path: &PathBuf) -> Result, Box> { - let search_path = self - .base_dir - .join(path) - .canonicalize() - .expect("Couldn't canonicalizice the path"); - - // PathBuf.join() can override the hole path, this ensure we're not accessing files outside base_dir - if !search_path.starts_with(&self.base_dir) { - panic!("Tried to access file or directory outside of server `base_path` config.") - } - - Ok(T::get_files(&search_path)) - } - - pub fn play(&mut self) { - self.sink.play(); - } - - pub fn pause(&mut self) { - self.sink.pause(); - } - - pub fn play_pause(&self) { - if self.sink.is_paused() { - self.sink.play(); - } else { - self.sink.pause(); - }; - } - - pub fn skip_song(&mut self) -> Result<(), Box> { - println!("Skipping current song...:"); - let file_path = self.queue.pop_front().expect("foo"); - self.enqueue_file(file_path)?; - self.sink.skip_one(); - - Ok(()) - } - - fn enqueue_file(&self, file_path: PathBuf) -> Result<(), Box> { - println!("Playing file: {}", file_path.display()); - let file = File::open(file_path)?; - - self.sink.append(rodio::Decoder::new(BufReader::new(file))?); - Ok(()) - } - - pub fn set_volume(&self, volume: f32) { - self.sink.set_volume(volume); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - struct MockFileExplorer; - - impl FileExplorer for MockFileExplorer { - fn get_files(_: &PathBuf) -> Vec { - return vec![]; - } - } - - #[test] - fn player_works() { - let _ = Player::new(MockFileExplorer, PathBuf::from(".")).expect("Error creating player"); - } -}