Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fetch and parse full gmail message #5160

Merged
merged 11 commits into from
May 20, 2024
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@
"@types/lodash.camelcase": "^4.3.7",
"@types/lodash.merge": "^4.6.7",
"@types/lodash.pick": "^4.3.7",
"@types/mailparser": "^3.4.4",
"@types/nodemailer": "^6.4.14",
"@types/passport-microsoft": "^1.0.3",
"add": "^2.0.6",
"addressparser": "^1.0.1",
"afterframe": "^1.0.2",
"apollo-server-express": "^3.12.0",
"apollo-upload-client": "^17.0.0",
Expand Down Expand Up @@ -126,7 +126,6 @@
"lodash.snakecase": "^4.1.1",
"lodash.upperfirst": "^4.3.1",
"luxon": "^3.3.0",
"mailparser": "^3.6.5",
"microdiff": "^1.3.2",
"nest-commander": "^3.12.0",
"next": "14.0.4",
Expand Down Expand Up @@ -232,6 +231,7 @@
"@swc/helpers": "~0.5.2",
"@testing-library/jest-dom": "^6.1.5",
"@testing-library/react": "14.0.0",
"@types/addressparser": "^1.0.3",
"@types/apollo-upload-client": "^17.0.2",
"@types/bcrypt": "^5.0.0",
"@types/better-sqlite3": "^7.6.8",
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export const GMAIL_USERS_MESSAGES_GET_BATCH_SIZE = 10;
export const GMAIL_USERS_MESSAGES_GET_BATCH_SIZE = 20;
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { Injectable, Logger } from '@nestjs/common';

import { AxiosResponse } from 'axios';
import { simpleParser } from 'mailparser';
import planer from 'planer';
import addressparser from 'addressparser';
import { gmail_v1 } from 'googleapis';

import { GmailMessage } from 'src/modules/messaging/types/gmail-message';
import { MessageQuery } from 'src/modules/messaging/types/message-or-thread-query';
import { GmailMessageParsedResponse } from 'src/modules/messaging/types/gmail-message-parsed-response';
import { FetchByBatchesService } from 'src/modules/messaging/services/fetch-by-batch/fetch-by-batch.service';
import { formatAddressObjectAsParticipants } from 'src/modules/messaging/services/utils/format-address-object-as-participants.util';
import { assert, assertNotNull } from 'src/utils/assert';

@Injectable()
export class FetchMessagesByBatchesService {
Expand All @@ -19,9 +20,9 @@ export class FetchMessagesByBatchesService {
async fetchAllMessages(
queries: MessageQuery[],
accessToken: string,
workspaceId?: string,
connectedAccountId?: string,
): Promise<{ messages: GmailMessage[]; errors: any[] }> {
workspaceId: string,
connectedAccountId: string,
): Promise<GmailMessage[]> {
let startTime = Date.now();
const batchResponses = await this.fetchByBatchesService.fetchAllByBatches(
queries,
Expand All @@ -38,8 +39,11 @@ export class FetchMessagesByBatchesService {

startTime = Date.now();

const formattedResponse =
await this.formatBatchResponsesAsGmailMessages(batchResponses);
const formattedResponse = this.formatBatchResponsesAsGmailMessages(
batchResponses,
workspaceId,
connectedAccountId,
);

endTime = Date.now();

Expand All @@ -52,109 +56,172 @@ export class FetchMessagesByBatchesService {
return formattedResponse;
}

async formatBatchResponseAsGmailMessage(
private formatBatchResponseAsGmailMessage(
responseCollection: AxiosResponse<any, any>,
): Promise<{ messages: GmailMessage[]; errors: any[] }> {
const parsedResponses = this.fetchByBatchesService.parseBatch(
responseCollection,
) as GmailMessageParsedResponse[];

const errors: any = [];
workspaceId: string,
connectedAccountId: string,
): GmailMessage[] {
const parsedResponses =
this.fetchByBatchesService.parseBatch(responseCollection);

const sanitizeString = (str: string) => {
return str.replace(/\0/g, '');
};

const formattedResponse = Promise.all(
parsedResponses.map(async (message: GmailMessageParsedResponse) => {
if (message.error) {
errors.push(message.error);
const formattedResponse = parsedResponses.map(
(response): GmailMessage | null => {
if ('error' in response) {
if (response.error.code === 404) {
return null;
}

throw response.error;
}

const {
historyId,
id,
threadId,
internalDate,
subject,
from,
to,
cc,
bcc,
headerMessageId,
text,
attachments,
deliveredTo,
} = this.parseGmailMessage(response);

if (!from) {
this.logger.log(
`From value is missing while importing message in workspace ${workspaceId} and account ${connectedAccountId}`,
);

return null;
}

if (!to && !deliveredTo && !bcc && !cc) {
this.logger.log(
`To, Delivered-To, Bcc or Cc value is missing while importing message in workspace ${workspaceId} and account ${connectedAccountId}`,
);

return;
return null;
}

const { historyId, id, threadId, internalDate, raw } = message;

const body = atob(raw?.replace(/-/g, '+').replace(/_/g, '/'));

try {
const parsed = await simpleParser(body, {
skipHtmlToText: true,
skipImageLinks: true,
skipTextToHtml: true,
maxHtmlLengthToParse: 0,
});

const { subject, messageId, from, to, cc, bcc, text, attachments } =
parsed;

if (!from) throw new Error('From value is missing');

const participants = [
...formatAddressObjectAsParticipants(from, 'from'),
...formatAddressObjectAsParticipants(to, 'to'),
...formatAddressObjectAsParticipants(cc, 'cc'),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rostaklein why are we removing these participants?

...formatAddressObjectAsParticipants(bcc, 'bcc'),
];

let textWithoutReplyQuotations = text;

if (text)
try {
textWithoutReplyQuotations = planer.extractFrom(
text,
'text/plain',
);
} catch (error) {
console.log(
'Error while trying to remove reply quotations',
error,
);
}

const messageFromGmail: GmailMessage = {
historyId,
externalId: id,
headerMessageId: messageId || '',
subject: subject || '',
messageThreadExternalId: threadId,
internalDate,
fromHandle: from.value[0].address || '',
fromDisplayName: from.value[0].name || '',
participants,
text: sanitizeString(textWithoutReplyQuotations || ''),
attachments,
};

return messageFromGmail;
} catch (error) {
console.log('Error', error);

errors.push(error);
const participants = [
...formatAddressObjectAsParticipants(from, 'from'),
...formatAddressObjectAsParticipants(to ?? deliveredTo, 'to'),
...formatAddressObjectAsParticipants(cc, 'cc'),
...formatAddressObjectAsParticipants(bcc, 'bcc'),
];

let textWithoutReplyQuotations = text;

if (text) {
textWithoutReplyQuotations = planer.extractFrom(text, 'text/plain');
}
}),

const messageFromGmail: GmailMessage = {
historyId,
externalId: id,
headerMessageId,
subject: subject || '',
messageThreadExternalId: threadId,
internalDate,
fromHandle: from[0].address || '',
fromDisplayName: from[0].name || '',
participants,
text: sanitizeString(textWithoutReplyQuotations || ''),
attachments,
};

return messageFromGmail;
},
);

const filteredMessages = (await formattedResponse).filter(
(message) => message,
const filteredMessages = formattedResponse.filter((message) =>
assertNotNull(message),
) as GmailMessage[];

return { messages: filteredMessages, errors };
return filteredMessages;
}

async formatBatchResponsesAsGmailMessages(
private formatBatchResponsesAsGmailMessages(
batchResponses: AxiosResponse<any, any>[],
): Promise<{ messages: GmailMessage[]; errors: any[] }> {
const messagesAndErrors = await Promise.all(
batchResponses.map(async (response) => {
return this.formatBatchResponseAsGmailMessage(response);
}),
);
workspaceId: string,
connectedAccountId: string,
): GmailMessage[] {
const messageBatches = batchResponses.map((response) => {
return this.formatBatchResponseAsGmailMessage(
response,
workspaceId,
connectedAccountId,
);
});

return messageBatches.flat();
}

private parseGmailMessage(message: gmail_v1.Schema$Message) {
const subject = this.getPropertyFromHeaders(message, 'Subject');
const rawFrom = this.getPropertyFromHeaders(message, 'From');
const rawTo = this.getPropertyFromHeaders(message, 'To');
const rawDeliveredTo = this.getPropertyFromHeaders(message, 'Delivered-To');
const rawCc = this.getPropertyFromHeaders(message, 'Cc');
const rawBcc = this.getPropertyFromHeaders(message, 'Bcc');
const messageId = this.getPropertyFromHeaders(message, 'Message-ID');
const id = message.id;
const threadId = message.threadId;
const historyId = message.historyId;
const internalDate = message.internalDate;

assert(id);
assert(messageId);
assert(threadId);
assert(historyId);
assert(internalDate);

const bodyData = this.getBodyData(message);
const text = bodyData ? Buffer.from(bodyData, 'base64').toString() : '';

return {
id,
headerMessageId: messageId,
threadId,
historyId,
internalDate,
subject,
from: rawFrom ? addressparser(rawFrom) : undefined,
deliveredTo: rawDeliveredTo ? addressparser(rawDeliveredTo) : undefined,
to: rawTo ? addressparser(rawTo) : undefined,
cc: rawCc ? addressparser(rawCc) : undefined,
bcc: rawBcc ? addressparser(rawBcc) : undefined,
text,
attachments: [],
};
}

const messages = messagesAndErrors.map((item) => item.messages).flat();
private getBodyData(message: gmail_v1.Schema$Message) {
const firstPart = message.payload?.parts?.[0];

const errors = messagesAndErrors.map((item) => item.errors).flat();
if (firstPart?.mimeType === 'text/plain') {
return firstPart?.body?.data;
}

return firstPart?.parts?.find((part) => part.mimeType === 'text/plain')
?.body?.data;
}

private getPropertyFromHeaders(
message: gmail_v1.Schema$Message,
property: string,
) {
const header = message.payload?.headers?.find(
(header) => header.name?.toLowerCase() === property.toLowerCase(),
);

return { messages, errors };
return header?.value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ export class GmailFetchMessageContentFromCacheService {
const messageQueries = createQueriesFromMessageIds(messageIdsToFetch);

try {
const { messages: messagesToSave, errors } =
const messagesToSave =
await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries,
accessToken,
Expand All @@ -194,22 +194,6 @@ export class GmailFetchMessageContentFromCacheService {
return [];
}

if (errors.length) {
const errorsCanBeIgnored = errors.every(
(error) => error.code === 404,
);

if (!errorsCanBeIgnored) {
throw new Error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${JSON.stringify(
errors,
null,
2,
)}`,
);
}
}

const messageExternalIdsAndIdsMap =
await this.messageService.saveMessagesWithinTransaction(
messagesToSave,
Expand Down Expand Up @@ -292,21 +276,19 @@ export class GmailFetchMessageContentFromCacheService {
messageIdsToFetch,
);

if (error?.message?.code === 429) {
this.logger.error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: Resource has been exhausted, locking for ${GMAIL_ONGOING_SYNC_TIMEOUT}ms...`,
);
await this.messageChannelRepository.updateSyncStatus(
gmailMessageChannelId,
MessageChannelSyncStatus.FAILED,
workspaceId,
);
Comment on lines +279 to +283
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now being called for every single caught error, not just 429. I think thats what we actually want, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes!


await this.messageChannelRepository.updateSyncStatus(
gmailMessageChannelId,
MessageChannelSyncStatus.FAILED,
workspaceId,
);
this.logger.error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: locking for ${GMAIL_ONGOING_SYNC_TIMEOUT}ms...`,
);

throw new Error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${error.message}`,
);
}
throw new Error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${error.message}`,
);
}
}

Expand Down