fmt
Showing 1 changed file with 285 additions and 237 deletions
1 | use anyhow::{Result, Context}; | 1 | use anyhow::{Context, Result}; |
2 | use tokio_postgres::{Client, NoTls}; | ||
3 | use log::{error, info, warn, trace}; | ||
4 | use std::{fs,env}; | ||
5 | use std::path::Path; | ||
6 | use chrono::Utc; | 2 | use chrono::Utc; |
7 | use clap::{Arg, Command}; | 3 | use clap::{Arg, Command}; |
8 | use dotenv::dotenv; | 4 | use dotenv::dotenv; |
5 | use log::{error, info, trace, warn}; | ||
6 | use std::path::Path; | ||
7 | use std::{env, fs}; | ||
8 | use tokio_postgres::{Client, NoTls}; | ||
9 | 9 | ||
10 | #[tokio::main] | 10 | #[tokio::main] |
11 | async fn main() { | 11 | async fn main() { |
12 | if env::var("RUST_LOG").is_err() { | 12 | if env::var("RUST_LOG").is_err() { |
13 | env::set_var("RUST_LOG", "info") | 13 | env::set_var("RUST_LOG", "info") |
14 | } | 14 | } |
15 | env_logger::init(); | 15 | env_logger::init(); |
16 | dotenv().ok(); | 16 | dotenv().ok(); |
17 | let matches = Command::new("Clean Utility") | 17 | let matches = Command::new("Clean Utility") |
18 | .version("1.0") | 18 | .version("1.0") |
19 | .about("Cleans old files and database rows based on retention policies") | 19 | .about("Cleans old files and database rows based on retention policies") |
20 | .arg( | 20 | .arg( |
21 | Arg::new("data_dir") | 21 | Arg::new("data_dir") |
22 | .long("data-dir") | 22 | .long("data-dir") |
23 | .env("MATTERMOST_DATA_DIRECTORY") | 23 | .env("MATTERMOST_DATA_DIRECTORY") |
24 | .help("Path to the Mattermost data directory") | 24 | .help("Path to the Mattermost data directory") |
25 | .required(true) | 25 | .required(true), |
26 | ) | 26 | ) |
27 | .arg( | 27 | .arg( |
28 | Arg::new("db_name") | 28 | Arg::new("db_name") |
29 | .short('n') | 29 | .short('n') |
30 | .long("db-name") | 30 | .long("db-name") |
31 | .env("DATABASE_NAME") | 31 | .env("DATABASE_NAME") |
32 | .help("Database name") | 32 | .help("Database name") |
33 | .required(true) | 33 | .required(true), |
34 | ) | 34 | ) |
35 | .arg( | 35 | .arg( |
36 | Arg::new("db_user") | 36 | Arg::new("db_user") |
37 | .short('u') | 37 | .short('u') |
38 | .long("db-user") | 38 | .long("db-user") |
39 | .env("DATABASE_USER") | 39 | .env("DATABASE_USER") |
40 | .help("Database user") | 40 | .help("Database user") |
41 | .required(true) | 41 | .required(true), |
42 | ) | 42 | ) |
43 | .arg( | 43 | .arg( |
44 | Arg::new("db_password") | 44 | Arg::new("db_password") |
45 | .short('p') | 45 | .short('p') |
46 | .long("db-password") | 46 | .long("db-password") |
47 | .env("PGPASSWORD") | 47 | .env("PGPASSWORD") |
48 | .help("Database password") | 48 | .help("Database password") |
49 | .required(true), | 49 | .required(true), |
50 | ) | 50 | ) |
51 | .arg( | 51 | .arg( |
52 | Arg::new("db_host") | 52 | Arg::new("db_host") |
53 | .short('h') | 53 | .short('h') |
54 | .long("db-host") | 54 | .long("db-host") |
55 | .env("DATABASE_HOST") | 55 | .env("DATABASE_HOST") |
56 | .help("Database host") | 56 | .help("Database host") |
57 | .required(true) | 57 | .required(true), |
58 | ) | 58 | ) |
59 | .arg( | 59 | .arg( |
60 | Arg::new("db_port") | 60 | Arg::new("db_port") |
61 | .short('P') | 61 | .short('P') |
62 | .long("db-port") | 62 | .long("db-port") |
63 | .env("DATABASE_PORT") | 63 | .env("DATABASE_PORT") |
64 | .help("Database port") | 64 | .help("Database port") |
65 | .required(true) | 65 | .required(true), |
66 | ) | 66 | ) |
67 | .arg( | 67 | .arg( |
68 | Arg::new("retention_days") | 68 | Arg::new("retention_days") |
69 | .short('D') | 69 | .short('D') |
70 | .long("retention-days") | 70 | .long("retention-days") |
71 | .env("RETENTION_DAYS") | 71 | .env("RETENTION_DAYS") |
72 | .help("Number of days to retain data") | 72 | .help("Number of days to retain data") |
73 | .required(true) | 73 | .required(true), |
74 | ) | 74 | ) |
75 | .arg( | 75 | .arg( |
76 | Arg::new("file_batch_size") | 76 | Arg::new("file_batch_size") |
77 | .short('b') | 77 | .short('b') |
78 | .long("file-batch-size") | 78 | .long("file-batch-size") |
79 | .env("FILE_BATCH_SIZE") | 79 | .env("FILE_BATCH_SIZE") |
80 | .help("Batch size for file deletion") | 80 | .help("Batch size for file deletion") |
81 | .required(true), | 81 | .required(true), |
82 | ) | 82 | ) |
83 | .arg( | 83 | .arg( |
84 | Arg::new("remove_posts") | 84 | Arg::new("remove_posts") |
85 | .long("remove-posts") | 85 | .long("remove-posts") |
86 | .help("Wipe posts older than timestamp") | 86 | .help("Wipe posts older than timestamp") |
87 | .required(false) | 87 | .required(false), |
88 | ) | 88 | ) |
89 | .arg( | 89 | .arg( |
90 | Arg::new("dry_run") | 90 | Arg::new("dry_run") |
91 | .long("dry-run") | 91 | .long("dry-run") |
92 | .help("Perform a dry run without making any changes") | 92 | .help("Perform a dry run without making any changes") |
93 | .required(false) | 93 | .required(false), |
94 | ) | 94 | ) |
95 | .get_matches(); | 95 | .get_matches(); |
96 | 96 | ||
97 | let mattermost_data_directory = matches.get_one::<String>("data_dir").unwrap(); | 97 | let mattermost_data_directory = matches.get_one::<String>("data_dir").unwrap(); |
98 | let database_name = matches.get_one::<String>("db_name").unwrap(); | 98 | let database_name = matches.get_one::<String>("db_name").unwrap(); |
99 | let database_user = matches.get_one::<String>("db_user").unwrap(); | 99 | let database_user = matches.get_one::<String>("db_user").unwrap(); |
100 | let database_password = matches.get_one::<String>("db_password").unwrap(); | 100 | let database_password = matches.get_one::<String>("db_password").unwrap(); |
101 | let database_host = matches.get_one::<String>("db_host").unwrap(); | 101 | let database_host = matches.get_one::<String>("db_host").unwrap(); |
102 | let database_port = matches.get_one::<String>("db_port").unwrap(); | 102 | let database_port = matches.get_one::<String>("db_port").unwrap(); |
103 | let retention_days = matches.get_one::<String>("retention_days").unwrap(); | 103 | let retention_days = matches.get_one::<String>("retention_days").unwrap(); |
104 | let file_batch_size = matches.get_one::<String>("file_batch_size").unwrap(); | 104 | let file_batch_size = matches.get_one::<String>("file_batch_size").unwrap(); |
105 | let remove_posts = matches.contains_id("remove_posts"); | 105 | let remove_posts = matches.contains_id("remove_posts"); |
106 | let dry_run = matches.contains_id("dry_run"); | 106 | let dry_run = matches.contains_id("dry_run"); |
107 | 107 | ||
108 | let retention_days = retention_days.parse::<i64>().expect("fucking hell retention"); | 108 | let retention_days = retention_days |
109 | let file_batch_size = file_batch_size.parse::<usize>().expect("fucking hell retention"); | 109 | .parse::<i64>() |
110 | //let file_batch_size = file_batch_size.parse::<usize>().expect("fucking hell retention"); | 110 | .expect("fucking hell retention"); |
111 | if let Err(err) = clean( | 111 | let file_batch_size = file_batch_size |
112 | mattermost_data_directory, | 112 | .parse::<usize>() |
113 | database_name, | 113 | .expect("fucking hell batch size"); |
114 | database_user, | 114 | |
115 | database_password, | 115 | if let Err(err) = clean( |
116 | database_host, | 116 | mattermost_data_directory, |
117 | database_port, | 117 | database_name, |
118 | retention_days, | 118 | database_user, |
119 | file_batch_size, | 119 | database_password, |
120 | remove_posts, | 120 | database_host, |
121 | dry_run, | 121 | database_port, |
122 | ).await { | 122 | retention_days, |
123 | error!("Cleaning operation failed: {}", err); | 123 | file_batch_size, |
124 | } else { | 124 | remove_posts, |
125 | info!("Cleaning operation completed successfully."); | 125 | dry_run, |
126 | } | 126 | ) |
127 | .await | ||
128 | { | ||
129 | error!("Cleaning operation failed: {}", err); | ||
130 | } else { | ||
131 | info!("Cleaning operation completed successfully."); | ||
132 | } | ||
127 | } | 133 | } |
128 | 134 | ||
129 | pub async fn clean( | 135 | pub async fn clean( |
130 | mattermost_data_directory: &str, | 136 | mattermost_data_directory: &str, |
131 | database_name: &str, | 137 | database_name: &str, |
132 | database_user: &str, | 138 | database_user: &str, |
133 | database_password: &str, | 139 | database_password: &str, |
134 | database_host: &str, | 140 | database_host: &str, |
135 | database_port: &str, | 141 | database_port: &str, |
136 | retention_days: i64, | 142 | retention_days: i64, |
137 | file_batch_size: usize, | 143 | file_batch_size: usize, |
138 | remove_posts: bool, | 144 | remove_posts: bool, |
139 | dry_run: bool, | 145 | dry_run: bool, |
140 | ) -> Result<()> { | 146 | ) -> Result<()> { |
141 | validate( | 147 | validate( |
142 | mattermost_data_directory, | 148 | mattermost_data_directory, |
143 | database_name, | 149 | database_name, |
144 | database_user, | 150 | database_user, |
145 | database_host, | 151 | database_host, |
146 | retention_days, | 152 | retention_days, |
147 | file_batch_size, | 153 | file_batch_size, |
148 | )?; | 154 | )?; |
149 | |||
150 | let connection_string = format!( | ||
151 | "postgres://{}:{}@{}:{}/{}?sslmode=disable", | ||
152 | database_user, database_password, database_host, database_port, database_name | ||
153 | ); | ||
154 | trace!("Connection string: {}", &connection_string); | ||
155 | let (client, connection) = tokio_postgres::connect(&connection_string, NoTls).await.context("Failed to connect to the database")?; | ||
156 | 155 | ||
157 | tokio::spawn(async move { | 156 | let connection_string = format!( |
158 | if let Err(e) = connection.await { | 157 | "postgres://{}:{}@{}:{}/{}?sslmode=disable", |
159 | warn!("error happened at spawn {e}"); | 158 | database_user, database_password, database_host, database_port, database_name |
160 | eprintln!("connection error: {}", e); | 159 | ); |
161 | } | 160 | trace!("Connection string: {}", &connection_string); |
162 | }); | 161 | let (client, connection) = tokio_postgres::connect(&connection_string, NoTls) |
163 | info!("Connection established: OK"); | 162 | .await |
164 | let millisecond_epoch = (Utc::now() - chrono::Duration::days(retention_days)).timestamp_millis(); | 163 | .context("Failed to connect to the database")?; |
165 | 164 | ||
166 | clean_files(&client, millisecond_epoch, mattermost_data_directory, file_batch_size, dry_run).await?; | 165 | tokio::spawn(async move { |
167 | delete_file_info_rows(&client, millisecond_epoch, dry_run).await?; | 166 | if let Err(e) = connection.await { |
168 | if remove_posts { | 167 | warn!("error happened at spawn {e}"); |
169 | delete_post_rows(&client, millisecond_epoch, dry_run).await?; | 168 | eprintln!("connection error: {}", e); |
170 | } else { | ||
171 | info!("Skipping posts removal") | ||
172 | } | 169 | } |
170 | }); | ||
171 | info!("Connection established: OK"); | ||
172 | let millisecond_epoch = (Utc::now() - chrono::Duration::days(retention_days)).timestamp_millis(); | ||
173 | |||
174 | clean_files( | ||
175 | &client, | ||
176 | millisecond_epoch, | ||
177 | mattermost_data_directory, | ||
178 | file_batch_size, | ||
179 | dry_run, | ||
180 | ) | ||
181 | .await?; | ||
182 | delete_file_info_rows(&client, millisecond_epoch, dry_run).await?; | ||
183 | if remove_posts { | ||
184 | delete_post_rows(&client, millisecond_epoch, dry_run).await?; | ||
185 | } else { | ||
186 | info!("Skipping posts removal") | ||
187 | } | ||
173 | 188 | ||
174 | Ok(()) | 189 | Ok(()) |
175 | } | 190 | } |
176 | 191 | ||
177 | async fn clean_files( | 192 | async fn clean_files( |
178 | client: &Client, | 193 | client: &Client, |
179 | millisecond_epoch: i64, | 194 | millisecond_epoch: i64, |
180 | mattermost_data_directory: &str, | 195 | mattermost_data_directory: &str, |
181 | file_batch_size: usize, | 196 | file_batch_size: usize, |
182 | dry_run: bool, | 197 | dry_run: bool, |
183 | ) -> Result<()> { | 198 | ) -> Result<()> { |
184 | let mut batch = 0; | 199 | let mut batch = 0; |
185 | let mut more_results = true; | 200 | let mut more_results = true; |
186 | 201 | ||
187 | while more_results { | 202 | while more_results { |
188 | more_results = clean_files_batch( | 203 | more_results = clean_files_batch( |
189 | client, | 204 | client, |
190 | millisecond_epoch, | 205 | millisecond_epoch, |
191 | mattermost_data_directory, | 206 | mattermost_data_directory, |
192 | file_batch_size, | 207 | file_batch_size, |
193 | batch, | 208 | batch, |
194 | dry_run, | 209 | dry_run, |
195 | ).await?; | 210 | ) |
196 | batch += 1; | 211 | .await?; |
197 | } | 212 | batch += 1; |
213 | } | ||
198 | 214 | ||
199 | Ok(()) | 215 | Ok(()) |
200 | } | 216 | } |
201 | 217 | ||
202 | async fn clean_files_batch( | 218 | async fn clean_files_batch( |
203 | client: &Client, | 219 | client: &Client, |
204 | millisecond_epoch: i64, | 220 | millisecond_epoch: i64, |
205 | mattermost_data_directory: &str, | 221 | mattermost_data_directory: &str, |
206 | file_batch_size: usize, | 222 | file_batch_size: usize, |
207 | batch: usize, | 223 | batch: usize, |
208 | dry_run: bool, | 224 | dry_run: bool, |
209 | ) -> Result<bool> { | 225 | ) -> Result<bool> { |
210 | let query = " | 226 | let query = " |
211 | SELECT path, thumbnailpath, previewpath | 227 | SELECT path, thumbnailpath, previewpath |
212 | FROM fileinfo | 228 | FROM fileinfo |
213 | WHERE createat < $1 | 229 | WHERE createat < $1 |
214 | OFFSET $2 | 230 | OFFSET $2 |
215 | LIMIT $3; | 231 | LIMIT $3; |
216 | "; | 232 | "; |
217 | trace!("Querying: {}",&query); | 233 | trace!("Querying: {}", &query); |
218 | let offset = (batch * file_batch_size) as i64; | 234 | let offset = (batch * file_batch_size) as i64; |
219 | let limit = file_batch_size as i64; | 235 | let limit = file_batch_size as i64; |
220 | trace!("params: {} {} {}",&millisecond_epoch, &offset, &limit); | 236 | trace!("params: {} {} {}", &millisecond_epoch, &offset, &limit); |
221 | let rows = client | 237 | let rows = client |
222 | .query(query, &[&millisecond_epoch, &offset, &limit]) | 238 | .query(query, &[&millisecond_epoch, &offset, &limit]) |
223 | .await.context("Failed to fetch file info rows")?; | 239 | .await |
240 | .context("Failed to fetch file info rows")?; | ||
224 | 241 | ||
225 | let mut more_results = false; | 242 | let mut more_results = false; |
226 | 243 | ||
227 | for row in rows { | 244 | for row in rows { |
228 | more_results = true; | 245 | more_results = true; |
229 | let path: String = row.get("path"); | 246 | let path: String = row.get("path"); |
230 | let thumbnail_path: String = row.get("thumbnailpath"); | 247 | let thumbnail_path: String = row.get("thumbnailpath"); |
231 | let preview_path: String = row.get("previewpath"); | 248 | let preview_path: String = row.get("previewpath"); |
232 | 249 | ||
233 | if dry_run { | 250 | if dry_run { |
234 | info!("[DRY RUN] Would remove: {:?}, {:?}, {:?}", path, thumbnail_path, preview_path); | 251 | info!( |
235 | } else { | 252 | "[DRY RUN] Would remove: {:?}, {:?}, {:?}", |
236 | remove_files(mattermost_data_directory, &path, &thumbnail_path, &preview_path).context("Failed to remove files")?; | 253 | path, thumbnail_path, preview_path |
237 | } | 254 | ); |
255 | } else { | ||
256 | remove_files( | ||
257 | mattermost_data_directory, | ||
258 | &path, | ||
259 | &thumbnail_path, | ||
260 | &preview_path, | ||
261 | ) | ||
262 | .context("Failed to remove files")?; | ||
238 | } | 263 | } |
264 | } | ||
239 | 265 | ||
240 | Ok(more_results) | 266 | Ok(more_results) |
241 | } | 267 | } |
242 | 268 | ||
243 | fn remove_files(base_dir: &str, path: &str, thumbnail_path: &str, preview_path: &str) -> Result<()> { | 269 | fn remove_files( |
244 | let files = [path, thumbnail_path, preview_path]; | 270 | base_dir: &str, |
245 | let mut num_deleted = 0; | 271 | path: &str, |
246 | for file in files { | 272 | thumbnail_path: &str, |
247 | if !file.is_empty() { | 273 | preview_path: &str, |
248 | let full_path = Path::new(base_dir).join(file); | 274 | ) -> Result<()> { |
249 | if full_path.exists() { | 275 | let files = [path, thumbnail_path, preview_path]; |
250 | fs::remove_file(full_path.clone()).context(format!("Failed to delete file: {:?}", &full_path))?; | 276 | let mut num_deleted = 0; |
251 | trace!("Removed: {:#?} ", &full_path); | 277 | for file in files { |
252 | num_deleted += 1; | 278 | if !file.is_empty() { |
253 | } else { | 279 | let full_path = Path::new(base_dir).join(file); |
254 | trace!("Path does not exist: {:#?} ", &full_path); | 280 | if full_path.exists() { |
255 | } | 281 | fs::remove_file(full_path.clone()) |
256 | } | 282 | .context(format!("Failed to delete file: {:?}", &full_path))?; |
257 | } | 283 | trace!("Removed: {:#?} ", &full_path); |
258 | if num_deleted > 0 { | 284 | num_deleted += 1; |
259 | info!("Deleted: {} files. Main file: {}",num_deleted,path); | 285 | } else { |
260 | } else { | 286 | trace!("Path does not exist: {:#?} ", &full_path); |
261 | trace!("No files to be deleted"); | 287 | } |
262 | } | 288 | } |
263 | Ok(()) | 289 | } |
290 | if num_deleted > 0 { | ||
291 | info!("Deleted: {} files. Main file: {}", num_deleted, path); | ||
292 | } else { | ||
293 | trace!("No files to be deleted"); | ||
294 | } | ||
295 | Ok(()) | ||
264 | } | 296 | } |
265 | 297 | ||
266 | async fn delete_file_info_rows(client: &Client, millisecond_epoch: i64, dry_run: bool) -> Result<()> { | 298 | async fn delete_file_info_rows( |
267 | let query = " | 299 | client: &Client, |
300 | millisecond_epoch: i64, | ||
301 | dry_run: bool, | ||
302 | ) -> Result<()> { | ||
303 | let query = " | ||
268 | DELETE FROM fileinfo | 304 | DELETE FROM fileinfo |
269 | WHERE createat < $1; | 305 | WHERE createat < $1; |
270 | "; | 306 | "; |
271 | trace!("Querying: {}",&query); | 307 | trace!("Querying: {}", &query); |
272 | trace!("Params: {:#?}",&millisecond_epoch); | 308 | trace!("Params: {:#?}", &millisecond_epoch); |
273 | if dry_run { | 309 | if dry_run { |
274 | info!("[DRY RUN] Would delete file info rows older than {}", millisecond_epoch); | 310 | info!( |
275 | return Ok(()); | 311 | "[DRY RUN] Would delete file info rows older than {}", |
276 | } | 312 | millisecond_epoch |
277 | let result = client.execute(query, &[&millisecond_epoch]).await.context("Failed to delete file info rows")?; | 313 | ); |
278 | info!("Removed {} file information rows", result); | 314 | return Ok(()); |
279 | Ok(()) | 315 | } |
316 | let result = client | ||
317 | .execute(query, &[&millisecond_epoch]) | ||
318 | .await | ||
319 | .context("Failed to delete file info rows")?; | ||
320 | info!("Removed {} file information rows", result); | ||
321 | Ok(()) | ||
280 | } | 322 | } |
281 | 323 | ||
282 | async fn delete_post_rows(client: &Client, millisecond_epoch: i64, dry_run: bool) -> Result<()> { | 324 | async fn delete_post_rows(client: &Client, millisecond_epoch: i64, dry_run: bool) -> Result<()> { |
283 | let query = " | 325 | let query = " |
284 | DELETE FROM posts | 326 | DELETE FROM posts |
285 | WHERE createat < $1; | 327 | WHERE createat < $1; |
286 | "; | 328 | "; |
287 | trace!("Querying: {}",&query); | 329 | trace!("Querying: {}", &query); |
288 | trace!("Params: {:#?}",&millisecond_epoch); | 330 | trace!("Params: {:#?}", &millisecond_epoch); |
289 | if dry_run { | 331 | if dry_run { |
290 | info!("[DRY RUN] Would delete post rows older than {}", millisecond_epoch); | 332 | info!( |
291 | return Ok(()); | 333 | "[DRY RUN] Would delete post rows older than {}", |
292 | } | 334 | millisecond_epoch |
293 | let result = client.execute(query, &[&millisecond_epoch]).await.context("Failed to delete post rows")?; | 335 | ); |
294 | info!("Removed {} post rows", result); | 336 | return Ok(()); |
295 | Ok(()) | 337 | } |
338 | let result = client | ||
339 | .execute(query, &[&millisecond_epoch]) | ||
340 | .await | ||
341 | .context("Failed to delete post rows")?; | ||
342 | info!("Removed {} post rows", result); | ||
343 | Ok(()) | ||
296 | } | 344 | } |
297 | 345 | ||
298 | fn validate( | 346 | fn validate( |
299 | mattermost_data_directory: &str, | 347 | mattermost_data_directory: &str, |
300 | database_name: &str, | 348 | database_name: &str, |
301 | database_user: &str, | 349 | database_user: &str, |
302 | database_host: &str, | 350 | database_host: &str, |
303 | retention_days: i64, | 351 | retention_days: i64, |
304 | file_batch_size: usize, | 352 | file_batch_size: usize, |
305 | ) -> Result<()> { | 353 | ) -> Result<()> { |
306 | if mattermost_data_directory.is_empty() | 354 | if mattermost_data_directory.is_empty() |
307 | || database_name.is_empty() | 355 | || database_name.is_empty() |
308 | || database_user.is_empty() | 356 | || database_user.is_empty() |
309 | || database_host.is_empty() | 357 | || database_host.is_empty() |
310 | || retention_days <= 0 | 358 | || retention_days <= 0 |
311 | || file_batch_size == 0 | 359 | || file_batch_size == 0 |
312 | { | 360 | { |
313 | anyhow::bail!("Invalid input parameters"); | 361 | anyhow::bail!("Invalid input parameters"); |
314 | } | 362 | } |
315 | Ok(()) | 363 | Ok(()) |
316 | } | 364 | } | ... | ... |
-
Please register or sign in to post a comment