diff --git a/Cargo.toml b/Cargo.toml index b8a5deb..4feab30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,9 +5,11 @@ edition = "2021" [dependencies] clap = { version = "4.5.4", features = ["derive"] } +futures = "0.3.30" ignore = "0.4.22" lazy_static = "1.4.0" prost = "0.12.4" +rodio = "0.17.3" tokio = { version = "1", features = ["full"] } tonic = "0.11.0" diff --git a/README.md b/README.md index 202e32b..94dff6e 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,24 @@ The project was split into 2 though: - [Fuuka](https://megamitensei.fandom.com/wiki/Fuuka_Yamagishi), the navi of SEES in Persona 3, you can ask her to change the musing in tartarus. It act as the frontend to interact with the player. - [Juno](https://megamitensei.fandom.com/wiki/Juno), the persona of Fuuka that grants her the ability to communicate telepatically to her teamates. It act as the music player. +## Contributing + +Aside from rust and cargo, you need the following dependencies: + +- alsa-lib devel package +```bash +# fedora +dnf install -y alsa-lib-devel + +``` + +- Tonic protobuf dependencies +```bash +# fedora +dnf install -y protobuf-devel + +``` + ## Similar projects - [Navidrome](https://www.navidrome.org) diff --git a/proto/juno.proto b/proto/juno.proto index aad85d6..d2870d7 100644 --- a/proto/juno.proto +++ b/proto/juno.proto @@ -2,15 +2,27 @@ syntax = "proto3"; package juno; -service JunoRequest { - rpc Ping (PingRequestMessage) returns (PingResponseMessage); +service JunoServices { + rpc Ping (EmptyRequest) returns (PingResponse); rpc GetFiles (GetFilesRequest) returns (GetFilesResponse); + rpc SkipSong (EmptyRequest) returns (EmptyResponse); + rpc Play (EmptyRequest) returns (EmptyResponse); + rpc Pause (EmptyRequest) returns (EmptyResponse); + rpc PlayPause (EmptyRequest) returns (EmptyResponse); } -message PingRequestMessage { +enum Status { + SUCCESS = 0; + ERROR = 1; } -message PingResponseMessage { +message EmptyRequest { +} + +message EmptyResponse { +} + +message PingResponse { string message = 1; } diff --git a/src/configuration.rs b/src/configuration.rs index 73bcf2b..edbf4dc 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,41 +1,91 @@ -use clap::Parser; -use lazy_static::lazy_static; -use std::env; +use clap::{Parser, Subcommand}; +use std::net::SocketAddr; use std::path::PathBuf; +use std::str::FromStr; -lazy_static! { - pub static ref CONFIG: Config = Config::new(); +use crate::grpc; + +#[derive(Debug)] +pub enum ConfigMode { + Server, + Client, +} + +#[derive(Subcommand, Debug, Clone)] +pub enum Commands { + /// Start the GRPC server + Start { + #[arg(help = "Directory to scan for files", default_value = ".")] + base_path: PathBuf, + #[arg( + long, + help = "The value 1.0 is the “normal” volume. Any value other than 1.0 will multiply each sample by this value.", + default_value = "1.0" + )] + volume: f32, + }, + /// Resume the playback + Play, + /// Pause the playback + Pause, + /// Resume the playback if pause, pause if is playing + PlayPause, + /// Skip the current song + SkipSong, + Set, + /// List the available files + GetFiles { + #[arg( + help = "Directory to scan for files, relative to server base path", + default_value = "." + )] + path: PathBuf, + }, + /// Test server connection + Ping, } #[derive(Parser)] #[command(version, about, long_about = None)] struct Args { - #[arg(help = "Directory to scan for files")] - path: Option, + #[command(subcommand)] + cmd: Commands, + + #[arg(short, long, help = "the port to bind to", default_value = "50051")] + port: u16, } #[derive(Debug)] pub struct Config { - pub base_path: PathBuf, + pub command: Commands, + pub address: SocketAddr, + pub mode: ConfigMode, + pub volume: f32, } impl Default for Config { fn default() -> Self { Config { - base_path: env::current_dir().expect("Current directory is not available."), + command: Commands::Play, + mode: ConfigMode::Server, + address: SocketAddr::from_str("[::1]:50051").unwrap(), + volume: 1.0, } } } - impl Config { pub fn new() -> Self { - let mut config = Self::default(); - let cli = Self::get_cli_args(); - if let Some(path) = cli.path { - config.base_path = path; - } + let mut config = Self::default(); + config.address = SocketAddr::from_str(format!("[::1]:{}", cli.port).as_str()).unwrap(); + config.command = cli.cmd; + + if grpc::is_socket_in_use(config.address) { + config.mode = ConfigMode::Client; + } else { + config.mode = ConfigMode::Server; + }; config } diff --git a/src/file_explorer.rs b/src/file_explorer.rs deleted file mode 100644 index e932592..0000000 --- a/src/file_explorer.rs +++ /dev/null @@ -1,40 +0,0 @@ -use ignore::types::TypesBuilder; -use ignore::WalkBuilder; -use std::path::PathBuf; - -use crate::configuration::CONFIG; - -pub fn walk_dir(path: &PathBuf) -> Result, &str> { - let mut types_builder = TypesBuilder::new(); - types_builder.add_defaults(); - - let accepted_filetypes = ["mp3", "flac"]; - - for filetype in accepted_filetypes { - let _ = types_builder.add("sound", format!("*.{}", filetype).as_str()); - } - - types_builder.select("sound"); - - let search_path = CONFIG.base_path.join(path); - eprintln!( - "DEBUGPRINT[1]: file_explorer.rs:19: search_path={:#?}", - search_path - ); - - // PathBuf.join() can override the hole path, this ensure we're not accessing files outside - // base_dir - if !search_path.starts_with(&CONFIG.base_path) { - return Err("Tried to access file or directory outside of server `base_dir` config."); - } - - let entries: Vec = WalkBuilder::new(search_path) - .types(types_builder.build().unwrap()) - .build() - .filter_map(|entry| entry.ok()) - .filter(|entry| !entry.path().is_dir()) - .map(|entry| entry.path().to_path_buf()) - .collect(); - - Ok(entries) -} diff --git a/src/file_handler.rs b/src/file_handler.rs new file mode 100644 index 0000000..ad6769b --- /dev/null +++ b/src/file_handler.rs @@ -0,0 +1,34 @@ +use ignore::types::TypesBuilder; +use ignore::WalkBuilder; +use std::path::PathBuf; + +pub trait FileExplorer { + fn get_files(path: &PathBuf) -> Vec; +} + +pub struct LocalFileSystem; + +impl FileExplorer for LocalFileSystem { + fn get_files(path: &PathBuf) -> Vec { + let mut types_builder = TypesBuilder::new(); + types_builder.add_defaults(); + + let accepted_filetypes = ["mp3", "flac", "wav"]; + + for filetype in accepted_filetypes { + let _ = types_builder.add("sound", format!("*.{}", filetype).as_str()); + } + + types_builder.select("sound"); + + let entries: Vec = WalkBuilder::new(path) + .types(types_builder.build().unwrap()) + .build() + .filter_map(|entry| entry.ok()) + .filter(|entry| !entry.path().is_dir()) + .map(|entry| entry.path().to_path_buf()) + .collect(); + + entries + } +} diff --git a/src/grpc.rs b/src/grpc.rs index 77e1b60..57f24b3 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,37 +1,19 @@ -use std::error::Error; use std::net::{SocketAddr, TcpListener}; -use tonic::async_trait; - -use self::client::GRPCClient; -use self::server::GRPCServer; +pub use self::client::GRPCClient; +pub use self::server::GRPCServer; mod client; -mod server; +pub mod server; pub mod grpc_juno { tonic::include_proto!("juno"); } -#[async_trait] -pub trait Connection { - async fn connect(&self) -> Result<(), Box>; -} - -fn is_socket_in_use(addr: String) -> bool { - let socket: SocketAddr = addr.parse().expect("Failed to create socket"); - match TcpListener::bind(socket) { - Ok(_) => true, - Err(_) => false, - } -} - -pub fn run() -> Result, Box> { - let addr = "[::1]:50051"; - - if is_socket_in_use(addr.to_string()) { - Ok(Box::new(GRPCServer::new(addr.to_string()))) - } else { - Ok(Box::new(GRPCClient::new(addr.to_string()))) +/// Return true if the addr is already in use, false otherwise +pub fn is_socket_in_use(addr: SocketAddr) -> bool { + match TcpListener::bind(addr) { + Ok(_) => false, + Err(_) => true, } } diff --git a/src/grpc/client.rs b/src/grpc/client.rs index b5026b1..435c176 100644 --- a/src/grpc/client.rs +++ b/src/grpc/client.rs @@ -1,34 +1,103 @@ +use std::net::SocketAddr; +use std::path::PathBuf; + +use crate::grpc::grpc_juno::EmptyRequest; + use super::grpc_juno; -use grpc_juno::juno_request_client::JunoRequestClient; +use grpc_juno::juno_services_client::JunoServicesClient; use grpc_juno::GetFilesRequest; -use tonic::async_trait; +use tonic::transport::Channel; use tonic::Request; -#[derive(Debug, Default)] +#[derive(Debug)] pub struct GRPCClient { - address: String, + address: SocketAddr, } impl GRPCClient { - pub fn new(address: String) -> Self { + pub fn new(address: SocketAddr) -> Self { Self { address } } -} -#[async_trait] -impl super::Connection for GRPCClient { - async fn connect(&self) -> Result<(), Box> { - let mut client = JunoRequestClient::connect(format!("http://{}", self.address)).await?; + async fn get_client(&self) -> Result, Box> { + let client = + JunoServicesClient::connect(format!("http://{}", self.address.to_string())).await?; + + Ok(client) + } + + pub async fn ping(&self) -> Result<(), Box> { + let mut client = self.get_client().await?; + + let request = Request::new(EmptyRequest {}); + + let response = client.ping(request).await?.into_inner(); + + println!("RESPONSE={:?}", response); + + Ok(()) + } + + pub async fn play(&self) -> Result<(), Box> { + let mut client = self.get_client().await?; + + let request = Request::new(EmptyRequest {}); + + let response = client.play(request).await?.into_inner(); + + println!("RESPONSE={:?}", response); + + Ok(()) + } + + pub async fn pause(&self) -> Result<(), Box> { + let mut client = self.get_client().await?; + + let request = Request::new(EmptyRequest {}); + + let response = client.pause(request).await?.into_inner(); + + println!("RESPONSE={:?}", response); + + Ok(()) + } + + pub async fn play_pause(&self) -> Result<(), Box> { + let mut client = self.get_client().await?; + + let request = Request::new(EmptyRequest {}); + + let response = client.play_pause(request).await?.into_inner(); + + println!("RESPONSE={:?}", response); + + Ok(()) + } + + pub async fn skip_song(&self) -> Result<(), Box> { + let mut client = self.get_client().await?; + + let request = Request::new(EmptyRequest {}); + + let response = client.skip_song(request).await?.into_inner(); + + println!("RESPONSE={:?}", response); + + Ok(()) + } + + pub async fn get_files(&self, path: &PathBuf) -> Result<(), Box> { + let mut client = self.get_client().await?; let request = Request::new(GetFilesRequest { - path: "/home/aleidk/Documents/".to_string(), + path: path.display().to_string(), }); let response = client.get_files(request).await?.into_inner(); println!("RESPONSE={:?}", response.files); - Ok(()) + return Ok(()); } } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 9b1363b..3443dbf 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -1,33 +1,74 @@ -use crate::file_explorer; use super::grpc_juno; -use grpc_juno::juno_request_server::{JunoRequest, JunoRequestServer}; -use grpc_juno::{GetFilesRequest, GetFilesResponse, PingRequestMessage, PingResponseMessage}; +use grpc_juno::juno_services_server::{JunoServices, JunoServicesServer}; +use grpc_juno::{EmptyRequest, EmptyResponse, GetFilesRequest, GetFilesResponse, PingResponse}; use std::error::Error; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; +use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot; use tonic::transport::Server; -use tonic::{async_trait, Request, Response, Result, Status}; +use tonic::{Request, Response, Result, Status}; + +type Responder = oneshot::Sender; + +pub enum GrpcServerMessage { + Play { + resp: Responder>, + }, + Pause { + resp: Responder>, + }, + PlayPause { + resp: Responder>, + }, + SkipSong { + resp: Responder>, + }, + Set { + resp: Responder>, + }, + GetFiles { + path: PathBuf, + resp: Responder>, + }, +} #[derive(Debug, Default)] pub struct GRPCServer { - address: String, + transmitter: Option>, } impl GRPCServer { - pub fn new(address: String) -> Self { - Self { address } + pub fn new(tx: Sender) -> Self { + Self { + transmitter: Some(tx), + } + } + + pub async fn serve( + address: SocketAddr, + tx: Sender, + ) -> Result<(), Box> { + println!("Starting server on: \"{}\"", address.to_string()); + + Server::builder() + .add_service(JunoServicesServer::new(GRPCServer::new(tx))) + .serve(address) + .await?; + + Ok(()) } } #[tonic::async_trait] -impl JunoRequest for GRPCServer { +impl JunoServices for GRPCServer { async fn ping( &self, - _request: Request, - ) -> Result, Status> { - let reply = PingResponseMessage { + _request: Request, + ) -> Result, Status> { + let reply = PingResponse { message: "pong!".to_string(), }; @@ -41,10 +82,24 @@ impl JunoRequest for GRPCServer { let path = PathBuf::from_str(request.into_inner().path.as_str()) .expect("Failed to create pathbuf"); - let files = match file_explorer::walk_dir(&path) { - Ok(files) => files, - Err(err) => return Err(Status::invalid_argument(err)), - }; + let mut files: Vec = vec![]; + + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::GetFiles { + resp: resp_tx, + path, + }; + + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } + + files = match resp_rx.await { + Ok(response) => response, + Err(_err) => return Err(Status::internal("An internal error has occurred.")), + }; + } let reply = GetFilesResponse { files: files.iter().map(|x| x.display().to_string()).collect(), @@ -52,20 +107,84 @@ impl JunoRequest for GRPCServer { Ok(Response::new(reply)) } -} -#[async_trait] -impl super::Connection for GRPCServer { - async fn connect(&self) -> Result<(), Box> { - println!("Starting server on: \"{}\"", self.address); + async fn play( + &self, + _request: Request, + ) -> Result, Status> { + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::Play { resp: resp_tx }; - let socket: SocketAddr = self.address.parse()?; + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } - Server::builder() - .add_service(JunoRequestServer::new(GRPCServer::default())) - .serve(socket) - .await?; + if let Err(_err) = resp_rx.await { + return Err(Status::internal("An internal error has occurred.")); + } + } - Ok(()) + Ok(Response::new(EmptyResponse {})) + } + + async fn pause( + &self, + _request: Request, + ) -> Result, Status> { + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::Pause { resp: resp_tx }; + + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } + + if let Err(_err) = resp_rx.await { + return Err(Status::internal("An internal error has occurred.")); + } + } + + Ok(Response::new(EmptyResponse {})) + } + + async fn play_pause( + &self, + _request: Request, + ) -> Result, Status> { + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::PlayPause { resp: resp_tx }; + + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } + + if let Err(_err) = resp_rx.await { + return Err(Status::internal("An internal error has occurred.")); + } + } + + Ok(Response::new(EmptyResponse {})) + } + + async fn skip_song( + &self, + _request: Request, + ) -> Result, Status> { + if let Some(tx) = &self.transmitter { + let (resp_tx, resp_rx) = oneshot::channel(); + let message = GrpcServerMessage::SkipSong { resp: resp_tx }; + + if let Err(_err) = tx.send(message).await { + return Err(Status::internal("An internal error has occurred.")); + } + + if let Err(_err) = resp_rx.await { + return Err(Status::internal("An internal error has occurred.")); + } + } + + Ok(Response::new(EmptyResponse {})) } } diff --git a/src/main.rs b/src/main.rs index dee7a89..1cc5604 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,123 @@ +use std::env; use std::error::Error; +use std::time::Duration; +use tokio::time::sleep; + +use tokio::sync::mpsc; + +use crate::player::Player; + +use self::configuration::{Commands, Config, ConfigMode}; +use self::file_handler::{FileExplorer, LocalFileSystem}; +use self::grpc::server::GrpcServerMessage; mod configuration; -mod file_explorer; +mod file_handler; mod grpc; +mod player; -#[tokio::main()] -async fn main() -> Result<(), Box> { - let server = grpc::run()?; +async fn handle_message(player: &mut Player, message: GrpcServerMessage) { + match message { + GrpcServerMessage::Play { resp } => { + player.play(); + let _ = resp.send(Ok(())); + } + GrpcServerMessage::Pause { resp } => { + player.pause(); + let _ = resp.send(Ok(())); + } + GrpcServerMessage::PlayPause { resp } => { + player.play_pause(); + let _ = resp.send(Ok(())); + } + GrpcServerMessage::SkipSong { resp } => { + let _ = match player.skip_song() { + Ok(_) => resp.send(Ok(())), + Err(err) => resp.send(Err(err.to_string())), + }; + } + GrpcServerMessage::Set { resp } => { + let _ = resp.send(Ok(())); + } + GrpcServerMessage::GetFiles { path, resp } => { + let files = player.get_files(&path).unwrap(); + let _ = resp.send(files); + } + } +} - server.connect().await?; +async fn init_server(config: Config) -> Result<(), Box> { + let (tx, mut rx) = mpsc::channel::(32); + + tokio::spawn(async move { + let _ = grpc::GRPCServer::serve(config.address, tx).await; + }); + + let mut base_path = env::current_dir().expect("Error accesing the enviroment"); + let mut volume = 1.0; + + if let Commands::Start { + base_path: config_path, + volume: config_volume, + } = config.command + { + base_path = config_path.to_owned(); + volume = config_volume; + }; + + let mut player = Player::new(LocalFileSystem, base_path).expect("Error creating player"); + + player.set_volume(volume); + + println!("Listening for incomming messages..."); + + // This macro will wait on multiple futures and will return when the first one resolves + // TODO: create a break system for shutdown + loop { + tokio::select! { + Some(msg) = rx.recv() => { + handle_message(&mut player, msg).await; + } + _ = async { + loop { + let _ = player.handle_idle(); + sleep(Duration::from_millis(200)).await; + } + } => {} + else => { + println!("player stopped"); + } + } + } +} + +async fn init_client(config: Config) -> Result<(), Box> { + let client = grpc::GRPCClient::new(config.address); + + match &config.command { + Commands::Play => client.play().await?, + Commands::Pause => client.pause().await?, + Commands::PlayPause => client.play_pause().await?, + Commands::SkipSong => client.skip_song().await?, + Commands::Set => todo!(), + Commands::GetFiles { path } => client.get_files(&path).await?, + Commands::Ping => client.ping().await?, + _ => { + println!("This command doesn't apply to client mode") + } + } + + Ok(()) +} + +#[tokio::main()] +async fn main() -> Result<(), Box> { + let config = Config::new(); + + match config.mode { + ConfigMode::Server => init_server(config).await?, + ConfigMode::Client => init_client(config).await?, + }; Ok(()) } diff --git a/src/player.rs b/src/player.rs new file mode 100644 index 0000000..6354791 --- /dev/null +++ b/src/player.rs @@ -0,0 +1,136 @@ +use std::collections::VecDeque; +use std::error::Error; +use std::fs::File; +use std::io::BufReader; +use std::path::PathBuf; + +use rodio::{OutputStream, Sink}; + +use crate::file_handler::FileExplorer; + +#[allow(dead_code)] +pub struct Player { + queue: VecDeque, + sink: Sink, + stream: OutputStream, + base_dir: PathBuf, + explorer: T, +} + +impl std::ops::Deref for Player { + type Target = Sink; + + fn deref(&self) -> &Self::Target { + &self.sink + } +} + +impl Player { + pub fn new(explorer: T, base_dir: PathBuf) -> Result> { + let queue = T::get_files(&base_dir); + // stream needs to exist as long as sink to work + let (stream, stream_handle) = OutputStream::try_default()?; + let sink = Sink::try_new(&stream_handle)?; + + Ok(Player { + queue: VecDeque::from(queue), + sink, + stream, + base_dir, + explorer, + }) + } + + pub fn handle_idle(&mut self) -> Result<(), Box> { + if self.sink.is_paused() { + return Ok(()); + } + + if self.queue.len() == 0 { + return Ok(()); + } + + if self.sink.len() != 0 { + return Ok(()); + } + + let file_path = self + .queue + .pop_front() + .expect("There was an error with the queue"); + + self.enqueue_file(file_path)?; + + Ok(()) + } + + pub fn get_files(&mut self, path: &PathBuf) -> Result, Box> { + let search_path = self + .base_dir + .join(path) + .canonicalize() + .expect("Couldn't canonicalizice the path"); + + // PathBuf.join() can override the hole path, this ensure we're not accessing files outside base_dir + if !search_path.starts_with(&self.base_dir) { + panic!("Tried to access file or directory outside of server `base_path` config.") + } + + Ok(T::get_files(&search_path)) + } + + pub fn play(&mut self) { + self.sink.play(); + } + + pub fn pause(&mut self) { + self.sink.pause(); + } + + pub fn play_pause(&self) { + if self.sink.is_paused() { + self.sink.play(); + } else { + self.sink.pause(); + }; + } + + pub fn skip_song(&mut self) -> Result<(), Box> { + println!("Skipping current song...:"); + let file_path = self.queue.pop_front().expect("foo"); + self.enqueue_file(file_path)?; + self.sink.skip_one(); + + Ok(()) + } + + fn enqueue_file(&self, file_path: PathBuf) -> Result<(), Box> { + println!("Playing file: {}", file_path.display()); + let file = File::open(file_path)?; + + self.sink.append(rodio::Decoder::new(BufReader::new(file))?); + Ok(()) + } + + pub fn set_volume(&self, volume: f32) { + self.sink.set_volume(volume); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + struct MockFileExplorer; + + impl FileExplorer for MockFileExplorer { + fn get_files(_: &PathBuf) -> Vec { + return vec![]; + } + } + + #[test] + fn player_works() { + let _ = Player::new(MockFileExplorer, PathBuf::from(".")).expect("Error creating player"); + } +}