improve initial sync behavior
This commit is contained in:
parent
8f980748b5
commit
5aef5ab7b0
@ -1,4 +1,5 @@
|
|||||||
import Imap, { Box } from "imap";
|
import Imap, { Box } from "imap";
|
||||||
|
import { resolve } from "path";
|
||||||
import { getMailbox, getMailboxModseq, updateMailbox, updateMailboxModseq } from "../../db/imap/imap-db";
|
import { getMailbox, getMailboxModseq, updateMailbox, updateMailboxModseq } from "../../db/imap/imap-db";
|
||||||
import { Attrs, AttrsWithEnvelope } from "../../interfaces/mail/attrs.interface";
|
import { Attrs, AttrsWithEnvelope } from "../../interfaces/mail/attrs.interface";
|
||||||
import logger from "../../system/Logger";
|
import logger from "../../system/Logger";
|
||||||
@ -44,7 +45,7 @@ export default class Mailbox {
|
|||||||
this.imap.on("mail", (numNewMsgs: number) => {
|
this.imap.on("mail", (numNewMsgs: number) => {
|
||||||
if (!this.syncing) {
|
if (!this.syncing) {
|
||||||
// if not syncing restart a sync
|
// if not syncing restart a sync
|
||||||
this.syncMail(this.box.uidnext, this.box.uidnext + numNewMsgs);
|
this.syncManager(this.box.uidnext - 1, this.box.uidnext + numNewMsgs - 1);
|
||||||
} else {
|
} else {
|
||||||
// else save number of message to sync latter
|
// else save number of message to sync latter
|
||||||
this.msgToSync += numNewMsgs;
|
this.msgToSync += numNewMsgs;
|
||||||
@ -57,6 +58,10 @@ export default class Mailbox {
|
|||||||
const updateMsg = new updateMessage(info.uid, info.flags);
|
const updateMsg = new updateMessage(info.uid, info.flags);
|
||||||
updateMsg.updateFlags();
|
updateMsg.updateFlags();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
this.imap.on("expunge", (seqno: number) => {
|
||||||
|
console.log("Message with sequence number " + seqno + " has been deleted from the server.");
|
||||||
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -69,7 +74,7 @@ export default class Mailbox {
|
|||||||
async initSync(box: Box) {
|
async initSync(box: Box) {
|
||||||
// sync mail only if has new messages
|
// sync mail only if has new messages
|
||||||
if (this.box.uidnext < box.uidnext) {
|
if (this.box.uidnext < box.uidnext) {
|
||||||
this.syncMail(this.box.uidnext, box.uidnext);
|
this.syncManager(this.box.uidnext, box.uidnext);
|
||||||
} else {
|
} else {
|
||||||
logger.log("Mail already up to date");
|
logger.log("Mail already up to date");
|
||||||
}
|
}
|
||||||
@ -91,53 +96,44 @@ export default class Mailbox {
|
|||||||
logger.log("Done fetching new flags");
|
logger.log("Done fetching new flags");
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
logger.log("Flags already up to date")
|
logger.log("Flags already up to date");
|
||||||
}
|
}
|
||||||
this.updateModseq(parseInt(box.highestmodseq));
|
this.updateModseq(parseInt(box.highestmodseq));
|
||||||
}
|
}
|
||||||
|
|
||||||
async syncMail(savedUid: number, currentUid: number) {
|
syncManager = async (savedUid: number, currentUid: number) => {
|
||||||
this.syncing = true;
|
this.syncing = true;
|
||||||
const promises: Promise<unknown>[] = [];
|
logger.log(`Fetching from ${savedUid} to ${currentUid} uid`);
|
||||||
const mails: Attrs[] = [];
|
const nbMessageToSync = currentUid - savedUid;
|
||||||
logger.log(`Syncing from ${savedUid} to ${currentUid} uid`);
|
let STEP = nbMessageToSync > 300 ? Math.floor(nbMessageToSync / 7) : nbMessageToSync;
|
||||||
const f = this.imap.seq.fetch(`${savedUid}:${currentUid}`, {
|
let mails: AttrsWithEnvelope[] = [];
|
||||||
size: true,
|
|
||||||
envelope: true,
|
|
||||||
});
|
|
||||||
|
|
||||||
f.on("message", (msg, seqno) => {
|
for (let i = 0; i < nbMessageToSync; i += STEP) {
|
||||||
msg.once("attributes", (attrs: AttrsWithEnvelope) => {
|
mails = [];
|
||||||
mails.push(attrs);
|
try {
|
||||||
promises.push(saveMessage(attrs, this.id, this.imap));
|
// fetch mails
|
||||||
});
|
let secondUid = savedUid + STEP < currentUid ? savedUid + STEP : currentUid;
|
||||||
});
|
await this.mailFetcher(savedUid, secondUid, mails)
|
||||||
|
logger.log(`Fetched ${STEP} uids (${mails.length} messages)`);
|
||||||
f.once("error", (err) => {
|
// save same in the database
|
||||||
logger.err("Fetch error: " + err);
|
for (let k = 0; k < mails.length; k++) {
|
||||||
});
|
try {
|
||||||
|
const messageId = await saveMessage(mails[k], this.id, this.imap);
|
||||||
f.once("end", async () => {
|
const register = new RegisterMessageInApp(messageId, mails[k], this.id);
|
||||||
let step = 20;
|
|
||||||
for (let i = 0; i < promises.length; i += step) {
|
|
||||||
for (let j = i; j < (i + step && promises.length); j++) {
|
|
||||||
await new Promise((resolve, reject) => {
|
|
||||||
promises[j]
|
|
||||||
.then(async (res: number) => {
|
|
||||||
const register = new RegisterMessageInApp(res, mails[j], this.id);
|
|
||||||
await register.save();
|
await register.save();
|
||||||
resolve("");
|
} catch (error) {
|
||||||
})
|
logger.err("Failed to save a message: " + error);
|
||||||
.catch((err) => {
|
|
||||||
reject(err);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
logger.log(`Saved messages ${i + step > promises.length ? promises.length : i + step}/${mails.length}`);
|
|
||||||
updateMailbox(this.id, mails[i].uid);
|
|
||||||
}
|
}
|
||||||
updateMailbox(this.id, currentUid);
|
savedUid = secondUid;
|
||||||
this.syncing = false;
|
this.box.uidnext += savedUid;
|
||||||
|
|
||||||
|
updateMailbox(this.id, savedUid);
|
||||||
|
} catch (error) {
|
||||||
|
logger.err("Failed to sync message " + error);
|
||||||
|
}
|
||||||
|
logger.log(`Saved messages ${i + STEP > nbMessageToSync ? nbMessageToSync : i + STEP}/${nbMessageToSync}`);
|
||||||
|
}
|
||||||
|
|
||||||
// if has receive new msg during last sync then start a new sync
|
// if has receive new msg during last sync then start a new sync
|
||||||
if (this.msgToSync > 0) {
|
if (this.msgToSync > 0) {
|
||||||
@ -145,8 +141,33 @@ export default class Mailbox {
|
|||||||
this.box.uidnext += this.msgToSync;
|
this.box.uidnext += this.msgToSync;
|
||||||
// reset value to allow to detect new incoming message while syncing
|
// reset value to allow to detect new incoming message while syncing
|
||||||
this.msgToSync = 0;
|
this.msgToSync = 0;
|
||||||
this.syncMail(currentUid, this.box.uidnext);
|
await this.syncManager(currentUid, this.box.uidnext);
|
||||||
}
|
}
|
||||||
|
this.syncing = false;
|
||||||
|
logger.log(`Finished syncing messages`);
|
||||||
|
};
|
||||||
|
|
||||||
|
async mailFetcher(startUid: number, endUid: number, mails: Attrs[]): Promise<any> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const f = this.imap.seq.fetch(`${startUid}:${endUid}`, {
|
||||||
|
size: true,
|
||||||
|
envelope: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
f.on("message", (msg, seqno) => {
|
||||||
|
msg.once("attributes", (attrs: AttrsWithEnvelope) => {
|
||||||
|
mails.push(attrs);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
f.once("error", (err) => {
|
||||||
|
logger.err("Fetch error: " + err);
|
||||||
|
reject(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
f.once("end", async () => {
|
||||||
|
resolve(0);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user