70 lines
2.3 KiB
TypeScript

import { BaseCommand } from '@adonisjs/core/ace'
import { CommandOptions } from '@adonisjs/core/types/ace'
import { Worker } from 'bullmq'
import queueConfig from '#config/queue'
import QueueService from '#services/QueueService'
import NntpService from '#services/NntpService'
export default class FetchWorker extends BaseCommand {
public static commandName = 'worker:fetch'
public static description = 'Starts a worker to fetch headers from the NNTP server.'
public static options: CommandOptions = {
startApp: true,
}
public async run() {
this.logger.info('Starting fetch worker...')
const pool = NntpService
const headerQueue = QueueService.headerQueue
const worker = new Worker('nntp-fetch-queue', async (job) => {
const { groupName, startId, endId } = job.data
this.logger.info(`Processing fetch job for ${groupName}, articles ${startId}-${endId}`)
let conn
try {
conn = await pool.acquire()
await conn.group(groupName)
const overview: any = await conn.xover(startId, endId)
this.logger.info(`Fetched ${overview.overviews.length} headers from ${groupName}.`)
if (overview.overviews.length > 0) {
const jobs = overview.overviews.map(([id, header]: [number, any]) => {
if (!header) {
this.logger.warning(`Header is undefined for job ${job.id}`)
return null
}
return {
name: 'process-header',
data: { header, group: groupName },
opts: { jobId: `${groupName}-${id}` },
}
}).filter(Boolean)
await headerQueue.addBulk(jobs)
this.logger.info(`Added ${jobs.length} header jobs to the queue for ${groupName}.`)
}
} catch (error: any) {
this.logger.error(`Error fetching headers for ${groupName}: ${error.message}`)
throw error
} finally {
if (conn) {
pool.release(conn)
}
}
}, {
connection: queueConfig.connection,
concurrency: 5,
})
worker.on('failed', (job, err) => {
this.logger.error(`Fetch job ${job?.id} failed for group ${job?.data.groupName}: ${err.message}`)
})
this.logger.info('Fetch worker started and listening for jobs.')
await new Promise(() => {}) // Keep command running
}
}