feat: implement basic music player and grpc server

This commit is contained in:
Alexander Navarro 2024-05-28 20:11:53 -04:00
parent ad61cf68c6
commit 7382b06bdf
10 changed files with 336 additions and 61 deletions

View file

@ -5,9 +5,11 @@ edition = "2021"
[dependencies] [dependencies]
clap = { version = "4.5.4", features = ["derive"] } clap = { version = "4.5.4", features = ["derive"] }
futures = "0.3.30"
ignore = "0.4.22" ignore = "0.4.22"
lazy_static = "1.4.0" lazy_static = "1.4.0"
prost = "0.12.4" prost = "0.12.4"
rodio = "0.17.3"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
tonic = "0.11.0" tonic = "0.11.0"

View file

@ -48,6 +48,24 @@ The project was split into 2 though:
- [Fuuka](https://megamitensei.fandom.com/wiki/Fuuka_Yamagishi), the navi of SEES in Persona 3, you can ask her to change the musing in tartarus. It act as the frontend to interact with the player. - [Fuuka](https://megamitensei.fandom.com/wiki/Fuuka_Yamagishi), the navi of SEES in Persona 3, you can ask her to change the musing in tartarus. It act as the frontend to interact with the player.
- [Juno](https://megamitensei.fandom.com/wiki/Juno), the persona of Fuuka that grants her the ability to communicate telepatically to her teamates. It act as the music player. - [Juno](https://megamitensei.fandom.com/wiki/Juno), the persona of Fuuka that grants her the ability to communicate telepatically to her teamates. It act as the music player.
## Contributing
Aside from rust and cargo, you need the following dependencies:
- alsa-lib devel package
```bash
# fedora
dnf install -y alsa-lib-devel
```
- Tonic protobuf dependencies
```bash
# fedora
dnf install -y protobuf-devel
```
## Similar projects ## Similar projects
- [Navidrome](https://www.navidrome.org) - [Navidrome](https://www.navidrome.org)

View file

@ -2,15 +2,21 @@ syntax = "proto3";
package juno; package juno;
service JunoRequest { service JunoServices {
rpc Ping (PingRequestMessage) returns (PingResponseMessage); rpc Ping (EmptyRequest) returns (PingResponse);
rpc GetFiles (GetFilesRequest) returns (GetFilesResponse); rpc GetFiles (GetFilesRequest) returns (GetFilesResponse);
rpc SkipSong (EmptyRequest) returns (StatusResponse);
} }
message PingRequestMessage { message EmptyRequest {
} }
message PingResponseMessage { // TODO: add an enmurator and a "message" so this act as a generic response to
// services that don't need to return valuable data
message StatusResponse {
}
message PingResponse {
string message = 1; string message = 1;
} }
@ -21,3 +27,4 @@ message GetFilesRequest {
message GetFilesResponse { message GetFilesResponse {
repeated string files = 1; repeated string files = 1;
} }

View file

@ -1,42 +1,73 @@
use clap::Parser; use clap::Parser;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::env; use std::env;
use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr;
use crate::grpc;
lazy_static! { lazy_static! {
pub static ref CONFIG: Config = Config::new(); pub static ref CONFIG: Config = Config::new();
} }
#[derive(Debug)]
pub enum ConfigMode {
Server,
Client,
}
#[derive(Parser)] #[derive(Parser)]
#[command(version, about, long_about = None)] #[command(version, about, long_about = None)]
struct Args { struct Args {
#[arg(help = "Directory to scan for files")] #[arg(help = "Directory to scan for files")]
path: Option<PathBuf>, path: Option<PathBuf>,
#[arg(short, long, help = "the port to bind to", default_value = "50051")]
port: u16,
#[arg(
long,
help = "The value 1.0 is the “normal” volume. Any value other than 1.0 will multiply each sample by this value.",
default_value = "1.0"
)]
volume: f32,
} }
#[derive(Debug)] #[derive(Debug)]
pub struct Config { pub struct Config {
pub base_path: PathBuf, pub base_path: PathBuf,
pub address: SocketAddr,
pub mode: ConfigMode,
pub volume: f32,
} }
impl Default for Config { impl Default for Config {
fn default() -> Self { fn default() -> Self {
Config { Config {
base_path: env::current_dir().expect("Current directory is not available."), base_path: env::current_dir().expect("Current directory is not available."),
mode: ConfigMode::Server,
address: SocketAddr::from_str("[::1]:50051").unwrap(),
volume: 1.0,
} }
} }
} }
impl Config { impl Config {
pub fn new() -> Self { pub fn new() -> Self {
let mut config = Self::default();
let cli = Self::get_cli_args(); let cli = Self::get_cli_args();
let mut config = Self::default();
config.address = SocketAddr::from_str(format!("[::1]:{}", cli.port).as_str()).unwrap();
config.volume = cli.volume;
if let Some(path) = cli.path { if let Some(path) = cli.path {
config.base_path = path; config.base_path = path;
} }
if grpc::is_socket_in_use(config.address) {
config.mode = ConfigMode::Client;
} else {
config.mode = ConfigMode::Server;
};
config config
} }

View file

@ -8,7 +8,7 @@ pub fn walk_dir(path: &PathBuf) -> Result<Vec<PathBuf>, &str> {
let mut types_builder = TypesBuilder::new(); let mut types_builder = TypesBuilder::new();
types_builder.add_defaults(); types_builder.add_defaults();
let accepted_filetypes = ["mp3", "flac"]; let accepted_filetypes = ["mp3", "flac", "wav"];
for filetype in accepted_filetypes { for filetype in accepted_filetypes {
let _ = types_builder.add("sound", format!("*.{}", filetype).as_str()); let _ = types_builder.add("sound", format!("*.{}", filetype).as_str());
@ -17,10 +17,6 @@ pub fn walk_dir(path: &PathBuf) -> Result<Vec<PathBuf>, &str> {
types_builder.select("sound"); types_builder.select("sound");
let search_path = CONFIG.base_path.join(path); let search_path = CONFIG.base_path.join(path);
eprintln!(
"DEBUGPRINT[1]: file_explorer.rs:19: search_path={:#?}",
search_path
);
// PathBuf.join() can override the hole path, this ensure we're not accessing files outside // PathBuf.join() can override the hole path, this ensure we're not accessing files outside
// base_dir // base_dir

View file

@ -3,8 +3,8 @@ use std::net::{SocketAddr, TcpListener};
use tonic::async_trait; use tonic::async_trait;
use self::client::GRPCClient; pub use self::client::GRPCClient;
use self::server::GRPCServer; pub use self::server::GRPCServer;
mod client; mod client;
mod server; mod server;
@ -18,20 +18,10 @@ pub trait Connection {
async fn connect(&self) -> Result<(), Box<dyn Error>>; async fn connect(&self) -> Result<(), Box<dyn Error>>;
} }
fn is_socket_in_use(addr: String) -> bool { /// Return true if the addr is already in use, false otherwise
let socket: SocketAddr = addr.parse().expect("Failed to create socket"); pub fn is_socket_in_use(addr: SocketAddr) -> bool {
match TcpListener::bind(socket) { match TcpListener::bind(addr) {
Ok(_) => true, Ok(_) => false,
Err(_) => false, Err(_) => true,
}
}
pub fn run() -> Result<Box<dyn Connection>, Box<dyn Error>> {
let addr = "[::1]:50051";
if is_socket_in_use(addr.to_string()) {
Ok(Box::new(GRPCServer::new(addr.to_string())))
} else {
Ok(Box::new(GRPCClient::new(addr.to_string())))
} }
} }

View file

@ -1,28 +1,25 @@
use crate::configuration::CONFIG;
use crate::grpc::grpc_juno::EmptyRequest;
use super::grpc_juno; use super::grpc_juno;
use grpc_juno::juno_request_client::JunoRequestClient; use grpc_juno::juno_services_client::JunoServicesClient;
use grpc_juno::GetFilesRequest; use grpc_juno::GetFilesRequest;
use tonic::async_trait; use tonic::async_trait;
use tonic::transport::Channel;
use tonic::Request; use tonic::Request;
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct GRPCClient { pub struct GRPCClient {}
address: String,
}
impl GRPCClient {
pub fn new(address: String) -> Self {
Self { address }
}
}
#[async_trait] #[async_trait]
impl super::Connection for GRPCClient { impl super::Connection for GRPCClient {
async fn connect(&self) -> Result<(), Box<dyn std::error::Error>> { async fn connect(&self) -> Result<(), Box<dyn std::error::Error>> {
let mut client = JunoRequestClient::connect(format!("http://{}", self.address)).await?; let mut client =
JunoServicesClient::connect(format!("http://{}", CONFIG.address.to_string())).await?;
let request = Request::new(GetFilesRequest { let request = Request::new(GetFilesRequest {
path: "/home/aleidk/Documents/".to_string(), path: CONFIG.base_path.display().to_string(),
}); });
let response = client.get_files(request).await?.into_inner(); let response = client.get_files(request).await?.into_inner();
@ -32,3 +29,36 @@ impl super::Connection for GRPCClient {
Ok(()) Ok(())
} }
} }
impl GRPCClient {
async fn get_client(&self) -> Result<JunoServicesClient<Channel>, Box<dyn std::error::Error>> {
let client =
JunoServicesClient::connect(format!("http://{}", CONFIG.address.to_string())).await?;
Ok(client)
}
pub async fn ping(&self) -> Result<(), Box<dyn std::error::Error>> {
let mut client = self.get_client().await?;
let request = Request::new(EmptyRequest {});
let response = client.ping(request).await?.into_inner();
println!("RESPONSE={:?}", response);
Ok(())
}
pub async fn skip_song(&self) -> Result<(), Box<dyn std::error::Error>> {
let mut client = self.get_client().await?;
let request = Request::new(EmptyRequest {});
let response = client.skip_song(request).await?.into_inner();
println!("RESPONSE={:?}", response);
Ok(())
}
}

View file

@ -1,33 +1,55 @@
use crate::file_explorer; use crate::configuration::CONFIG;
use crate::{file_explorer, PlayerAction};
use super::grpc_juno; use super::grpc_juno;
use grpc_juno::juno_request_server::{JunoRequest, JunoRequestServer}; use grpc_juno::juno_services_server::{JunoServices, JunoServicesServer};
use grpc_juno::{GetFilesRequest, GetFilesResponse, PingRequestMessage, PingResponseMessage}; use grpc_juno::{EmptyRequest, GetFilesRequest, GetFilesResponse, PingResponse, StatusResponse};
use std::error::Error; use std::error::Error;
use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use tokio::sync::mpsc::Sender;
use tonic::transport::Server; use tonic::transport::Server;
use tonic::{async_trait, Request, Response, Result, Status}; use tonic::{async_trait, Request, Response, Result, Status};
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct GRPCServer { pub struct GRPCServer {
address: String, transmitter: Option<Sender<PlayerAction>>,
} }
impl GRPCServer { impl GRPCServer {
pub fn new(address: String) -> Self { pub fn new(tx: Sender<PlayerAction>) -> Self {
Self { address } Self {
transmitter: Some(tx),
}
}
async fn send_message(&self, message: PlayerAction) -> Result<(), Box<dyn Error>> {
if let Some(tx) = &self.transmitter {
tx.send(message).await?;
}
Ok(())
}
pub async fn serve(tx: Sender<PlayerAction>) -> Result<(), Box<dyn Error>> {
println!("Starting server on: \"{}\"", CONFIG.address.to_string());
Server::builder()
.add_service(JunoServicesServer::new(GRPCServer::new(tx)))
.serve(CONFIG.address)
.await?;
Ok(())
} }
} }
#[tonic::async_trait] #[tonic::async_trait]
impl JunoRequest for GRPCServer { impl JunoServices for GRPCServer {
async fn ping( async fn ping(
&self, &self,
_request: Request<PingRequestMessage>, _request: Request<EmptyRequest>,
) -> Result<Response<PingResponseMessage>, Status> { ) -> Result<Response<PingResponse>, Status> {
let reply = PingResponseMessage { let reply = PingResponse {
message: "pong!".to_string(), message: "pong!".to_string(),
}; };
@ -52,18 +74,27 @@ impl JunoRequest for GRPCServer {
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
async fn skip_song(
&self,
_request: Request<EmptyRequest>,
) -> Result<Response<StatusResponse>, Status> {
if let Err(_err) = self.send_message(PlayerAction::SkipSong).await {
return Err(Status::internal("An internal error has occurred."));
}
Ok(Response::new(StatusResponse {}))
}
} }
#[async_trait] #[async_trait]
impl super::Connection for GRPCServer { impl super::Connection for GRPCServer {
async fn connect(&self) -> Result<(), Box<dyn Error>> { async fn connect(&self) -> Result<(), Box<dyn Error>> {
println!("Starting server on: \"{}\"", self.address); println!("Starting server on: \"{}\"", CONFIG.address.to_string());
let socket: SocketAddr = self.address.parse()?;
Server::builder() Server::builder()
.add_service(JunoRequestServer::new(GRPCServer::default())) .add_service(JunoServicesServer::new(GRPCServer::default()))
.serve(socket) .serve(CONFIG.address)
.await?; .await?;
Ok(()) Ok(())

View file

@ -1,14 +1,67 @@
use std::error::Error; use std::error::Error;
use std::time::Duration;
use tokio::time::sleep;
use tokio::sync::mpsc;
use crate::player::Player;
use self::configuration::{ConfigMode, CONFIG};
use self::player::PlayerAction;
mod configuration; mod configuration;
mod file_explorer; mod file_explorer;
mod grpc; mod grpc;
mod player;
#[tokio::main()] async fn init_server() -> Result<(), Box<dyn Error>> {
async fn main() -> Result<(), Box<dyn Error>> { let (tx, mut rx) = mpsc::channel::<PlayerAction>(32);
let server = grpc::run()?;
server.connect().await?; tokio::spawn(async move {
let _ = grpc::GRPCServer::serve(tx).await;
});
let mut player = Player::new().expect("Error creating player");
println!("Listening for incomming messages...");
// This macro will wait on multiple futures and will return when the first one resolves
tokio::select! {
Some(msg) = rx.recv() => {
if let Err(err) = player.handle_message(msg) {
eprintln!("Error handling player action: {}", err);
}
}
_ = async {
loop {
let _ = player.handle_idle();
sleep(Duration::from_millis(200)).await;
}
} => {println!("player stopped");}
}
// this traps the main thread, it should run last.
while let Some(msg) = rx.recv().await {
if let Err(err) = player.handle_message(msg) {
eprintln!("Error handling player action: {}", err);
}
}
Ok(())
}
async fn init_client() -> Result<(), Box<dyn Error>> {
let client = grpc::GRPCClient::default();
let _ = client.skip_song().await;
Ok(())
}
#[tokio::main()]
async fn main() -> Result<(), Box<dyn Error>> {
match CONFIG.mode {
ConfigMode::Server => init_server().await?,
ConfigMode::Client => init_client().await?,
};
Ok(()) Ok(())
} }

117
src/player.rs Normal file
View file

@ -0,0 +1,117 @@
use std::collections::VecDeque;
use std::error::Error;
use std::fs::File;
use std::io::BufReader;
use std::path::PathBuf;
use rodio::{OutputStream, Sink};
use crate::configuration::CONFIG;
use crate::file_explorer::walk_dir;
#[derive(Debug)]
pub enum PlayerAction {
Play,
SkipSong,
Set,
}
pub struct Player {
queue: VecDeque<PathBuf>,
sink: Sink,
stream: OutputStream,
}
impl std::ops::Deref for Player {
type Target = Sink;
fn deref(&self) -> &Self::Target {
&self.sink
}
}
impl Player {
pub fn new() -> Result<Self, Box<dyn Error>> {
let queue = walk_dir(&CONFIG.base_path)?;
let (stream, stream_handle) = OutputStream::try_default()?;
let sink = Sink::try_new(&stream_handle)?;
sink.set_volume(CONFIG.volume);
Ok(Player {
queue: VecDeque::from(queue),
sink,
stream,
})
}
pub fn handle_message(&mut self, message: PlayerAction) -> Result<(), Box<dyn Error>> {
match message {
PlayerAction::Play => self.play()?,
PlayerAction::SkipSong => self.skip_song()?,
PlayerAction::Set => unimplemented!(),
}
Ok(())
}
pub fn handle_idle(&mut self) -> Result<(), Box<dyn Error>> {
if self.sink.is_paused() {
return Ok(());
}
if self.queue.len() == 0 {
return Ok(());
}
if self.sink.len() != 0 {
return Ok(());
}
let file_path = self
.queue
.pop_front()
.expect("There was an error with the queue");
self.enqueue_file(file_path)?;
Ok(())
}
fn play(&mut self) -> Result<(), Box<dyn Error>> {
self.sink.play();
Ok(())
}
fn skip_song(&mut self) -> Result<(), Box<dyn Error>> {
println!("Skipping current song...:");
let file_path = self.queue.pop_front().expect("foo");
self.enqueue_file(file_path)?;
self.sink.skip_one();
Ok(())
}
fn enqueue_file(&self, file_path: PathBuf) -> Result<(), Box<dyn Error>> {
println!("Playing file: {}", file_path.display());
let file = File::open(file_path)?;
self.sink.append(rodio::Decoder::new(BufReader::new(file))?);
Ok(())
}
fn _play_pause(&self) {
if self.sink.is_paused() {
self.sink.play();
} else {
self.sink.pause();
};
}
fn set_playback_state(&self, is_paused: bool) {
if is_paused {
self.sink.pause();
} else {
self.sink.play();
};
}
}