nextsync/src/services/enumerator.rs

127 lines
3.6 KiB
Rust

use super::{
req_props::{Props, ReqProps, Response},
service::Service,
};
use crate::utils::path;
use std::sync::Arc;
use tokio::{task, sync::{mpsc::UnboundedSender, Mutex}};
/// Depth given to an `Enumerator` by `Enumerator::new` until `set_depth` overrides it.
pub const DEFAULT_DEPTH: u16 = 2;
/// Collects the files and folders found under `path` by sending `ReqProps`
/// requests through `service`.
///
/// Configured with the chained `set_path`, `set_depth` and `get_properties`
/// methods, then run with `enumerate`.
pub struct Enumerator<'a> {
/// Service the `ReqProps` requests are built on.
service: &'a Service,
/// Path the enumeration starts from.
path: String,
/// Offset added to the depth of a listed path and compared with each
/// directory's own path depth to decide whether that directory is queued
/// for a listing of its own.
depth: u16,
/// Properties handed to every `ReqProps` request.
properties: Vec<Props>,
}
impl<'a> Enumerator<'a> {
pub fn new(service: &'a Service) -> Self {
Enumerator {
service,
path: String::new(),
depth: DEFAULT_DEPTH,
properties: Vec::new(),
}
}
pub fn set_path(mut self, path: String) -> Self {
self.path = path;
self
}
pub fn set_depth(mut self, depth: u16) -> Self {
self.depth = depth;
self
}
pub fn get_properties(mut self, properties: Vec<Props>) -> Self {
self.properties.extend(properties);
self
}
pub async fn enumerate(&self) -> Result<(Vec<Response>, Vec<Response>), std::io::Error> {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let files = Arc::new(Mutex::new(Vec::new()));
let folders = Arc::new(Mutex::new(Vec::new()));
let service = Arc::from(self.service.clone());
let tasks_active = Arc::new(Mutex::new(0));
tx.send(self.path.clone());
let mut handles = vec![];
loop {
dbg!(*tasks_active.lock().await);
if let Ok(path) = rx.try_recv() {
handles.push(enumerator_task(EnumeratorTask{
path,
depth: self.depth.clone(),
tx: tx.clone(),
files: Arc::clone(&files),
folders: Arc::clone(&folders),
properties: self.properties.clone(),
service: self.service,
tasks_active: Arc::clone(&tasks_active),
}));
} else if *tasks_active.lock().await <= 0 {
dbg!("brek");
break;
}
}
// Wait for all tasks to complete
for handle in handles {
let _ = handle.await;
}
Ok((
Arc::try_unwrap(files).unwrap().into_inner(),
Arc::try_unwrap(folders).unwrap().into_inner(),
))
}
}
struct EnumeratorTask<'a> {
path: String,
depth: u16,
tx: UnboundedSender<String>,
files: Arc<Mutex<Vec<Response>>>,
folders: Arc<Mutex<Vec<Response>>>,
properties: Vec<Props>,
service: &'a Service,
tasks_active: Arc<Mutex<u16>>,
}
async fn enumerator_task<'a, 'b>(data: EnumeratorTask<'a, 'b>) -> task::JoinHandle<()> {
let current_depth = path::get_depth(&data.path);
*data.tasks_active.lock().await += 1;
tokio::task::spawn(async move {
let res = ReqProps::new(data.service)
.set_path(data.path.clone())
// .set_depth(self.depth)
.get_properties(data.properties)
.send()
.await
.unwrap();
dbg!(&res);
for obj in res.responses {
if obj.is_dir() {
// Avoid enumerating the same folder multiple times
if obj.abs_path() != data.path {
// depth deeper than current + self.depth
if obj.path_depth() > current_depth + data.depth {
data.tx.send(obj.abs_path().to_owned()).unwrap();
}
data.folders.lock().await.push(obj);
}
} else {
data.files.lock().await.push(obj);
}
}
*data.tasks_active.lock().await -= 1;
})
}