// main.rs 9.45 KB
use anyhow::{Result, Context};
use tokio_postgres::{Client, NoTls};
use log::{error, info, warn, trace};
use std::{fs,env};
use std::path::Path;
use chrono::Utc;
use clap::{Arg, Command};
use dotenv::dotenv;

#[tokio::main]
async fn main() {
    if env::var("RUST_LOG").is_err() {
        env::set_var("RUST_LOG", "info")
    }
    env_logger::init();
    dotenv().ok();
    let matches = Command::new("Clean Utility")
    .version("1.0")
    .about("Cleans old files and database rows based on retention policies")
    .arg(
        Arg::new("data_dir")
            .long("data-dir")
            .env("MATTERMOST_DATA_DIRECTORY")
            .help("Path to the Mattermost data directory")
            .required(true)
    )
    .arg(
        Arg::new("db_name")
            .short('n')
            .long("db-name")
            .env("DATABASE_NAME")
            .help("Database name")
            .required(true)
    )
    .arg(
        Arg::new("db_user")
            .short('u')
            .long("db-user")
            .env("DATABASE_USER")
            .help("Database user")
            .required(true)
    )
    .arg(
        Arg::new("db_password")
            .short('p')
            .long("db-password")
            .env("PGPASSWORD")
            .help("Database password")
            .required(true),
    )
    .arg(
        Arg::new("db_host")
            .short('h')
            .long("db-host")
            .env("DATABASE_HOST")
            .help("Database host")
            .required(true)
    )
    .arg(
        Arg::new("db_port")
            .short('P')
            .long("db-port")
            .env("DATABASE_PORT")
            .help("Database port")
            .required(true)
    )
    .arg(
        Arg::new("retention_days")
            .short('D')
            .long("retention-days")
            .env("RETENTION_DAYS")
            .help("Number of days to retain data")
            .required(true)
    )
    .arg(
        Arg::new("file_batch_size")
            .short('b')
            .long("file-batch-size")
            .env("FILE_BATCH_SIZE")
            .help("Batch size for file deletion")
            .required(true),
    )
    .arg(
        Arg::new("remove_posts")
            .long("remove-posts")
            .help("Wipe posts older than timestamp")
            .required(false)
    )
    .arg(
        Arg::new("dry_run")
            .long("dry-run")
            .help("Perform a dry run without making any changes")
            .required(false)
    )
    .get_matches();

    let mattermost_data_directory = matches.get_one::<String>("data_dir").unwrap();
    let database_name = matches.get_one::<String>("db_name").unwrap();
    let database_user = matches.get_one::<String>("db_user").unwrap();
    let database_password = matches.get_one::<String>("db_password").unwrap();
    let database_host = matches.get_one::<String>("db_host").unwrap();
    let database_port = matches.get_one::<String>("db_port").unwrap();
    let retention_days = matches.get_one::<String>("retention_days").unwrap();
    let file_batch_size = matches.get_one::<String>("file_batch_size").unwrap();
    let remove_posts = matches.contains_id("remove_posts");
    let dry_run = matches.contains_id("dry_run");

    let retention_days = retention_days.parse::<i64>().expect("fucking hell retention");
    let file_batch_size = file_batch_size.parse::<usize>().expect("fucking hell retention");
    //let file_batch_size = file_batch_size.parse::<usize>().expect("fucking hell retention");
    if let Err(err) = clean(
        mattermost_data_directory,
        database_name,
        database_user,
        database_password,
        database_host,
        database_port,
        retention_days,
        file_batch_size,
        remove_posts,
        dry_run,
    ).await {
        error!("Cleaning operation failed: {}", err);
    } else {
        info!("Cleaning operation completed successfully.");
    }
}

pub async fn clean(
    mattermost_data_directory: &str,
    database_name: &str,
    database_user: &str,
    database_password: &str,
    database_host: &str,
    database_port: &str,
    retention_days: i64,
    file_batch_size: usize,
    remove_posts: bool,
    dry_run: bool,
) -> Result<()> {
    validate(
        mattermost_data_directory,
        database_name,
        database_user,
        database_host,
        retention_days,
        file_batch_size,
    )?;

    let connection_string = format!(
        "postgres://{}:{}@{}:{}/{}?sslmode=disable",
        database_user, database_password, database_host, database_port, database_name
    );
    trace!("Connection string: {}", &connection_string);
    let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await.context("Failed to connect to the database")?;

    tokio::spawn(async move {
        if let Err(e) = connection.await {
            warn!("error happened at spawn {e}");
            eprintln!("connection error: {}", e);
        }
    });
    info!("Connection established: OK");
    let millisecond_epoch = (Utc::now() - chrono::Duration::days(retention_days)).timestamp_millis();

    clean_files(&client, millisecond_epoch, mattermost_data_directory, file_batch_size, dry_run).await?;
    delete_file_info_rows(&client, millisecond_epoch, dry_run).await?;
    if remove_posts {
        delete_post_rows(&client, millisecond_epoch, dry_run).await?;
    } else {
        info!("Skipping posts removal")
    }

    Ok(())
}

/// Processes the expired file records one batch after another.
///
/// Calls `clean_files_batch` with an increasing batch index (starting at 0)
/// until it returns `false`. Any error from a batch is propagated immediately.
async fn clean_files(
    client: &Client,
    millisecond_epoch: i64,
    mattermost_data_directory: &str,
    file_batch_size: usize,
    dry_run: bool,
) -> Result<()> {
    let mut batch_index = 0;

    loop {
        let found_rows = clean_files_batch(
            client,
            millisecond_epoch,
            mattermost_data_directory,
            file_batch_size,
            batch_index,
            dry_run,
        )
        .await?;
        batch_index += 1;

        if !found_rows {
            return Ok(());
        }
    }
}

async fn clean_files_batch(
    client: &Client,
    millisecond_epoch: i64,
    mattermost_data_directory: &str,
    file_batch_size: usize,
    batch: usize,
    dry_run: bool,
) -> Result<bool> {
    let query = "
        SELECT path, thumbnailpath, previewpath
        FROM fileinfo
        WHERE createat < $1
        OFFSET $2
        LIMIT $3;
    ";
    trace!("Querying: {}",&query);
    let offset = (batch * file_batch_size) as i64;
    let limit = file_batch_size as i64;
    trace!("params: {} {} {}",&millisecond_epoch, &offset, &limit);
    let rows = client
        .query(query, &[&millisecond_epoch, &offset, &limit])
        .await.context("Failed to fetch file info rows")?;

    let mut more_results = false;

    for row in rows {
        more_results = true;
        let path: String = row.get("path");
        let thumbnail_path: String = row.get("thumbnailpath");
        let preview_path: String = row.get("previewpath");

        if dry_run {
            info!("[DRY RUN] Would remove: {:?}, {:?}, {:?}", path, thumbnail_path, preview_path);
        } else {
            remove_files(mattermost_data_directory, &path, &thumbnail_path, &preview_path).context("Failed to remove files")?;
        }
    }

    Ok(more_results)
}

fn remove_files(base_dir: &str, path: &str, thumbnail_path: &str, preview_path: &str) -> Result<()> {
    let files = [path, thumbnail_path, preview_path];
    let mut num_deleted = 0;
    for file in files {
        if !file.is_empty() {
            let full_path = Path::new(base_dir).join(file);
            if full_path.exists() {
                fs::remove_file(full_path.clone()).context(format!("Failed to delete file: {:?}", &full_path))?;
                trace!("Removed: {:#?} ", &full_path);
                num_deleted += 1;
            } else {
                trace!("Path does not exist: {:#?} ", &full_path);
            }
        }
    }
    if num_deleted > 0 {
      info!("Deleted: {} files. Main file: {}",num_deleted,path);
    } else {
      trace!("No files to be deleted");
    }
    Ok(())
}

/// Deletes every `fileinfo` row whose `createat` is below `millisecond_epoch`.
///
/// The statement and its parameter are always traced. In dry-run mode the
/// statement is not executed and only an informational message is logged;
/// otherwise the number of deleted rows is logged.
///
/// # Errors
///
/// Returns an error when the DELETE statement fails.
async fn delete_file_info_rows(client: &Client, millisecond_epoch: i64, dry_run: bool) -> Result<()> {
    const DELETE_FILE_INFO_SQL: &str = "
        DELETE FROM fileinfo
        WHERE createat < $1;
    ";

    trace!("Querying: {}", DELETE_FILE_INFO_SQL);
    trace!("Params: {:#?}", millisecond_epoch);

    if !dry_run {
        let removed = client
            .execute(DELETE_FILE_INFO_SQL, &[&millisecond_epoch])
            .await
            .context("Failed to delete file info rows")?;
        info!("Removed {} file information rows", removed);
    } else {
        info!("[DRY RUN] Would delete file info rows older than {}", millisecond_epoch);
    }

    Ok(())
}

/// Deletes every `posts` row whose `createat` is below `millisecond_epoch`.
///
/// The statement and its parameter are always traced. In dry-run mode the
/// statement is not executed and only an informational message is logged;
/// otherwise the number of deleted rows is logged.
///
/// # Errors
///
/// Returns an error when the DELETE statement fails.
async fn delete_post_rows(client: &Client, millisecond_epoch: i64, dry_run: bool) -> Result<()> {
    const DELETE_POSTS_SQL: &str = "
        DELETE FROM posts
        WHERE createat < $1;
    ";

    trace!("Querying: {}", DELETE_POSTS_SQL);
    trace!("Params: {:#?}", millisecond_epoch);

    if !dry_run {
        let removed = client
            .execute(DELETE_POSTS_SQL, &[&millisecond_epoch])
            .await
            .context("Failed to delete post rows")?;
        info!("Removed {} post rows", removed);
    } else {
        info!("[DRY RUN] Would delete post rows older than {}", millisecond_epoch);
    }

    Ok(())
}

/// Checks the cleaning parameters before any work is started.
///
/// The data directory, database name, user and host must be non-empty,
/// `retention_days` must be positive and `file_batch_size` must be non-zero.
///
/// # Errors
///
/// Returns an error naming the first parameter that fails its check. Every
/// message starts with `Invalid input parameters`.
fn validate(
    mattermost_data_directory: &str,
    database_name: &str,
    database_user: &str,
    database_host: &str,
    retention_days: i64,
    file_batch_size: usize,
) -> Result<()> {
    anyhow::ensure!(
        !mattermost_data_directory.is_empty(),
        "Invalid input parameters: data directory must not be empty"
    );
    anyhow::ensure!(
        !database_name.is_empty(),
        "Invalid input parameters: database name must not be empty"
    );
    anyhow::ensure!(
        !database_user.is_empty(),
        "Invalid input parameters: database user must not be empty"
    );
    anyhow::ensure!(
        !database_host.is_empty(),
        "Invalid input parameters: database host must not be empty"
    );
    anyhow::ensure!(
        retention_days > 0,
        "Invalid input parameters: retention days must be greater than zero (got {})",
        retention_days
    );
    anyhow::ensure!(
        file_batch_size != 0,
        "Invalid input parameters: file batch size must be greater than zero"
    );
    Ok(())
}