Skip to content

Commit

Permalink
migrated to bullmq
Browse files Browse the repository at this point in the history
  • Loading branch information
n4ze3m committed Apr 17, 2024
1 parent 185703f commit 8214052
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 196 deletions.
3 changes: 2 additions & 1 deletion server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"axios": "^1.4.0",
"bcryptjs": "^2.4.3",
"bull": "^4.10.4",
"bullmq": "^5.7.1",
"cheerio": "1.0.0-rc.12",
"cohere-ai": "^6.2.1",
"concurrently": "^7.0.0",
Expand All @@ -71,7 +72,7 @@
"grammy": "^1.16.2",
"html-to-text": "^9.0.5",
"ignore": "^5.2.4",
"ioredis": "^5.3.2",
"ioredis": "^5.4.1",
"langchain": "^0.1.25",
"mammoth": "^1.6.0",
"pdf-parse": "^1.1.1",
Expand Down
30 changes: 30 additions & 0 deletions server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import fastifySession from "@fastify/session";
import { getSessionSecret, isCookieSecure } from "./utils/session";
import swagger from "@fastify/swagger";
import swaggerUi from "@fastify/swagger-ui";
import { pathToFileURL } from "url";
import { Worker } from "bullmq";
declare module "fastify" {
interface Session {
is_bot_allowed: boolean;
Expand Down Expand Up @@ -82,5 +84,33 @@ const app: FastifyPluginAsync<AppOptions> = async (
});
};

const redis_url = process.env.DB_REDIS_URL || process.env.REDIS_URL;
if (!redis_url) {
throw new Error("Redis url is not defined");
}
const username = redis_url.split(":")[1].replace("//", "");
const password = redis_url.split(":")[2].split("@")[0];
const host = redis_url.split("@")[1].split(":")[0];
const port = parseInt(redis_url.split(":")[3]);
const path = join(__dirname, "./queue/index.js");
const workerUrl = pathToFileURL(path);
const concurrency = parseInt(process.env.DB_QUEUE_CONCURRENCY || "1");
const workerThreads = process.env.DB_QUEUE_THREADS || "false";
const worker = new Worker("vector", workerUrl, {
connection: {
host,
port,
password,
username,
},
concurrency,
useWorkerThreads: workerThreads === "true",
});

process.on("SIGINT", async () => {
await worker.close();
process.exit();
});

export default app;
export { app, options };

0 comments on commit 8214052

Please sign in to comment.