diff --git a/Cargo.toml b/Cargo.toml index b8a5deb..4feab30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,9 +5,11 @@ edition = "2021" [dependencies] clap = { version = "4.5.4", features = ["derive"] } +futures = "0.3.30" ignore = "0.4.22" lazy_static = "1.4.0" prost = "0.12.4" +rodio = "0.17.3" tokio = { version = "1", features = ["full"] } tonic = "0.11.0" diff --git a/README.md b/README.md index 202e32b..94dff6e 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,24 @@ The project was split into 2 though: - [Fuuka](https://megamitensei.fandom.com/wiki/Fuuka_Yamagishi), the navi of SEES in Persona 3, you can ask her to change the musing in tartarus. It act as the frontend to interact with the player. - [Juno](https://megamitensei.fandom.com/wiki/Juno), the persona of Fuuka that grants her the ability to communicate telepatically to her teamates. It act as the music player. +## Contributing + +Aside from rust and cargo, you need the following dependencies: + +- alsa-lib devel package +```bash +# fedora +dnf install -y alsa-lib-devel + +``` + +- Tonic protobuf dependencies +```bash +# fedora +dnf install -y protobuf-devel + +``` + ## Similar projects - [Navidrome](https://www.navidrome.org) diff --git a/proto/juno.proto b/proto/juno.proto index aad85d6..b81cb82 100644 --- a/proto/juno.proto +++ b/proto/juno.proto @@ -2,15 +2,21 @@ syntax = "proto3"; package juno; -service JunoRequest { - rpc Ping (PingRequestMessage) returns (PingResponseMessage); +service JunoServices { + rpc Ping (EmptyRequest) returns (PingResponse); rpc GetFiles (GetFilesRequest) returns (GetFilesResponse); + rpc SkipSong (EmptyRequest) returns (StatusResponse); } -message PingRequestMessage { +message EmptyRequest { } -message PingResponseMessage { +// TODO: add an enmurator and a "message" so this act as a generic response to +// services that don't need to return valuable data +message StatusResponse { +} + +message PingResponse { string message = 1; } @@ -21,3 +27,4 @@ message GetFilesRequest { message GetFilesResponse { repeated string files = 1; } + diff --git a/src/configuration.rs b/src/configuration.rs index 73bcf2b..49feaeb 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,42 +1,73 @@ use clap::Parser; use lazy_static::lazy_static; use std::env; +use std::net::SocketAddr; use std::path::PathBuf; +use std::str::FromStr; + +use crate::grpc; lazy_static! { pub static ref CONFIG: Config = Config::new(); } +#[derive(Debug)] +pub enum ConfigMode { + Server, + Client, +} + #[derive(Parser)] #[command(version, about, long_about = None)] struct Args { #[arg(help = "Directory to scan for files")] path: Option, + #[arg(short, long, help = "the port to bind to", default_value = "50051")] + port: u16, + #[arg( + long, + help = "The value 1.0 is the “normal” volume. Any value other than 1.0 will multiply each sample by this value.", + default_value = "1.0" + )] + volume: f32, } #[derive(Debug)] pub struct Config { pub base_path: PathBuf, + pub address: SocketAddr, + pub mode: ConfigMode, + pub volume: f32, } impl Default for Config { fn default() -> Self { Config { base_path: env::current_dir().expect("Current directory is not available."), + mode: ConfigMode::Server, + address: SocketAddr::from_str("[::1]:50051").unwrap(), + volume: 1.0, } } } - impl Config { pub fn new() -> Self { - let mut config = Self::default(); - let cli = Self::get_cli_args(); + let mut config = Self::default(); + config.address = SocketAddr::from_str(format!("[::1]:{}", cli.port).as_str()).unwrap(); + config.volume = cli.volume; + if let Some(path) = cli.path { config.base_path = path; } + if grpc::is_socket_in_use(config.address) { + config.mode = ConfigMode::Client; + } else { + config.mode = ConfigMode::Server; + }; + config } diff --git a/src/file_explorer.rs b/src/file_explorer.rs index e932592..13cb57b 100644 --- a/src/file_explorer.rs +++ b/src/file_explorer.rs @@ -8,7 +8,7 @@ pub fn walk_dir(path: &PathBuf) -> Result, &str> { let mut types_builder = TypesBuilder::new(); types_builder.add_defaults(); - let accepted_filetypes = ["mp3", "flac"]; + let accepted_filetypes = ["mp3", "flac", "wav"]; for filetype in accepted_filetypes { let _ = types_builder.add("sound", format!("*.{}", filetype).as_str()); @@ -17,10 +17,6 @@ pub fn walk_dir(path: &PathBuf) -> Result, &str> { types_builder.select("sound"); let search_path = CONFIG.base_path.join(path); - eprintln!( - "DEBUGPRINT[1]: file_explorer.rs:19: search_path={:#?}", - search_path - ); // PathBuf.join() can override the hole path, this ensure we're not accessing files outside // base_dir diff --git a/src/grpc.rs b/src/grpc.rs index 77e1b60..20bf2e0 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -3,8 +3,8 @@ use std::net::{SocketAddr, TcpListener}; use tonic::async_trait; -use self::client::GRPCClient; -use self::server::GRPCServer; +pub use self::client::GRPCClient; +pub use self::server::GRPCServer; mod client; mod server; @@ -18,20 +18,10 @@ pub trait Connection { async fn connect(&self) -> Result<(), Box>; } -fn is_socket_in_use(addr: String) -> bool { - let socket: SocketAddr = addr.parse().expect("Failed to create socket"); - match TcpListener::bind(socket) { - Ok(_) => true, - Err(_) => false, - } -} - -pub fn run() -> Result, Box> { - let addr = "[::1]:50051"; - - if is_socket_in_use(addr.to_string()) { - Ok(Box::new(GRPCServer::new(addr.to_string()))) - } else { - Ok(Box::new(GRPCClient::new(addr.to_string()))) +/// Return true if the addr is already in use, false otherwise +pub fn is_socket_in_use(addr: SocketAddr) -> bool { + match TcpListener::bind(addr) { + Ok(_) => false, + Err(_) => true, } } diff --git a/src/grpc/client.rs b/src/grpc/client.rs index b5026b1..816d2ed 100644 --- a/src/grpc/client.rs +++ b/src/grpc/client.rs @@ -1,28 +1,25 @@ +use crate::configuration::CONFIG; +use crate::grpc::grpc_juno::EmptyRequest; + use super::grpc_juno; -use grpc_juno::juno_request_client::JunoRequestClient; +use grpc_juno::juno_services_client::JunoServicesClient; use grpc_juno::GetFilesRequest; use tonic::async_trait; +use tonic::transport::Channel; use tonic::Request; #[derive(Debug, Default)] -pub struct GRPCClient { - address: String, -} - -impl GRPCClient { - pub fn new(address: String) -> Self { - Self { address } - } -} +pub struct GRPCClient {} #[async_trait] impl super::Connection for GRPCClient { async fn connect(&self) -> Result<(), Box> { - let mut client = JunoRequestClient::connect(format!("http://{}", self.address)).await?; + let mut client = + JunoServicesClient::connect(format!("http://{}", CONFIG.address.to_string())).await?; let request = Request::new(GetFilesRequest { - path: "/home/aleidk/Documents/".to_string(), + path: CONFIG.base_path.display().to_string(), }); let response = client.get_files(request).await?.into_inner(); @@ -32,3 +29,36 @@ impl super::Connection for GRPCClient { Ok(()) } } + +impl GRPCClient { + async fn get_client(&self) -> Result, Box> { + let client = + JunoServicesClient::connect(format!("http://{}", CONFIG.address.to_string())).await?; + + Ok(client) + } + + pub async fn ping(&self) -> Result<(), Box> { + let mut client = self.get_client().await?; + + let request = Request::new(EmptyRequest {}); + + let response = client.ping(request).await?.into_inner(); + + println!("RESPONSE={:?}", response); + + Ok(()) + } + + pub async fn skip_song(&self) -> Result<(), Box> { + let mut client = self.get_client().await?; + + let request = Request::new(EmptyRequest {}); + + let response = client.skip_song(request).await?.into_inner(); + + println!("RESPONSE={:?}", response); + + Ok(()) + } +} diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 9b1363b..77b1aff 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -1,33 +1,55 @@ -use crate::file_explorer; +use crate::configuration::CONFIG; +use crate::{file_explorer, PlayerAction}; use super::grpc_juno; -use grpc_juno::juno_request_server::{JunoRequest, JunoRequestServer}; -use grpc_juno::{GetFilesRequest, GetFilesResponse, PingRequestMessage, PingResponseMessage}; +use grpc_juno::juno_services_server::{JunoServices, JunoServicesServer}; +use grpc_juno::{EmptyRequest, GetFilesRequest, GetFilesResponse, PingResponse, StatusResponse}; use std::error::Error; -use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; +use tokio::sync::mpsc::Sender; use tonic::transport::Server; use tonic::{async_trait, Request, Response, Result, Status}; #[derive(Debug, Default)] pub struct GRPCServer { - address: String, + transmitter: Option>, } impl GRPCServer { - pub fn new(address: String) -> Self { - Self { address } + pub fn new(tx: Sender) -> Self { + Self { + transmitter: Some(tx), + } + } + + async fn send_message(&self, message: PlayerAction) -> Result<(), Box> { + if let Some(tx) = &self.transmitter { + tx.send(message).await?; + } + + Ok(()) + } + + pub async fn serve(tx: Sender) -> Result<(), Box> { + println!("Starting server on: \"{}\"", CONFIG.address.to_string()); + + Server::builder() + .add_service(JunoServicesServer::new(GRPCServer::new(tx))) + .serve(CONFIG.address) + .await?; + + Ok(()) } } #[tonic::async_trait] -impl JunoRequest for GRPCServer { +impl JunoServices for GRPCServer { async fn ping( &self, - _request: Request, - ) -> Result, Status> { - let reply = PingResponseMessage { + _request: Request, + ) -> Result, Status> { + let reply = PingResponse { message: "pong!".to_string(), }; @@ -52,18 +74,27 @@ impl JunoRequest for GRPCServer { Ok(Response::new(reply)) } + + async fn skip_song( + &self, + _request: Request, + ) -> Result, Status> { + if let Err(_err) = self.send_message(PlayerAction::SkipSong).await { + return Err(Status::internal("An internal error has occurred.")); + } + + Ok(Response::new(StatusResponse {})) + } } #[async_trait] impl super::Connection for GRPCServer { async fn connect(&self) -> Result<(), Box> { - println!("Starting server on: \"{}\"", self.address); - - let socket: SocketAddr = self.address.parse()?; + println!("Starting server on: \"{}\"", CONFIG.address.to_string()); Server::builder() - .add_service(JunoRequestServer::new(GRPCServer::default())) - .serve(socket) + .add_service(JunoServicesServer::new(GRPCServer::default())) + .serve(CONFIG.address) .await?; Ok(()) diff --git a/src/main.rs b/src/main.rs index dee7a89..e68622e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,67 @@ use std::error::Error; +use std::time::Duration; +use tokio::time::sleep; + +use tokio::sync::mpsc; + +use crate::player::Player; + +use self::configuration::{ConfigMode, CONFIG}; +use self::player::PlayerAction; mod configuration; mod file_explorer; mod grpc; +mod player; -#[tokio::main()] -async fn main() -> Result<(), Box> { - let server = grpc::run()?; +async fn init_server() -> Result<(), Box> { + let (tx, mut rx) = mpsc::channel::(32); - server.connect().await?; + tokio::spawn(async move { + let _ = grpc::GRPCServer::serve(tx).await; + }); + + let mut player = Player::new().expect("Error creating player"); + + println!("Listening for incomming messages..."); + + // This macro will wait on multiple futures and will return when the first one resolves + tokio::select! { + Some(msg) = rx.recv() => { + if let Err(err) = player.handle_message(msg) { + eprintln!("Error handling player action: {}", err); + } + } + _ = async { + loop { + let _ = player.handle_idle(); + sleep(Duration::from_millis(200)).await; + } + } => {println!("player stopped");} + } + + // this traps the main thread, it should run last. + while let Some(msg) = rx.recv().await { + if let Err(err) = player.handle_message(msg) { + eprintln!("Error handling player action: {}", err); + } + } + + Ok(()) +} + +async fn init_client() -> Result<(), Box> { + let client = grpc::GRPCClient::default(); + let _ = client.skip_song().await; + Ok(()) +} + +#[tokio::main()] +async fn main() -> Result<(), Box> { + match CONFIG.mode { + ConfigMode::Server => init_server().await?, + ConfigMode::Client => init_client().await?, + }; Ok(()) } diff --git a/src/player.rs b/src/player.rs new file mode 100644 index 0000000..dd1c4d0 --- /dev/null +++ b/src/player.rs @@ -0,0 +1,117 @@ +use std::collections::VecDeque; +use std::error::Error; +use std::fs::File; +use std::io::BufReader; +use std::path::PathBuf; + +use rodio::{OutputStream, Sink}; + +use crate::configuration::CONFIG; +use crate::file_explorer::walk_dir; + +#[derive(Debug)] +pub enum PlayerAction { + Play, + SkipSong, + Set, +} + +pub struct Player { + queue: VecDeque, + sink: Sink, + stream: OutputStream, +} + +impl std::ops::Deref for Player { + type Target = Sink; + + fn deref(&self) -> &Self::Target { + &self.sink + } +} + +impl Player { + pub fn new() -> Result> { + let queue = walk_dir(&CONFIG.base_path)?; + let (stream, stream_handle) = OutputStream::try_default()?; + let sink = Sink::try_new(&stream_handle)?; + sink.set_volume(CONFIG.volume); + Ok(Player { + queue: VecDeque::from(queue), + sink, + stream, + }) + } + + pub fn handle_message(&mut self, message: PlayerAction) -> Result<(), Box> { + match message { + PlayerAction::Play => self.play()?, + PlayerAction::SkipSong => self.skip_song()?, + PlayerAction::Set => unimplemented!(), + } + + Ok(()) + } + + pub fn handle_idle(&mut self) -> Result<(), Box> { + if self.sink.is_paused() { + return Ok(()); + } + + if self.queue.len() == 0 { + return Ok(()); + } + + if self.sink.len() != 0 { + return Ok(()); + } + + let file_path = self + .queue + .pop_front() + .expect("There was an error with the queue"); + + self.enqueue_file(file_path)?; + + Ok(()) + } + + fn play(&mut self) -> Result<(), Box> { + self.sink.play(); + + Ok(()) + } + + fn skip_song(&mut self) -> Result<(), Box> { + println!("Skipping current song...:"); + let file_path = self.queue.pop_front().expect("foo"); + self.enqueue_file(file_path)?; + self.sink.skip_one(); + + Ok(()) + } + + fn enqueue_file(&self, file_path: PathBuf) -> Result<(), Box> { + println!("Playing file: {}", file_path.display()); + let file = File::open(file_path)?; + + self.sink.append(rodio::Decoder::new(BufReader::new(file))?); + Ok(()) + } + + fn _play_pause(&self) { + if self.sink.is_paused() { + self.sink.play(); + } else { + self.sink.pause(); + }; + } + + fn set_playback_state(&self, is_paused: bool) { + if is_paused { + self.sink.pause(); + } else { + self.sink.play(); + }; + } +}