epoch/src/manager.rs

163 lines
4.7 KiB
Rust

use crate::docker::{Container, ContainerBuilder};
use crate::Error;
use bollard::models::ContainerSummary;
use bollard::Docker;
use futures::future::try_join_all;
use futures::StreamExt;
use std::collections::HashMap;
use tracing::{debug, info, instrument};
/// Namespace to manage containers together (like compose projects)
type ServiceGroup = String;
const DEFAULT_GROUP: &str = "NONE";
#[derive(Debug)]
struct Services(HashMap<ServiceGroup, Vec<Container>>);
impl From<&Vec<ContainerSummary>> for Services {
fn from(value: &Vec<ContainerSummary>) -> Self {
let mut services = HashMap::new();
for summary in value.iter().cloned() {
let container = crate::docker::Container::from(summary);
if container.mounts.is_empty() {
continue;
}
let group = match &container.labels.service.group {
// Doesn't have a group defined by the user,
// try to use the compose hash or throw it with the default group
None => container
.labels
.service
.compose_hash
.as_ref()
.map(|s| s.to_string())
.unwrap_or_else(|| DEFAULT_GROUP.to_owned()),
Some(group) => group.to_owned(),
};
let list: &mut Vec<Container> = services.entry(group).or_default();
list.push(container);
}
Self(services)
}
}
#[tracing::instrument(skip(containers))]
pub async fn manage(containers: &Vec<ContainerSummary>) -> crate::Result<()> {
let total_containers = containers.len();
let services = Services::from(containers);
info!(
"Found {} containers, grouped into {} groups",
total_containers,
services.0.len()
);
// TODO: reuse main instance
let docker = Docker::connect_with_local_defaults()?;
// TODO: iterate over groups in parallel
for (group, containers) in services.0.iter() {
handle_group(&docker, group, containers).await?;
}
// Do cleanup to ensure all containers that are created are also deleted
Ok(())
}
#[tracing::instrument(skip(docker, containers))]
async fn handle_group(
docker: &Docker,
_group: &ServiceGroup,
containers: &Vec<Container>,
) -> Result<(), Error> {
// stop containers of service group
try_join_all(
containers
.into_iter()
.map(|container| stop_container(&docker, &container.id)),
)
.await?;
// create container with the same mounts as each container in the group
let backup_container = ContainerBuilder::new().with_image("hello-world").build();
run_backup_container(&docker, &backup_container).await?;
// run the new container
// restart the containers
try_join_all(
containers
.into_iter()
.map(|container| start_container(&docker, &container.id)),
)
.await?;
Ok(())
}
async fn stop_container(
docker: &Docker,
container_id: impl AsRef<str>,
) -> crate::Result<()> {
let container_id = container_id.as_ref();
info!("Stoping container: {}", container_id);
let stop_opts = bollard::query_parameters::StopContainerOptionsBuilder::new().build();
docker
.stop_container(container_id, Some(stop_opts))
.await?;
Ok(())
}
async fn start_container(
docker: &Docker,
container_id: impl AsRef<str>,
) -> crate::Result<()> {
let container_id = container_id.as_ref();
info!("Starting container: {}", container_id);
let start_opts = bollard::query_parameters::StartContainerOptionsBuilder::new().build();
docker
.start_container(container_id, Some(start_opts))
.await?;
Ok(())
}
#[instrument(skip_all, fields(container_id))]
async fn run_backup_container(docker: &Docker, container: &Container) -> crate::Result<()> {
info!("Creating container from image {:?}",&container.image);
debug!("{:#?}", &container);
let create_opts = bollard::query_parameters::CreateContainerOptionsBuilder::default().build();
let create_conf = bollard::models::ContainerCreateBody::from(container);
let response = docker
.create_container(Some(create_opts), create_conf)
.await?;
tracing::Span::current().record("container_id", &response.id);
start_container(docker, &response.id).await?;
let wait_opts = bollard::query_parameters::WaitContainerOptionsBuilder::default().build();
let mut stream = docker.wait_container(&response.id, Some(wait_opts));
while let Some(status) = stream.next().await {
debug!("Received status: {:?}", status?);
}
stop_container(docker, &response.id).await?;
Ok(())
}