Phase 1: Forward Assist initial build
Multi-tenant AI help desk SaaS for the firearms industry. Full monorepo: API (Express/Prisma), Worker (BullMQ), Frontend (React/Vite/Tailwind). PostgreSQL 16 + pgvector, Redis 7, JWT auth, RLS tenant isolation. Dark Armory theme with tactical branding throughout. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,28 @@
|
||||
{
|
||||
"name": "@forward-assist/worker",
|
||||
"version": "0.1.0",
|
||||
"private": true,
|
||||
"scripts": {
|
||||
"dev": "tsx watch src/index.ts",
|
||||
"build": "tsc",
|
||||
"start": "node dist/index.js"
|
||||
},
|
||||
"dependencies": {
|
||||
"@forward-assist/shared": "*",
|
||||
"@prisma/client": "^6.5.0",
|
||||
"bullmq": "^5.0.0",
|
||||
"dotenv": "^16.4.0",
|
||||
"ioredis": "^5.4.0",
|
||||
"imapflow": "^1.0.160",
|
||||
"mailparser": "^3.7.0",
|
||||
"nodemailer": "^6.9.0",
|
||||
"uuid": "^10.0.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/nodemailer": "^6.4.16",
|
||||
"@types/node": "^22.0.0",
|
||||
"@types/uuid": "^10.0.0",
|
||||
"tsx": "^4.19.0",
|
||||
"typescript": "^5.4.0"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,70 @@
|
||||
import dotenv from "dotenv";
|
||||
dotenv.config({ path: "../../.env" });
|
||||
|
||||
import { Worker, Queue, QueueScheduler } from "bullmq";
|
||||
import IORedis from "ioredis";
|
||||
import { imapPollProcessor } from "./jobs/imapPoll";
|
||||
import { smtpSendProcessor } from "./jobs/smtpSend";
|
||||
import { schedulerService } from "./services/scheduler";
|
||||
|
||||
const REDIS_URL = process.env.REDIS_URL || "redis://localhost:6379";
|
||||
|
||||
const connection = new IORedis(REDIS_URL, {
|
||||
maxRetriesPerRequest: null,
|
||||
});
|
||||
|
||||
// Queues
|
||||
export const imapQueue = new Queue("imap-poll", { connection });
|
||||
export const smtpQueue = new Queue("smtp-send", { connection });
|
||||
|
||||
// Workers
|
||||
const imapWorker = new Worker("imap-poll", imapPollProcessor, {
|
||||
connection,
|
||||
concurrency: 5,
|
||||
limiter: {
|
||||
max: 10,
|
||||
duration: 60000,
|
||||
},
|
||||
});
|
||||
|
||||
const smtpWorker = new Worker("smtp-send", smtpSendProcessor, {
|
||||
connection,
|
||||
concurrency: 10,
|
||||
});
|
||||
|
||||
// Event handlers
|
||||
imapWorker.on("completed", (job) => {
|
||||
console.log(`[IMAP Worker] Job ${job.id} completed`);
|
||||
});
|
||||
|
||||
imapWorker.on("failed", (job, err) => {
|
||||
console.error(`[IMAP Worker] Job ${job?.id} failed:`, err.message);
|
||||
});
|
||||
|
||||
smtpWorker.on("completed", (job) => {
|
||||
console.log(`[SMTP Worker] Job ${job.id} completed`);
|
||||
});
|
||||
|
||||
smtpWorker.on("failed", (job, err) => {
|
||||
console.error(`[SMTP Worker] Job ${job?.id} failed:`, err.message);
|
||||
});
|
||||
|
||||
// Start scheduler
|
||||
schedulerService.start(imapQueue).then(() => {
|
||||
console.log("[Forward Assist Worker] All workers and schedulers running");
|
||||
});
|
||||
|
||||
// Graceful shutdown
|
||||
async function shutdown() {
|
||||
console.log("[Worker] Shutting down...");
|
||||
await imapWorker.close();
|
||||
await smtpWorker.close();
|
||||
await schedulerService.stop();
|
||||
await connection.quit();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
process.on("SIGTERM", shutdown);
|
||||
process.on("SIGINT", shutdown);
|
||||
|
||||
console.log("[Forward Assist Worker] Starting workers...");
|
||||
@@ -0,0 +1,258 @@
|
||||
import { Job } from "bullmq";
|
||||
import { ImapFlow } from "imapflow";
|
||||
import { simpleParser, ParsedMail } from "mailparser";
|
||||
import { prisma } from "../services/prisma";
|
||||
import { v4 as uuidv4 } from "uuid";
|
||||
|
||||
interface ImapPollData {
|
||||
emailAccountId: string;
|
||||
tenantId: string;
|
||||
}
|
||||
|
||||
export async function imapPollProcessor(job: Job<ImapPollData>): Promise<void> {
|
||||
const { emailAccountId, tenantId } = job.data;
|
||||
|
||||
const account = await prisma.emailAccount.findUnique({
|
||||
where: { id: emailAccountId },
|
||||
});
|
||||
|
||||
if (!account || !account.isActive) {
|
||||
console.log(`[IMAP] Account ${emailAccountId} not found or inactive, skipping`);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[IMAP] Polling ${account.emailAddress} for tenant ${tenantId}`);
|
||||
|
||||
const client = new ImapFlow({
|
||||
host: account.imapHost,
|
||||
port: account.imapPort,
|
||||
secure: account.imapTls,
|
||||
auth: {
|
||||
user: account.imapUser,
|
||||
pass: account.imapPassword,
|
||||
},
|
||||
logger: false,
|
||||
});
|
||||
|
||||
try {
|
||||
await client.connect();
|
||||
const lock = await client.getMailboxLock("INBOX");
|
||||
|
||||
try {
|
||||
// Fetch unseen messages
|
||||
const messages: ParsedMail[] = [];
|
||||
for await (const msg of client.fetch(
|
||||
{ seen: false },
|
||||
{ source: true, envelope: true, flags: true }
|
||||
)) {
|
||||
if (msg.source) {
|
||||
const parsed = await simpleParser(msg.source);
|
||||
messages.push(parsed);
|
||||
|
||||
// Mark as seen
|
||||
await client.messageFlagsAdd(msg.seq, ["\\Seen"]);
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`[IMAP] Found ${messages.length} new messages for ${account.emailAddress}`);
|
||||
|
||||
// Process each message
|
||||
for (const mail of messages) {
|
||||
await processIncomingEmail(mail, account, tenantId);
|
||||
}
|
||||
|
||||
// Update last poll time
|
||||
await prisma.emailAccount.update({
|
||||
where: { id: emailAccountId },
|
||||
data: { lastPollAt: new Date(), lastError: null },
|
||||
});
|
||||
} finally {
|
||||
lock.release();
|
||||
}
|
||||
|
||||
await client.logout();
|
||||
} catch (error: any) {
|
||||
console.error(`[IMAP] Error polling ${account.emailAddress}:`, error.message);
|
||||
|
||||
// Store the error on the account
|
||||
await prisma.emailAccount.update({
|
||||
where: { id: emailAccountId },
|
||||
data: {
|
||||
lastPollAt: new Date(),
|
||||
lastError: error.message,
|
||||
},
|
||||
});
|
||||
|
||||
throw error; // Let BullMQ handle retries
|
||||
}
|
||||
}
|
||||
|
||||
async function processIncomingEmail(
|
||||
mail: ParsedMail,
|
||||
account: any,
|
||||
tenantId: string
|
||||
): Promise<void> {
|
||||
const fromEmail = mail.from?.value?.[0]?.address || "unknown@unknown.com";
|
||||
const fromName = mail.from?.value?.[0]?.name || undefined;
|
||||
const subject = mail.subject || "(No Subject)";
|
||||
const bodyText = mail.text || "";
|
||||
const bodyHtml = mail.html || undefined;
|
||||
const messageId = mail.messageId || undefined;
|
||||
const inReplyTo = mail.inReplyTo || undefined;
|
||||
const references = Array.isArray(mail.references)
|
||||
? mail.references.join(" ")
|
||||
: mail.references || undefined;
|
||||
|
||||
// Thread detection: check if this is a reply to an existing ticket
|
||||
let existingTicket = null;
|
||||
|
||||
// Strategy 1: Match by In-Reply-To or References headers
|
||||
if (inReplyTo || references) {
|
||||
const refIds = [inReplyTo, ...(references?.split(/\s+/) || [])].filter(Boolean);
|
||||
|
||||
if (refIds.length > 0) {
|
||||
const existingMessage = await prisma.message.findFirst({
|
||||
where: {
|
||||
tenantId,
|
||||
messageId: { in: refIds as string[] },
|
||||
},
|
||||
include: { ticket: true },
|
||||
});
|
||||
|
||||
if (existingMessage) {
|
||||
existingTicket = existingMessage.ticket;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Strategy 2: Match by subject line (strip Re:/Fwd: prefixes)
|
||||
if (!existingTicket) {
|
||||
const cleanSubject = subject.replace(/^(Re|Fwd|Fw):\s*/gi, "").trim();
|
||||
existingTicket = await prisma.ticket.findFirst({
|
||||
where: {
|
||||
tenantId,
|
||||
customerEmail: fromEmail,
|
||||
subject: { contains: cleanSubject, mode: "insensitive" },
|
||||
status: { notIn: ["archived", "resolved"] },
|
||||
},
|
||||
orderBy: { lastMessageAt: "desc" },
|
||||
});
|
||||
}
|
||||
|
||||
if (existingTicket) {
|
||||
// Add message to existing ticket
|
||||
await prisma.message.create({
|
||||
data: {
|
||||
tenantId,
|
||||
ticketId: existingTicket.id,
|
||||
direction: "inbound",
|
||||
fromEmail,
|
||||
fromName,
|
||||
toEmail: account.emailAddress,
|
||||
subject,
|
||||
bodyText,
|
||||
bodyHtml: bodyHtml || null,
|
||||
messageId,
|
||||
inReplyTo,
|
||||
references,
|
||||
sentAt: mail.date || new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
// Update ticket
|
||||
await prisma.ticket.update({
|
||||
where: { id: existingTicket.id },
|
||||
data: {
|
||||
status: "incoming",
|
||||
messageCount: { increment: 1 },
|
||||
lastMessageAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
console.log(
|
||||
`[IMAP] Added message to existing ticket ${existingTicket.ticketNumber}`
|
||||
);
|
||||
} else {
|
||||
// Create new ticket
|
||||
const ticketNumber = await generateNextTicketNumber(tenantId);
|
||||
|
||||
// Find or create customer profile
|
||||
let profile = await prisma.customerProfile.findUnique({
|
||||
where: { tenantId_email: { tenantId, email: fromEmail } },
|
||||
});
|
||||
|
||||
if (!profile) {
|
||||
profile = await prisma.customerProfile.create({
|
||||
data: {
|
||||
tenantId,
|
||||
email: fromEmail,
|
||||
name: fromName,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
await prisma.ticket.create({
|
||||
data: {
|
||||
tenantId,
|
||||
ticketNumber,
|
||||
emailAccountId: account.id,
|
||||
subject,
|
||||
status: "incoming",
|
||||
priority: "medium",
|
||||
customerEmail: fromEmail,
|
||||
customerName: fromName,
|
||||
customerProfileId: profile.id,
|
||||
messageCount: 1,
|
||||
lastMessageAt: new Date(),
|
||||
messages: {
|
||||
create: {
|
||||
tenantId,
|
||||
direction: "inbound",
|
||||
fromEmail,
|
||||
fromName,
|
||||
toEmail: account.emailAddress,
|
||||
subject,
|
||||
bodyText,
|
||||
bodyHtml: bodyHtml || null,
|
||||
messageId,
|
||||
inReplyTo,
|
||||
references,
|
||||
sentAt: mail.date || new Date(),
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
// Update customer profile
|
||||
await prisma.customerProfile.update({
|
||||
where: { id: profile.id },
|
||||
data: {
|
||||
ticketCount: { increment: 1 },
|
||||
lastContactAt: new Date(),
|
||||
name: profile.name || fromName,
|
||||
},
|
||||
});
|
||||
|
||||
console.log(`[IMAP] Created new ticket ${ticketNumber} from ${fromEmail}`);
|
||||
}
|
||||
}
|
||||
|
||||
async function generateNextTicketNumber(tenantId: string): Promise<string> {
|
||||
const tenant = await prisma.tenant.findUnique({ where: { id: tenantId } });
|
||||
const settings = (tenant?.settings as any) || {};
|
||||
const prefix = settings.ticketPrefix || "FA";
|
||||
|
||||
const lastTicket = await prisma.ticket.findFirst({
|
||||
where: { tenantId },
|
||||
orderBy: { createdAt: "desc" },
|
||||
select: { ticketNumber: true },
|
||||
});
|
||||
|
||||
let nextNum = 1;
|
||||
if (lastTicket) {
|
||||
const match = lastTicket.ticketNumber.match(/(\d+)$/);
|
||||
if (match) nextNum = parseInt(match[1], 10) + 1;
|
||||
}
|
||||
|
||||
return `${prefix}-${String(nextNum).padStart(4, "0")}`;
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
import { Job } from "bullmq";
|
||||
import nodemailer from "nodemailer";
|
||||
import { prisma } from "../services/prisma";
|
||||
|
||||
interface SmtpSendData {
|
||||
messageId: string;
|
||||
tenantId: string;
|
||||
}
|
||||
|
||||
export async function smtpSendProcessor(job: Job<SmtpSendData>): Promise<void> {
|
||||
const { messageId, tenantId } = job.data;
|
||||
|
||||
const message = await prisma.message.findUnique({
|
||||
where: { id: messageId },
|
||||
include: {
|
||||
ticket: {
|
||||
include: {
|
||||
emailAccount: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
if (!message) {
|
||||
console.error(`[SMTP] Message ${messageId} not found`);
|
||||
return;
|
||||
}
|
||||
|
||||
const account = message.ticket.emailAccount;
|
||||
if (!account) {
|
||||
console.error(`[SMTP] No email account for ticket ${message.ticket.id}`);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[SMTP] Sending reply for ticket ${message.ticket.ticketNumber} via ${account.emailAddress}`);
|
||||
|
||||
const transporter = nodemailer.createTransport({
|
||||
host: account.smtpHost,
|
||||
port: account.smtpPort,
|
||||
secure: account.smtpTls && account.smtpPort === 465,
|
||||
auth: {
|
||||
user: account.smtpUser,
|
||||
pass: account.smtpPassword,
|
||||
},
|
||||
tls: {
|
||||
rejectUnauthorized: false,
|
||||
},
|
||||
});
|
||||
|
||||
try {
|
||||
// Find the original inbound message to get its Message-ID for threading
|
||||
const originalMessage = await prisma.message.findFirst({
|
||||
where: {
|
||||
ticketId: message.ticketId,
|
||||
direction: "inbound",
|
||||
},
|
||||
orderBy: { createdAt: "desc" },
|
||||
});
|
||||
|
||||
const mailOptions: nodemailer.SendMailOptions = {
|
||||
from: {
|
||||
name: message.fromName || account.name,
|
||||
address: account.emailAddress,
|
||||
},
|
||||
to: message.toEmail,
|
||||
subject: message.subject,
|
||||
text: message.bodyText,
|
||||
html: message.bodyHtml || undefined,
|
||||
};
|
||||
|
||||
// Add threading headers
|
||||
if (originalMessage?.messageId) {
|
||||
mailOptions.inReplyTo = originalMessage.messageId;
|
||||
mailOptions.references = originalMessage.messageId;
|
||||
}
|
||||
|
||||
const result = await transporter.sendMail(mailOptions);
|
||||
|
||||
// Update message with the sent Message-ID
|
||||
await prisma.message.update({
|
||||
where: { id: messageId },
|
||||
data: {
|
||||
messageId: result.messageId,
|
||||
sentAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
console.log(`[SMTP] Sent successfully: ${result.messageId}`);
|
||||
} catch (error: any) {
|
||||
console.error(`[SMTP] Send failed for message ${messageId}:`, error.message);
|
||||
throw error; // Let BullMQ retry
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
import { PrismaClient } from "@prisma/client";
|
||||
|
||||
export const prisma = new PrismaClient({
|
||||
log: process.env.NODE_ENV === "development" ? ["warn", "error"] : ["error"],
|
||||
});
|
||||
@@ -0,0 +1,83 @@
|
||||
import { Queue } from "bullmq";
|
||||
import { prisma } from "./prisma";
|
||||
|
||||
class SchedulerService {
|
||||
private intervalHandle: NodeJS.Timeout | null = null;
|
||||
|
||||
async start(imapQueue: Queue): Promise<void> {
|
||||
console.log("[Scheduler] Starting IMAP poll scheduler");
|
||||
|
||||
// Schedule initial jobs
|
||||
await this.schedulePolls(imapQueue);
|
||||
|
||||
// Re-check every 60 seconds for new/changed email accounts
|
||||
this.intervalHandle = setInterval(async () => {
|
||||
try {
|
||||
await this.schedulePolls(imapQueue);
|
||||
} catch (error) {
|
||||
console.error("[Scheduler] Error scheduling polls:", error);
|
||||
}
|
||||
}, 60000);
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
if (this.intervalHandle) {
|
||||
clearInterval(this.intervalHandle);
|
||||
this.intervalHandle = null;
|
||||
}
|
||||
console.log("[Scheduler] Stopped");
|
||||
}
|
||||
|
||||
private async schedulePolls(imapQueue: Queue): Promise<void> {
|
||||
// Get all active email accounts
|
||||
const accounts = await prisma.emailAccount.findMany({
|
||||
where: { isActive: true },
|
||||
select: {
|
||||
id: true,
|
||||
tenantId: true,
|
||||
emailAddress: true,
|
||||
pollIntervalSeconds: true,
|
||||
lastPollAt: true,
|
||||
},
|
||||
});
|
||||
|
||||
const now = new Date();
|
||||
|
||||
for (const account of accounts) {
|
||||
// Check if it is time to poll
|
||||
const lastPoll = account.lastPollAt ? new Date(account.lastPollAt) : new Date(0);
|
||||
const elapsed = (now.getTime() - lastPoll.getTime()) / 1000;
|
||||
|
||||
if (elapsed >= account.pollIntervalSeconds) {
|
||||
// Add job with deduplication key to prevent duplicates
|
||||
const jobId = `imap-poll-${account.id}`;
|
||||
try {
|
||||
await imapQueue.add(
|
||||
"poll",
|
||||
{
|
||||
emailAccountId: account.id,
|
||||
tenantId: account.tenantId,
|
||||
},
|
||||
{
|
||||
jobId,
|
||||
removeOnComplete: 100,
|
||||
removeOnFail: 50,
|
||||
attempts: 3,
|
||||
backoff: {
|
||||
type: "exponential",
|
||||
delay: 5000,
|
||||
},
|
||||
}
|
||||
);
|
||||
} catch (error: any) {
|
||||
// Job with this ID may already exist, that is fine
|
||||
if (!error.message?.includes("already exists")) {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const schedulerService = new SchedulerService();
|
||||
@@ -0,0 +1,18 @@
|
||||
{
|
||||
"compilerOptions": {
|
||||
"target": "ES2022",
|
||||
"module": "commonjs",
|
||||
"lib": ["ES2022"],
|
||||
"outDir": "dist",
|
||||
"rootDir": "src",
|
||||
"strict": true,
|
||||
"esModuleInterop": true,
|
||||
"skipLibCheck": true,
|
||||
"forceConsistentCasingInFileNames": true,
|
||||
"resolveJsonModule": true,
|
||||
"declaration": true,
|
||||
"sourceMap": true
|
||||
},
|
||||
"include": ["src/**/*"],
|
||||
"exclude": ["node_modules", "dist"]
|
||||
}
|
||||
Reference in New Issue
Block a user