update architecture and types

This commit is contained in:
grimhilt 2023-04-07 17:43:08 +02:00
parent d641b01758
commit 46ed3a1f41
6 changed files with 366 additions and 16 deletions

View File

@ -1,8 +1,8 @@
import { RoomType } from "../mails/saveMessage"; import { RoomType } from "../../mails/message/saveMessage";
import { hasSameElements } from "../utils/array"; import { hasSameElements } from "../../utils/array";
import { transformEmojis } from "../utils/string"; import { transformEmojis } from "../../utils/string";
import { execQueryAsync, execQueryAsyncWithId, execQuery } from "./db"; import { execQueryAsync, execQueryAsyncWithId, execQuery } from "../db";
import { queryFromId, queryToId, queryCcId } from "./utils/addressQueries"; import { queryFromId, queryToId, queryCcId } from "../utils/addressQueries";
export async function getAllMembers(messageId: number) { export async function getAllMembers(messageId: number) {
const query = ` const query = `

View File

@ -1,17 +1,24 @@
import { transformEmojis } from "../../utils/string"; import { transformEmojis } from "../../utils/string";
import { execQuery, execQueryAsync, execQueryAsyncWithId } from "../db"; import { execQuery, execQueryAsync, execQueryAsyncWithId } from "../db";
export async function registerMessage(timestamp, rfc822size, messageId) { export async function registerMessage(timestamp: string, rfc822size: number, messageID: string) {
const query = ` const query = `
INSERT INTO message INSERT INTO message
(idate, messageID, rfc822size) VALUES (?, ?, ?) (idate, messageID, rfc822size) VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE message_id = LAST_INSERT_ID(message_id) ON DUPLICATE KEY UPDATE message_id = LAST_INSERT_ID(message_id)
`; `;
const values = [timestamp, messageId, rfc822size]; const values = [timestamp, messageID, rfc822size];
return await execQueryAsyncWithId(query, values); return await execQueryAsyncWithId(query, values);
} }
export function registerMailbox_message(mailboxId, uid, messageId, modseq, seen, deleted) { export function registerMailbox_message(
mailboxId: number,
uid: number,
messageId: number,
modseq: number,
seen: boolean,
deleted: boolean,
) {
const query = ` const query = `
INSERT IGNORE INTO mailbox_message INSERT IGNORE INTO mailbox_message
(mailbox_id, uid, message_id, modseq, seen, deleted) VALUES (?, ?, ?, ?, ?, ?) (mailbox_id, uid, message_id, modseq, seen, deleted) VALUES (?, ?, ?, ?, ?, ?)
@ -20,7 +27,7 @@ export function registerMailbox_message(mailboxId, uid, messageId, modseq, seen,
execQuery(query, values); execQuery(query, values);
} }
export function registerBodypart(messageId, part, bodypartId, bytes, nbLines) { export function registerBodypart(messageId: number, part: string, bodypartId: number, bytes: number, nbLines: null) {
const query = ` const query = `
INSERT IGNORE INTO part_number INSERT IGNORE INTO part_number
(message_id, part, bodypart_id, bytes, nb_lines) VALUES (?, ?, ?, ?, ?) (message_id, part, bodypart_id, bytes, nb_lines) VALUES (?, ?, ?, ?, ?)
@ -36,7 +43,13 @@ export async function saveBodypart(bytes, hash, text, data) {
return await execQueryAsyncWithId(query, values); return await execQueryAsyncWithId(query, values);
} }
export async function saveHeader_fields(messageId, fieldId, bodypartId, part, value) { export async function saveHeader_fields(
messageId: number,
fieldId: number,
bodypartId: number,
part: string,
value: string,
) {
value = transformEmojis(value); value = transformEmojis(value);
const query = ` const query = `
INSERT IGNORE INTO header_field INSERT IGNORE INTO header_field
@ -46,7 +59,7 @@ export async function saveHeader_fields(messageId, fieldId, bodypartId, part, va
return await execQueryAsync(query, values); return await execQueryAsync(query, values);
} }
export async function saveAddress_fields(messageId, fieldId, addressId, number) { export async function saveAddress_fields(messageId: number, fieldId: number, addressId: number, number: number) {
const query = ` const query = `
INSERT IGNORE INTO address_field INSERT IGNORE INTO address_field
(message_id , field_id, address_id, number) VALUES (?, ?, ?, ?) (message_id , field_id, address_id, number) VALUES (?, ?, ?, ?)
@ -55,7 +68,7 @@ export async function saveAddress_fields(messageId, fieldId, addressId, number)
return await execQueryAsync(query, values); return await execQueryAsync(query, values);
} }
export function saveSource(messageId, content) { export function saveSource(messageId: number, content: string) {
content = transformEmojis(content); content = transformEmojis(content);
const query = ` const query = `
INSERT INTO source (message_id, content) VALUES (?, ?) INSERT INTO source (message_id, content) VALUES (?, ?)
@ -63,4 +76,4 @@ export function saveSource(messageId, content) {
`; `;
const values = [messageId, content, content]; const values = [messageId, content, content];
execQuery(query, values); execQuery(query, values);
} }

View File

@ -0,0 +1,208 @@
import {
createRoom,
registerMessageInRoom,
getRoomType,
findRoomsFromMessage,
hasSameMembersAsParent,
registerThread,
registerMember,
getAllMembers,
getThreadInfo,
incrementNotSeenRoom,
getThreadInfoOnId,
} from "../../db/message/saveMessage-db";
import { findRoomByOwner, getAddresseId, getUserIdOfMailbox } from "../../db/utils/mail";
import { nbMembers } from "../utils/envelopeUtils";
import logger from "../../system/Logger";
import { Attrs, Envelope, User } from "../../interfaces/mail/attrs.interface";
/**
* take object address and join mailbox and host to return mailbox@host
*/
function createAddress(elt: User): string {
return `${elt.mailbox}@${elt.host}`;
}
export enum RoomType {
ROOM = 0,
CHANNEL = 1,
GROUP = 2,
DM = 3,
THREAD = 4,
}
export default class RegisterMessageInApp {
messageId: number;
attrs: Attrs;
envelope: Envelope;
messageID?: string;
boxId: number;
isSeen: boolean;
ownerId: number;
userId: number;
inReplyTo: string;
constructor(_messageId: number, _attrs: Attrs, _boxId: number) {
this.messageId = _messageId;
this.attrs = _attrs;
if (!this.attrs.envelope) throw new Error("Envelope must exist in attributes");
this.envelope = this.attrs.envelope;
this.messageID = this.envelope?.messageId;
this.boxId = _boxId;
this.isSeen = this.attrs.flags.includes("\\Seen");
this.ownerId = -1;
this.userId = -1;
this.inReplyTo = "";
}
async init() {
if (this.envelope.from) {
this.ownerId = await getAddresseId(createAddress(this.envelope.from[0])); // todo use sender or from ?
} else {
throw new Error("Envelope must have a 'from' field");
}
}
isDm = () => nbMembers(this.envelope) == 2;
async isFromUs() {
if (this.userId == -1) {
await getUserIdOfMailbox(this.boxId).then((res) => {
this.userId = res[0]?.user_id;
});
}
return this.ownerId == this.userId;
}
async incrementNotSeen(roomId: number) {
// todo it appears there is an error with notifications
if (!this.isSeen) {
await incrementNotSeenRoom(roomId);
}
}
async registerMembers(roomId: number) {
getAllMembers(this.messageId).then((res) => {
if (res.lenght == 0) return;
const data = res[0].id.split(",");
data.forEach(async (memberId: number) => {
await registerMember(roomId, memberId);
});
});
}
async initiateRoom(owner: number, roomType: RoomType) {
try {
const roomId = await createRoom(this.envelope.subject, owner, this.messageId, roomType);
await registerMessageInRoom(this.messageId, roomId, this.envelope.date);
await this.incrementNotSeen(roomId);
await this.registerMembers(roomId);
return roomId;
} catch (err) {
logger.err(err);
}
}
async createOrRegisterOnExistence(owner: number, roomType: RoomType) {
await findRoomByOwner(owner).then(async (res) => {
if (res.length == 0) {
// first message with this sender
await this.initiateRoom(owner, roomType);
} else {
// not a reply, add to the list of message if this sender
await registerMessageInRoom(this.messageId, res[0].room_id, this.envelope.date);
await this.incrementNotSeen(res[0].room_id);
}
});
}
async initiateThread() {
await createRoom(this.envelope.subject, this.ownerId, this.messageId, RoomType.THREAD).then(
async (threadId: number) => {
// find parent room infos
let roomId: number;
let root_id: number;
await getThreadInfo(this.inReplyTo).then(async (room) => {
// todo room not lenght, reply to transfer ?
roomId = room[0].room_id;
root_id = room[0].root_id;
if (root_id === undefined) root_id = roomId;
await registerThread(threadId, roomId, root_id);
});
// impl register previous message or go back
await registerMessageInRoom(this.messageId, threadId, this.envelope.date);
await this.incrementNotSeen(root_id);
await this.incrementNotSeen(threadId);
await this.registerMembers(threadId);
},
);
}
async createOrRegisterOnMembers(roomId: number, isThread?: boolean) {
const hasSameMembers = await hasSameMembersAsParent(this.messageId, this.inReplyTo);
if (hasSameMembers) {
await registerMessageInRoom(this.messageId, roomId, this.envelope.date);
await this.incrementNotSeen(roomId);
if (isThread) {
await getThreadInfoOnId(roomId).then(async (res) => {
let root_id = res[0].root_id;
if (root_id == undefined) root_id = res[0].room_id;
await this.incrementNotSeen(res[0].root_id);
});
}
} else {
await this.initiateThread();
}
}
async save() {
await this.init();
if (this.envelope.inReplyTo) {
this.inReplyTo = this.envelope.inReplyTo;
await this.saveReply();
} else {
if (await this.isFromUs()) {
if (this.isDm()) {
// create or add new message to DM
if (!this.envelope.to) throw new Error("Who send a DM and put the recipient in cc ?");
const userTo = await getAddresseId(createAddress(this.envelope.to[0]));
await this.createOrRegisterOnExistence(userTo, RoomType.DM);
} else {
// it is not a reply and not a dm
// so it is a channel, which can be possibly a group
await this.initiateRoom(this.ownerId, RoomType.ROOM);
}
} else {
await this.createOrRegisterOnExistence(this.ownerId, RoomType.ROOM);
}
}
}
async saveReply() {
await findRoomsFromMessage(this.inReplyTo).then(async (rooms) => {
if (rooms.length < 1) {
// no rooms, so is a transfer
// todo test if members of transferred message are included
} else if (rooms.length === 1) {
// only one room so message is only in a room and not in a thread
// as a thread is associated to a room
const roomType = (await getRoomType(rooms[0].room_id))[0].room_type;
if (roomType == RoomType.GROUP || roomType == RoomType.THREAD) {
await this.createOrRegisterOnMembers(rooms[0].room_id, roomType == RoomType.THREAD);
} else {
// reply from CHANNEL or DM or ROOM
await this.initiateThread();
// todo
// if (sender == owner) { // correction from the original sender
// // leave in the same channel
// }
}
} else if (rooms.length > 1) {
// get the lowest thread (order by room_id)
const roomId = rooms[rooms.length - 1].room_id;
await this.createOrRegisterOnMembers(roomId);
}
});
}
}

View File

@ -0,0 +1,129 @@
import { getAddresseId } from "../../db/utils/mail";
import {simpleParser} from "mailparser";
import moment from "moment";
import Imap from "imap";
import {
registerMessage,
registerMailbox_message,
saveHeader_fields,
saveAddress_fields,
registerBodypart,
saveBodypart,
saveSource,
} from "../../db/message/storeMessage-db";
import { getFieldId } from "../../db/utils/mail";
import logger from "../../system/Logger";
import { AttrsWithEnvelope } from "../../interfaces/mail/attrs.interface";
export function saveMessage(attrs: AttrsWithEnvelope, mailboxId: number, imap: Imap): Promise<number> {
const envelope = attrs.envelope;
const ts = moment(new Date(envelope.date).getTime()).format("YYYY-MM-DD HH:mm:ss");
const rfc822size = attrs.size;
const messageID = envelope.messageId;
return new Promise((resolve, reject) => {
registerMessage(ts, rfc822size, messageID)
.then((messageId) => {
const isSeen: boolean = attrs.flags.includes("\\Seen");
const deleted: boolean = attrs.flags.includes("\\Deleted");
registerMailbox_message(mailboxId, attrs.uid, messageId, attrs?.modseq || 0, isSeen, deleted);
const f = imap.fetch(attrs.uid, { bodies: "" });
let buffer = "";
f.on("message", function (msg, seqno) {
msg.on("body", function (stream, info) {
stream.on("data", function (chunk) {
buffer += chunk.toString("utf8");
});
stream.once("end", () => {
// save raw data todo
// saveSource(messageId, buffer);
// parse data
simpleParser(buffer, async (err, parsed) => {
saveFromParsedData(parsed, messageId)
.then(() => {
resolve(messageId);
})
.catch((err) => {
reject(err);
});
});
});
});
});
f.once("error", function (err) {
logger.warn("Fetch error: " + err);
});
f.once("end", function () {
// logger.log("Done fetching data of " + messageID); // todo
});
})
.catch((err) => {
logger.warn("Unable to register message: " + err);
reject(err);
});
});
}
async function saveFromParsedData(parsed, messageId) {
const promises: Promise<any>[] = [];
Object.keys(parsed).forEach((key) => {
if (["from", "to", "cc", "bcc", "replyTo"].includes(key)) {
promises.push(
// save address field
getFieldId(key).then((fieldId) => {
parsed[key].value.forEach((addr, nb) => {
getAddresseId(addr.address, addr.name).then(async (addressId) => {
await saveAddress_fields(messageId, fieldId, addressId, nb);
});
});
}),
);
} else if (["subject", "inReplyTo", "references"].includes(key)) {
// todo : "references" (array)
if (key == "references") return;
promises.push(
getFieldId(key).then(async (fieldId) => {
await saveHeader_fields(messageId, fieldId, undefined, undefined, parsed[key]);
}),
);
} else if (["html", "text", "textAsHtml"].includes(key)) {
const hash = "0";
const size = "0";
let partType = "text/plain";
if (key == "html") {
partType = "text/html";
} else if (key == "textAsHtml") {
partType = "text/TexAsHtml";
}
saveBodypart(size, hash, parsed[key], "").then((bodypartId) => {
getFieldId(key).then((fieldId) => {
saveHeader_fields(messageId, fieldId, bodypartId, partType, undefined);
});
});
} else if (key == "attachments") {
// todo
} else if (["date", "messageId", "headers", "headerLines"].includes(key)) {
// messageId and date are already saved
// other field are not important and can be retrieved in source
return;
} else {
logger.warn("doesn't know key: " + key);
return;
}
});
return Promise.all(promises);
// todo when transfered
}
if (process.env["NODE_DEV"] == "TEST") {
module.exports = {
saveFromParsedData,
};
}

View File

@ -10,7 +10,7 @@ import { generateAttrs, generateUsers, randomInt } from "../test-utils/test-attr
import { jest, describe, it, expect } from "@jest/globals"; import { jest, describe, it, expect } from "@jest/globals";
import { mocked } from "jest-mock"; import { mocked } from "jest-mock";
import registerMessageInApp, { RoomType } from "../../mails/saveMessage"; import registerMessageInApp, { RoomType } from "../../mails/message/saveMessage";
const db = new saveMessageDatabase(generateUsers(5)); const db = new saveMessageDatabase(generateUsers(5));
const ownUser = db.users[0]; const ownUser = db.users[0];
@ -52,7 +52,7 @@ import {
getThreadInfo, getThreadInfo,
getThreadInfoOnId, getThreadInfoOnId,
incrementNotSeenRoom, incrementNotSeenRoom,
} from "../../db/saveMessage-db"; } from "../../db/message/saveMessage-db";
import { AttrsWithEnvelopeTest, createReplyWithSameMembers } from "../test-utils/test-messageUtils"; import { AttrsWithEnvelopeTest, createReplyWithSameMembers } from "../test-utils/test-messageUtils";
// todo esbuild // todo esbuild
// new message from us // new message from us

View File

@ -1,5 +1,5 @@
import { AttrsWithEnvelope, User } from "../../../interfaces/mail/attrs.interface"; import { AttrsWithEnvelope, User } from "../../../interfaces/mail/attrs.interface";
import { RoomType } from "../../../mails/saveMessage"; import { RoomType } from "../../../mails/message/saveMessage";
import { getMembers } from "../../../mails/utils/envelopeUtils"; import { getMembers } from "../../../mails/utils/envelopeUtils";
import { hasSameElements } from "../../../utils/array"; import { hasSameElements } from "../../../utils/array";
import { generateUsers, UserTest } from "../test-attrsUtils"; import { generateUsers, UserTest } from "../test-attrsUtils";