title | date | draft | description | image | imageBig | categories | avatar
---|---|---|---|---|---|---|---
NestJS Standalone Worker | 2024-08-29 20:11:50 +0300 | false | Create a standalone worker using NestJS to handle background tasks and offload heavy processing from the main application. 🚀 | /images/nest/logo.svg | /images/nest/logo.webp | | /images/avatar.webp
When building an API server, it's crucial to keep the request-response cycle as short as possible so clients get their responses quickly. However, some tasks, like sending emails, processing images, or running long queries, can be time-consuming. In these cases, it's better to handle them in the background rather than keeping the client waiting.
In NestJS applications, this can be achieved with the BullMQ library, a Redis-based queue for Node.js.
By default, BullMQ runs the worker (also known as the consumer or processor) in the same event loop as the main application, which can cause some issues. We'll explore some of these problems and discuss possible solutions.
First, let's scaffold a new NestJS project and install the TypeORM and BullMQ packages:
npm i -g @nestjs/cli
nest new nest-worker
cd nest-worker
npm i --save @nestjs/typeorm typeorm pg
npm i --save @nestjs/bullmq bullmq
// src/app.module.ts
import { Module } from "@nestjs/common";
import { AppController } from "./app.controller";
import { AppService } from "./app.service";
import { TypeOrmModule } from "@nestjs/typeorm";
import { BullModule } from "@nestjs/bullmq";
@Module({
imports: [
TypeOrmModule.forRoot({
type: "postgres",
host: "localhost",
port: 5432,
username: "postgres",
password: "postgres",
database: "postgres",
logging: true,
}),
BullModule.forRoot({
connection: {
host: "localhost",
port: 6379,
},
}),
BullModule.registerQueue({
name: "test",
}),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
// src/app.service.ts
import { InjectQueue } from "@nestjs/bullmq";
import { Injectable, Logger } from "@nestjs/common";
import { Queue } from "bullmq";
@Injectable()
export class AppService {
private readonly logger = new Logger(AppService.name);
constructor(
@InjectQueue("test")
private readonly queue: Queue
) {}
getHello(): string {
return "Hello World!";
}
async addJob() {
this.logger.log("Adding job to queue");
await this.queue.add("testJob", { message: "hello from queue" });
}
}
// src/app.controller.ts
import { Controller, Get, Post } from "@nestjs/common";
import { AppService } from "./app.service";
@Controller()
export class AppController {
constructor(private readonly appService: AppService) {}
@Get()
getHello(): string {
return this.appService.getHello();
}
@Post("add-job")
addJob() {
return this.appService.addJob();
}
}
// src/test.processor.ts
import { Processor, WorkerHost } from "@nestjs/bullmq";
import { Job } from "bullmq";
import { Logger } from "@nestjs/common";
@Processor("test", { concurrency: 3 })
export class TestProcessor extends WorkerHost {
private readonly logger = new Logger(TestProcessor.name);
async process(job: Job<any, any, string>) {
this.logger.log(`Processing job ${job.id}`);
this.logger.log(job.data);
}
}
// app.module.ts: register the processor as a provider
providers: [AppService, TestProcessor],
// src/test.processor.ts (updated: simulate a task that blocks the event loop)
import { Processor, WorkerHost } from "@nestjs/bullmq";
import { Job } from "bullmq";
import { Logger } from "@nestjs/common";
@Processor("test", { concurrency: 3 })
export class TestProcessor extends WorkerHost {
private readonly logger = new Logger(TestProcessor.name);
async process(job: Job<any, any, string>) {
this.logger.log(`Processing job ${job.id}`);
this.logger.log(job.data);
// event loop blocking code
while (true) {}
}
}
👀 As we can see, while the processor handles the long-running task it blocks the event loop, and the API server stops responding.
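To reproduce this yourself, a small script like the one below (illustrative, not from the article) times a GET / request before and after enqueuing a job; it assumes the server runs on localhost:3000 and Node 18+ for the built-in fetch.

```ts
// check-blocking.ts (illustrative): observe the API hanging once the processor blocks the loop.
async function timeRequest(url: string) {
  const start = Date.now();
  await fetch(url);
  console.log(`${url} answered in ${Date.now() - start} ms`);
}

async function main() {
  await timeRequest("http://localhost:3000/"); // answers immediately
  await fetch("http://localhost:3000/add-job", { method: "POST" }); // enqueue the blocking job
  await timeRequest("http://localhost:3000/"); // hangs once the while(true) loop starts spinning
}

main();
```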
Another problem with this approach is the database connection pool. If processors acquire all database connections, the API server will be blocked until a connection is released. Let's test this case.
// app.module.ts: limit the connection pool to 2 connections
TypeOrmModule.forRoot({
  type: "postgres",
  host: "localhost",
  port: 5432,
  username: "postgres",
  password: "postgres",
  database: "postgres",
  logging: true,
  extra: { max: 2 },
}),
// src/app.service.ts (updated: run a quick query before enqueuing)
import { InjectQueue } from "@nestjs/bullmq";
import { Injectable, Logger } from "@nestjs/common";
import { InjectDataSource } from "@nestjs/typeorm";
import { Queue } from "bullmq";
import { DataSource } from "typeorm";
@Injectable()
export class AppService {
private readonly logger = new Logger(AppService.name);
constructor(
@InjectQueue("test")
private readonly queue: Queue,
@InjectDataSource() private readonly datasource: DataSource
) {}
getHello(): string {
return "Hello World!";
}
async addJob() {
await this.datasource.query("SELECT NOW()");
this.logger.log("Adding job to queue");
await this.queue.add("testJob", { message: "hello from queue" });
}
}
// src/test.processor.ts (updated: run a long database query instead of blocking the loop)
import { Processor, WorkerHost } from "@nestjs/bullmq";
import { Job } from "bullmq";
import { Logger } from "@nestjs/common";
import { InjectDataSource } from "@nestjs/typeorm";
import { DataSource } from "typeorm";
@Processor("test", { concurrency: 3 })
export class TestProcessor extends WorkerHost {
private readonly logger = new Logger(TestProcessor.name);
constructor(@InjectDataSource() private readonly datasource: DataSource) {
super();
}
async process(job: Job<any, any, string>) {
this.logger.log(`Processing job ${job.id}`);
this.logger.log(job.data);
// long running query
await this.datasource.query("SELECT pg_sleep(1 * 60)");
}
}
ℹ️ We have 3 concurrent processors but only 2 database connections. If we add 2 jobs to the queue, the two long-running processor queries hold both connections, so the third API request is blocked until a connection is released, 👀 as we see in the GIF below.
ℹ️ We can solve this by creating a separate TypeORM datasource for the processors. This way, long-running queries in the processors will not block the API server.
// src/app.module.ts (updated: separate datasources for the API and the processors)
import { Module } from "@nestjs/common";
import { AppController } from "./app.controller";
import { AppService } from "./app.service";
import { TypeOrmModule } from "@nestjs/typeorm";
import { BullModule } from "@nestjs/bullmq";
import { TestProcessor } from "./test.processor";
@Module({
imports: [
TypeOrmModule.forRoot({
name: "API",
type: "postgres",
host: "localhost",
port: 5432,
username: "postgres",
password: "postgres",
database: "postgres",
logging: true,
extra: { max: 2 },
}),
TypeOrmModule.forRoot({
name: "processor",
type: "postgres",
host: "localhost",
port: 5432,
username: "postgres",
password: "postgres",
database: "postgres",
logging: true,
extra: { max: 2 },
}),
BullModule.forRoot({
connection: {
host: "localhost",
port: 6379,
},
}),
BullModule.registerQueue({
name: "test",
}),
],
controllers: [AppController],
providers: [AppService, TestProcessor],
})
export class AppModule {}
// src/test.processor.ts (updated: use the "processor" datasource)
import { Processor, WorkerHost } from "@nestjs/bullmq";
import { Job } from "bullmq";
import { Logger } from "@nestjs/common";
import { InjectDataSource } from "@nestjs/typeorm";
import { DataSource } from "typeorm";
@Processor("test", { concurrency: 3 })
export class TestProcessor extends WorkerHost {
private readonly logger = new Logger(TestProcessor.name);
constructor(
@InjectDataSource("processor") private readonly datasource: DataSource
) {
super();
}
async process(job: Job<any, any, string>) {
this.logger.log(`Processing job ${job.id}`);
this.logger.log(job.data);
// long running query
await this.datasource.query("SELECT pg_sleep(1 * 60)");
}
}
// src/app.service.ts (updated: use the "API" datasource)
import { InjectQueue } from "@nestjs/bullmq";
import { Injectable, Logger } from "@nestjs/common";
import { InjectDataSource } from "@nestjs/typeorm";
import { Queue } from "bullmq";
import { DataSource } from "typeorm";
@Injectable()
export class AppService {
private readonly logger = new Logger(AppService.name);
constructor(
@InjectQueue("test")
private readonly queue: Queue,
@InjectDataSource("API") private readonly datasource: DataSource
) {}
getHello(): string {
return "Hello World!";
}
async addJob() {
await this.datasource.query("SELECT NOW()");
this.logger.log("Adding job to queue");
await this.queue.add("testJob", { message: "hello from queue" });
}
}
ℹ️ The processor can now run only 2 jobs at a time because of the connection pool limit, but the API will no longer be blocked by the processor queries. Let's test this.
👀 As we can see, the API is not blocked and can keep queuing jobs even while the processors are running long queries.
ℹ️ We can solve the first problem by running the processor in a separate forked process. 👉🏻 separate-processes 👈🏻 This approach can't use the NestJS dependency injection system (IoC) by default; however, there are some workarounds to make this possible.
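For context, here is roughly what that looks like, as a sketch following the separate-processes pattern from the NestJS queues docs (the file name is illustrative): the queue registration points at a compiled processor file, and BullMQ forks it into its own process, so nothing from the Nest IoC container is injectable there.

```ts
// Sketch: register a sandboxed processor file for the "test" queue (in the module imports).
import { join } from "path";
import { BullModule } from "@nestjs/bullmq";

BullModule.registerQueue({
  name: "test",
  // Compiled file that BullMQ runs in a forked child process.
  processors: [join(__dirname, "test.sandboxed-processor.js")],
});
```

```ts
// test.sandboxed-processor.ts (sketch): a plain exported function; no Nest DI is available here.
import { Job } from "bullmq";

export default async function (job: Job) {
  console.log(`Processing job ${job.id} in PID ${process.pid}`);
}
```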
ℹ️ Another solution is to create a standalone worker using NestJS. This way we can run the worker in a separate event loop and still take advantage of the dependency injection system and NestJS IoC. It's a regular NestJS app, just without an API server: only a worker responsible for running the BullMQ processors.
ℹ️ The worker still shares the same codebase as your Nest app, and this approach offers benefits like:
- The worker can be scaled independently of the API server, as a standalone service or deployment in Kubernetes.
- The worker can be deployed on a separate machine, monitored, and managed separately.
- The worker can have its own database connection pool without duplicating the database connection configuration (see the config-sharing sketch below).
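One way to avoid that duplication (a hypothetical helper, not shown in the article) is to extract the TypeORM options into a small factory that both the API's AppModule and the WorkerModule import, while each process still gets its own pool:

```ts
// src/db.config.ts (hypothetical helper): shared TypeORM options with a configurable pool size.
import { TypeOrmModuleOptions } from "@nestjs/typeorm";

export const databaseConfig = (poolSize: number): TypeOrmModuleOptions => ({
  type: "postgres",
  host: "localhost",
  port: 5432,
  username: "postgres",
  password: "postgres",
  database: "postgres",
  logging: true,
  extra: { max: poolSize },
});
```

Both modules would then call TypeOrmModule.forRoot(databaseConfig(2)) instead of repeating the connection details.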
// src/app.module.ts (API only: the processor will move to the worker)
import { Module } from "@nestjs/common";
import { AppController } from "./app.controller";
import { AppService } from "./app.service";
import { TypeOrmModule } from "@nestjs/typeorm";
import { BullModule } from "@nestjs/bullmq";
@Module({
imports: [
TypeOrmModule.forRoot({
type: "postgres",
host: "localhost",
port: 5432,
username: "postgres",
password: "postgres",
database: "postgres",
logging: true,
extra: { max: 2 },
}),
BullModule.registerQueue({
name: "test",
}),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
nest g module worker
// src/worker/worker.module.ts
import { Module } from "@nestjs/common";
import { TypeOrmModule } from "@nestjs/typeorm";
import { TestProcessor } from "../test.processor";
import { BullModule } from "@nestjs/bullmq";
@Module({
imports: [
TypeOrmModule.forRoot({
type: "postgres",
host: "localhost",
port: 5432,
username: "postgres",
password: "postgres",
database: "postgres",
logging: true,
extra: { max: 2 },
}),
BullModule.forRoot({
connection: {
host: "localhost",
port: 6379,
},
}),
],
providers: [TestProcessor],
})
export class WorkerModule {}
// src/worker.ts: bootstrap the worker as a standalone application context (no HTTP server)
import { NestFactory } from "@nestjs/core";
import { WorkerModule } from "./worker/worker.module";
async function bootstrap() {
await NestFactory.createApplicationContext(WorkerModule);
}
bootstrap();
ℹ️ We could also bootstrap the worker with an HTTP server, which can be helpful for health checks and metrics, or create a standalone NestJS application context as we did here.
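If you prefer the HTTP variant, a minimal sketch could look like this (the port and the idea of a health controller are assumptions, not part of the article):

```ts
// src/worker.ts (alternative sketch): the worker also listens on a port for health checks.
import { NestFactory } from "@nestjs/core";
import { WorkerModule } from "./worker/worker.module";

async function bootstrap() {
  const app = await NestFactory.create(WorkerModule);
  // Add a health/metrics controller to WorkerModule if you go this route.
  await app.listen(3001);
}
bootstrap();
```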
🛠️ Create a new worker-cli.json file in the project root and add scripts to package.json to run the worker.
{
"$schema": "https://json.schemastore.org/nest-cli",
"collection": "@nestjs/schematics",
"sourceRoot": "src",
"entryFile": "worker",
"compilerOptions": {
"deleteOutDir": true
}
}
{
"scripts": {
"worker:dev": "nest start --watch --config worker-cli.json",
"worker:prod": "node dist/worker"
}
}
🛠️ Now let's do something cool: let's create a way to autoload all processors in the app and register them automatically.
- Load all processors from a glob pattern using the same importer used by TypeORM 😎
- Get any dependencies for each processor using the NestJS metadata system.
- Get the queue name for each processor.
- Register the processors as providers, register their queues in the BullModule, and import their dependencies.
nest g module queue
// src/queue/queue.module.ts
import { DynamicModule, Module } from "@nestjs/common";
import { BullModule, WorkerHost } from "@nestjs/bullmq";
import { importClassesFromDirectories } from "typeorm/util/DirectoryExportedClassesLoader";
@Module({})
export class QueueModule {
public static readonly consumers = [];
public static readonly dependencies = [];
public static readonly queues = [];
static async register(options: {
consumers: string[];
}): Promise<DynamicModule> {
const workerClasses = await importClassesFromDirectories(
{ log: console.log } as any,
options.consumers
);
workerClasses.forEach((workerClass) => {
if (workerClass.prototype instanceof WorkerHost) {
QueueModule.consumers.push(workerClass);
QueueModule.dependencies.push(
...(Reflect.getMetadata("dependencies", workerClass) || [])
);
QueueModule.queues.push(
Reflect.getMetadata("bullmq:processor_metadata", workerClass)
);
}
});
return {
module: QueueModule,
imports: [
BullModule.forRoot({
connection: {
host: "localhost",
port: 6379,
},
}),
BullModule.registerQueue(...QueueModule.queues),
...QueueModule.dependencies,
],
providers: [...QueueModule.consumers],
};
}
}
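Note that the module above reads a custom "dependencies" metadata key from each processor class, but nothing in the article sets that key. One possible way to attach it (a hypothetical helper decorator, not shown in the article) is with SetMetadata:

```ts
// src/queue/processor-dependencies.decorator.ts (hypothetical helper):
// records the modules a processor needs so QueueModule.register() can import them.
import { SetMetadata } from "@nestjs/common";

export const ProcessorDependencies = (...modules: any[]) =>
  SetMetadata("dependencies", modules);
```

A processor would then be decorated with something like @ProcessorDependencies(SomeModule) next to @Processor, and QueueModule would pick SomeModule up via Reflect.getMetadata("dependencies", workerClass).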
🛠️ Remove any BullMQ-related code from the worker module, use the QueueModule instead, and specify the processors' glob pattern.
// src/worker/worker.module.ts (final version: processors are auto-loaded by QueueModule)
import { Module } from "@nestjs/common";
import { TypeOrmModule } from "@nestjs/typeorm";
import { QueueModule } from "src/queue/queue.module";
@Module({
  imports: [
    TypeOrmModule.forRoot({
      type: "postgres",
      host: "localhost",
      port: 5432,
      username: "postgres",
      password: "postgres",
      database: "postgres",
      logging: true,
      extra: { max: 2 },
    }),
    QueueModule.register({
      consumers: ["dist/**/*.processor.js"],
    }),
  ],
})
export class WorkerModule {}