refactor(config): remove global config and pass as referenced

This commit is contained in:
Alexander Navarro 2024-07-19 16:02:56 -04:00
parent 9cec453498
commit 940093d599
7 changed files with 138 additions and 108 deletions

View file

@ -1,15 +1,10 @@
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use lazy_static::lazy_static;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use crate::grpc; use crate::grpc;
lazy_static! {
pub static ref CONFIG: Config = Config::new();
}
#[derive(Debug)] #[derive(Debug)]
pub enum ConfigMode { pub enum ConfigMode {
Server, Server,
@ -22,6 +17,12 @@ pub enum Commands {
Start { Start {
#[arg(help = "Directory to scan for files", default_value = ".")] #[arg(help = "Directory to scan for files", default_value = ".")]
base_path: PathBuf, base_path: PathBuf,
#[arg(
long,
help = "The value 1.0 is the “normal” volume. Any value other than 1.0 will multiply each sample by this value.",
default_value = "1.0"
)]
volume: f32,
}, },
/// Resume the playback /// Resume the playback
Play, Play,
@ -52,13 +53,6 @@ struct Args {
#[arg(short, long, help = "the port to bind to", default_value = "50051")] #[arg(short, long, help = "the port to bind to", default_value = "50051")]
port: u16, port: u16,
#[arg(
long,
help = "The value 1.0 is the “normal” volume. Any value other than 1.0 will multiply each sample by this value.",
default_value = "1.0"
)]
volume: f32,
} }
#[derive(Debug)] #[derive(Debug)]
@ -85,7 +79,6 @@ impl Config {
let mut config = Self::default(); let mut config = Self::default();
config.address = SocketAddr::from_str(format!("[::1]:{}", cli.port).as_str()).unwrap(); config.address = SocketAddr::from_str(format!("[::1]:{}", cli.port).as_str()).unwrap();
config.volume = cli.volume;
config.command = cli.cmd; config.command = cli.cmd;
if grpc::is_socket_in_use(config.address) { if grpc::is_socket_in_use(config.address) {

View file

@ -1,11 +1,9 @@
use ignore::types::TypesBuilder; use ignore::types::TypesBuilder;
use ignore::WalkBuilder; use ignore::WalkBuilder;
use std::env; use std::error::Error;
use std::path::PathBuf; use std::path::PathBuf;
use crate::configuration::{Commands, CONFIG}; pub fn walk_dir(path: &PathBuf) -> Result<Vec<PathBuf>, Box<dyn Error>> {
pub fn walk_dir(scan_dir: Option<&PathBuf>) -> Result<Vec<PathBuf>, &str> {
let mut types_builder = TypesBuilder::new(); let mut types_builder = TypesBuilder::new();
types_builder.add_defaults(); types_builder.add_defaults();
@ -17,34 +15,25 @@ pub fn walk_dir(scan_dir: Option<&PathBuf>) -> Result<Vec<PathBuf>, &str> {
types_builder.select("sound"); types_builder.select("sound");
let mut base_path = env::current_dir().expect("Error accesing the enviroment"); // let mut base_path = env::current_dir().expect("Error accesing the enviroment");
//
// match path {
// Some(dir) => {
// search_path = base_path
// .join(dir)
// .canonicalize()
// .expect("Couldn't canonicalizice the path")
// }
// None => search_path = base_path.to_owned(),
// }
//
// // PathBuf.join() can override the hole path, this ensure we're not accessing files outside
// // base_dir
// if !search_path.starts_with(base_path) {
// return Err("Tried to access file or directory outside of server `base_path` config.");
// }
if let Commands::Start { let entries: Vec<PathBuf> = WalkBuilder::new(path)
base_path: config_path,
} = &CONFIG.command
{
base_path = config_path.to_owned();
};
let search_path;
match scan_dir {
Some(dir) => {
search_path = base_path
.join(dir)
.canonicalize()
.expect("Couldn't canonicalizice the path")
}
None => search_path = base_path.to_owned(),
}
// PathBuf.join() can override the hole path, this ensure we're not accessing files outside
// base_dir
if !search_path.starts_with(base_path) {
return Err("Tried to access file or directory outside of server `base_path` config.");
}
let entries: Vec<PathBuf> = WalkBuilder::new(search_path)
.types(types_builder.build().unwrap()) .types(types_builder.build().unwrap())
.build() .build()
.filter_map(|entry| entry.ok()) .filter_map(|entry| entry.ok())

View file

@ -1,11 +1,10 @@
use std::net::{SocketAddr, TcpListener}; use std::net::{SocketAddr, TcpListener};
pub use self::client::GRPCClient; pub use self::client::GRPCClient;
pub use self::server::GRPCServer; pub use self::server::GRPCServer;
mod client; mod client;
mod server; pub mod server;
pub mod grpc_juno { pub mod grpc_juno {
tonic::include_proto!("juno"); tonic::include_proto!("juno");

View file

@ -1,6 +1,6 @@
use core::panic; use std::net::SocketAddr;
use std::path::PathBuf;
use crate::configuration::{Commands, CONFIG};
use crate::grpc::grpc_juno::EmptyRequest; use crate::grpc::grpc_juno::EmptyRequest;
use super::grpc_juno; use super::grpc_juno;
@ -10,13 +10,19 @@ use grpc_juno::GetFilesRequest;
use tonic::transport::Channel; use tonic::transport::Channel;
use tonic::Request; use tonic::Request;
#[derive(Debug, Default)] #[derive(Debug)]
pub struct GRPCClient {} pub struct GRPCClient {
address: SocketAddr,
}
impl GRPCClient { impl GRPCClient {
pub fn new(address: SocketAddr) -> Self {
Self { address }
}
async fn get_client(&self) -> Result<JunoServicesClient<Channel>, Box<dyn std::error::Error>> { async fn get_client(&self) -> Result<JunoServicesClient<Channel>, Box<dyn std::error::Error>> {
let client = let client =
JunoServicesClient::connect(format!("http://{}", CONFIG.address.to_string())).await?; JunoServicesClient::connect(format!("http://{}", self.address.to_string())).await?;
Ok(client) Ok(client)
} }
@ -81,21 +87,17 @@ impl GRPCClient {
Ok(()) Ok(())
} }
pub async fn get_files(&self) -> Result<(), Box<dyn std::error::Error>> { pub async fn get_files(&self, path: &PathBuf) -> Result<(), Box<dyn std::error::Error>> {
let mut client = self.get_client().await?; let mut client = self.get_client().await?;
if let Commands::GetFiles { path } = &CONFIG.command { let request = Request::new(GetFilesRequest {
let request = Request::new(GetFilesRequest { path: path.display().to_string(),
path: path.display().to_string(), });
});
let response = client.get_files(request).await?.into_inner(); let response = client.get_files(request).await?.into_inner();
println!("RESPONSE={:?}", response.files); println!("RESPONSE={:?}", response.files);
return Ok(()); return Ok(());
};
panic!("Error");
} }
} }

View file

@ -1,42 +1,65 @@
use crate::configuration::{Commands, CONFIG}; use crate::configuration::Commands;
use crate::file_explorer;
use super::grpc_juno; use super::grpc_juno;
use grpc_juno::juno_services_server::{JunoServices, JunoServicesServer}; use grpc_juno::juno_services_server::{JunoServices, JunoServicesServer};
use grpc_juno::{EmptyRequest, EmptyResponse, GetFilesRequest, GetFilesResponse, PingResponse}; use grpc_juno::{EmptyRequest, EmptyResponse, GetFilesRequest, GetFilesResponse, PingResponse};
use std::error::Error; use std::error::Error;
use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tonic::transport::Server; use tonic::transport::Server;
use tonic::{Request, Response, Result, Status}; use tonic::{Request, Response, Result, Status};
type Responder<T> = oneshot::Sender<T>;
#[derive(Debug)]
pub struct GrpcServerMessage {
pub command: Commands,
pub responder: Responder<()>,
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct GRPCServer { pub struct GRPCServer {
transmitter: Option<Sender<Commands>>, transmitter: Option<Sender<GrpcServerMessage>>,
} }
impl GRPCServer { impl GRPCServer {
pub fn new(tx: Sender<Commands>) -> Self { pub fn new(tx: Sender<GrpcServerMessage>) -> Self {
Self { Self {
transmitter: Some(tx), transmitter: Some(tx),
} }
} }
async fn send_message(&self, message: Commands) -> Result<(), Box<dyn Error>> { async fn send_message(&self, command: Commands) -> Result<(), Box<dyn Error>> {
let (resp_tx, resp_rx) = oneshot::channel();
let message = GrpcServerMessage {
command,
responder: resp_tx,
};
if let Some(tx) = &self.transmitter { if let Some(tx) = &self.transmitter {
tx.send(message).await?; tx.send(message).await?;
let response = resp_rx.await?;
return Ok(response);
} }
Ok(()) Ok(())
} }
pub async fn serve(tx: Sender<Commands>) -> Result<(), Box<dyn Error>> { pub async fn serve(
println!("Starting server on: \"{}\"", CONFIG.address.to_string()); address: SocketAddr,
tx: Sender<GrpcServerMessage>,
) -> Result<(), Box<dyn Error>> {
println!("Starting server on: \"{}\"", address.to_string());
Server::builder() Server::builder()
.add_service(JunoServicesServer::new(GRPCServer::new(tx))) .add_service(JunoServicesServer::new(GRPCServer::new(tx)))
.serve(CONFIG.address) .serve(address)
.await?; .await?;
Ok(()) Ok(())
@ -63,9 +86,9 @@ impl JunoServices for GRPCServer {
let path = PathBuf::from_str(request.into_inner().path.as_str()) let path = PathBuf::from_str(request.into_inner().path.as_str())
.expect("Failed to create pathbuf"); .expect("Failed to create pathbuf");
let files = match file_explorer::walk_dir(Some(&path)) { let files: Vec<PathBuf> = match self.send_message(Commands::GetFiles { path }).await {
Ok(files) => files, Ok(()) => vec![],
Err(err) => return Err(Status::invalid_argument(err)), Err(_err) => return Err(Status::internal("An internal error has occurred.")),
}; };
let reply = GetFilesResponse { let reply = GetFilesResponse {

View file

@ -1,3 +1,4 @@
use std::env;
use std::error::Error; use std::error::Error;
use std::time::Duration; use std::time::Duration;
use tokio::time::sleep; use tokio::time::sleep;
@ -6,59 +7,74 @@ use tokio::sync::mpsc;
use crate::player::Player; use crate::player::Player;
use self::configuration::{Commands, ConfigMode, CONFIG}; use self::configuration::{Commands, Config, ConfigMode};
mod configuration; mod configuration;
mod file_explorer; mod file_explorer;
mod grpc; mod grpc;
mod player; mod player;
async fn init_server() -> Result<(), Box<dyn Error>> { async fn init_server(config: Config) -> Result<(), Box<dyn Error>> {
let (tx, mut rx) = mpsc::channel::<Commands>(32); let (tx, mut rx) = mpsc::channel::<grpc::server::GrpcServerMessage>(32);
tokio::spawn(async move { tokio::spawn(async move {
let _ = grpc::GRPCServer::serve(tx).await; let _ = grpc::GRPCServer::serve(config.address, tx).await;
}); });
let mut player = Player::new().expect("Error creating player"); let mut base_path = env::current_dir().expect("Error accesing the enviroment");
let mut volume = 1.0;
if let Commands::Start {
base_path: config_path,
volume: config_volume,
} = config.command
{
base_path = config_path.to_owned();
volume = config_volume;
};
let mut player = Player::new(base_path, volume).expect("Error creating player");
println!("Listening for incomming messages..."); println!("Listening for incomming messages...");
// This macro will wait on multiple futures and will return when the first one resolves // This macro will wait on multiple futures and will return when the first one resolves
tokio::select! { // TODO: create a break system for shutdown
Some(msg) = rx.recv() => { loop {
if let Err(err) = player.handle_message(msg) { tokio::select! {
eprintln!("Error handling player action: {}", err); Some(msg) = rx.recv() => {
// TODO: receive message from player and send it back to server so it can be sent to
// the client
if let Err(err) = player.handle_message(msg.command) {
eprintln!("Error handling player action: {}", err);
}
if let Err(_) = msg.responder.send(()) {
eprintln!("Error responding to grpc server");
}
}
_ = async {
loop {
let _ = player.handle_idle();
sleep(Duration::from_millis(200)).await;
}
} => {}
else => {
println!("player stopped");
} }
} }
_ = async {
loop {
let _ = player.handle_idle();
sleep(Duration::from_millis(200)).await;
}
} => {println!("player stopped");}
} }
// this traps the main thread, it should run last.
while let Some(msg) = rx.recv().await {
if let Err(err) = player.handle_message(msg) {
eprintln!("Error handling player action: {}", err);
}
}
Ok(())
} }
async fn init_client() -> Result<(), Box<dyn Error>> { async fn init_client(config: Config) -> Result<(), Box<dyn Error>> {
let client = grpc::GRPCClient::default(); let client = grpc::GRPCClient::new(config.address);
match &CONFIG.command { match &config.command {
Commands::Play => client.play().await?, Commands::Play => client.play().await?,
Commands::Pause => client.pause().await?, Commands::Pause => client.pause().await?,
Commands::PlayPause => client.play_pause().await?, Commands::PlayPause => client.play_pause().await?,
Commands::SkipSong => client.skip_song().await?, Commands::SkipSong => client.skip_song().await?,
Commands::Set => todo!(), Commands::Set => todo!(),
Commands::GetFiles { path: _ } => client.get_files().await?, Commands::GetFiles { path } => client.get_files(&path).await?,
Commands::Ping => client.ping().await?, Commands::Ping => client.ping().await?,
_ => { _ => {
println!("This command doesn't apply to client mode") println!("This command doesn't apply to client mode")
@ -70,9 +86,11 @@ async fn init_client() -> Result<(), Box<dyn Error>> {
#[tokio::main()] #[tokio::main()]
async fn main() -> Result<(), Box<dyn Error>> { async fn main() -> Result<(), Box<dyn Error>> {
match CONFIG.mode { let config = Config::new();
ConfigMode::Server => init_server().await?,
ConfigMode::Client => init_client().await?, match config.mode {
ConfigMode::Server => init_server(config).await?,
ConfigMode::Client => init_client(config).await?,
}; };
Ok(()) Ok(())

View file

@ -6,7 +6,7 @@ use std::path::PathBuf;
use rodio::{OutputStream, Sink}; use rodio::{OutputStream, Sink};
use crate::configuration::{self, CONFIG}; use crate::configuration;
use crate::file_explorer::walk_dir; use crate::file_explorer::walk_dir;
#[allow(dead_code)] #[allow(dead_code)]
@ -14,6 +14,7 @@ pub struct Player {
queue: VecDeque<PathBuf>, queue: VecDeque<PathBuf>,
sink: Sink, sink: Sink,
stream: OutputStream, stream: OutputStream,
base_dir: PathBuf,
} }
impl std::ops::Deref for Player { impl std::ops::Deref for Player {
@ -25,17 +26,18 @@ impl std::ops::Deref for Player {
} }
impl Player { impl Player {
pub fn new() -> Result<Self, Box<dyn Error>> { pub fn new(base_dir: PathBuf, volume: f32) -> Result<Self, Box<dyn Error>> {
let queue = walk_dir(None)?; let queue = walk_dir(&base_dir)?;
// stream needs to exist as long as sink to work // stream needs to exist as long as sink to work
let (stream, stream_handle) = OutputStream::try_default()?; let (stream, stream_handle) = OutputStream::try_default()?;
let sink = Sink::try_new(&stream_handle)?; let sink = Sink::try_new(&stream_handle)?;
sink.set_volume(CONFIG.volume); sink.set_volume(volume);
Ok(Player { Ok(Player {
queue: VecDeque::from(queue), queue: VecDeque::from(queue),
sink, sink,
stream, stream,
base_dir,
}) })
} }
@ -80,6 +82,10 @@ impl Player {
Ok(()) Ok(())
} }
fn get_files(path: &PathBuf) -> Result<Vec<PathBuf>, Box<dyn Error>> {
Ok(walk_dir(&path)?)
}
fn play(&mut self) { fn play(&mut self) {
self.sink.play(); self.sink.play();
} }