feat(enumerator): working enumerator with depth

This commit is contained in:
grimhilt 2024-09-29 22:56:51 +02:00
parent 6989e87a56
commit 487a94f829
3 changed files with 55 additions and 71 deletions

View File

@ -4,9 +4,9 @@ use super::{
}; };
use crate::utils::path; use crate::utils::path;
use std::sync::Arc; use std::sync::Arc;
use tokio::{task, sync::{mpsc::UnboundedSender, Mutex}}; use tokio::{sync::Mutex, task::JoinSet};
pub const DEFAULT_DEPTH: u16 = 2; pub const DEFAULT_DEPTH: u16 = 3;
pub struct Enumerator<'a> { pub struct Enumerator<'a> {
service: &'a Service, service: &'a Service,
@ -49,31 +49,51 @@ impl<'a> Enumerator<'a> {
tx.send(self.path.clone()); tx.send(self.path.clone());
let mut handles = vec![]; let mut taskSet = JoinSet::new();
loop { loop {
dbg!(*tasks_active.lock().await);
if let Ok(path) = rx.try_recv() { if let Ok(path) = rx.try_recv() {
handles.push(enumerator_task(EnumeratorTask{ let current_depth = path::get_depth(&path);
path, *tasks_active.lock().await += 1;
depth: self.depth.clone(), let tx_clone = tx.clone();
tx: tx.clone(), let files_clone = Arc::clone(&files);
files: Arc::clone(&files), let folders_clone = Arc::clone(&folders);
folders: Arc::clone(&folders), let service_clone = Arc::clone(&service);
properties: self.properties.clone(), let properties = self.properties.clone();
service: self.service, let depth = self.depth.clone();
tasks_active: Arc::clone(&tasks_active), let tasks_active_clone = Arc::clone(&tasks_active);
}));
let task = taskSet.spawn(async move {
let res = ReqProps::new(&service_clone)
.set_path(path.clone())
.set_depth(depth)
.get_properties(properties)
.send()
.await
.unwrap();
for obj in res.responses {
if obj.is_dir() {
// Avoid enumerating the same folder multiple times
if obj.abs_path() != &path {
// depth deeper than current + self.depth
if obj.path_depth() >= current_depth + depth {
tx_clone.send(obj.abs_path().to_owned()).unwrap();
}
folders_clone.lock().await.push(obj);
}
} else {
files_clone.lock().await.push(obj);
}
}
*tasks_active_clone.lock().await -= 1;
});
} else if *tasks_active.lock().await <= 0 { } else if *tasks_active.lock().await <= 0 {
dbg!("brek");
break; break;
} }
} }
// Wait for all tasks to complete taskSet.join_all().await;
for handle in handles {
let _ = handle.await;
}
Ok(( Ok((
Arc::try_unwrap(files).unwrap().into_inner(), Arc::try_unwrap(files).unwrap().into_inner(),
@ -81,46 +101,3 @@ impl<'a> Enumerator<'a> {
)) ))
} }
} }
struct EnumeratorTask<'a> {
path: String,
depth: u16,
tx: UnboundedSender<String>,
files: Arc<Mutex<Vec<Response>>>,
folders: Arc<Mutex<Vec<Response>>>,
properties: Vec<Props>,
service: &'a Service,
tasks_active: Arc<Mutex<u16>>,
}
async fn enumerator_task<'a, 'b>(data: EnumeratorTask<'a, 'b>) -> task::JoinHandle<()> {
let current_depth = path::get_depth(&data.path);
*data.tasks_active.lock().await += 1;
tokio::task::spawn(async move {
let res = ReqProps::new(data.service)
.set_path(data.path.clone())
// .set_depth(self.depth)
.get_properties(data.properties)
.send()
.await
.unwrap();
dbg!(&res);
for obj in res.responses {
if obj.is_dir() {
// Avoid enumerating the same folder multiple times
if obj.abs_path() != data.path {
// depth deeper than current + self.depth
if obj.path_depth() > current_depth + data.depth {
data.tx.send(obj.abs_path().to_owned()).unwrap();
}
data.folders.lock().await.push(obj);
}
} else {
data.files.lock().await.push(obj);
}
}
*data.tasks_active.lock().await -= 1;
})
}

View File

@ -108,6 +108,11 @@ impl<'a> ReqProps<'a> {
self self
} }
pub fn set_depth(mut self, depth: u16) -> Self {
self.request.headers.insert("Depth", depth.into());
self
}
pub fn get_properties(mut self, properties: Vec<Props>) -> Self { pub fn get_properties(mut self, properties: Vec<Props>) -> Self {
self.properties.extend(properties); self.properties.extend(properties);
self self

View File

@ -66,7 +66,7 @@ pub struct Request<'a> {
client: ClientConfig, client: ClientConfig,
method: Option<Method>, method: Option<Method>,
url: Option<String>, url: Option<String>,
headers: HeaderMap, pub headers: HeaderMap,
body: Option<String>, body: Option<String>,
} }
@ -90,14 +90,16 @@ impl<'a> Request<'a> {
pub async fn send(&mut self) -> Result<reqwest::Response, reqwest::Error> { pub async fn send(&mut self) -> Result<reqwest::Response, reqwest::Error> {
self.service self.service
.authenticate(self.client.build().request( .authenticate(
self.method.clone().expect("Method must be set"), self.client
{ .build()
.request(self.method.clone().expect("Method must be set"), {
let mut url = self.service.url_base.clone(); let mut url = self.service.url_base.clone();
url.push_str(&self.url.clone().expect("An url must be set")); url.push_str(&self.url.clone().expect("An url must be set"));
url url
}, })
)) .headers(self.headers.clone()),
)
.send() .send()
.await .await
// let mut url = self // let mut url = self