From fc947953e40c5890386f275ef35038c4cb39621f Mon Sep 17 00:00:00 2001 From: Daan Meijer Date: Sun, 31 May 2026 22:07:14 +0200 Subject: [PATCH] application restructuring, added crc check when decoding yenc files --- package-lock.json | 46 +++++++++- package.json | 9 +- src/Application.js | 57 ++++++++++++ src/{ => bin}/download.js | 59 +++++------- src/{ => bin}/nzb.js | 4 +- src/bin/start.js | 6 ++ src/body.worker.js | 57 ------------ src/header.worker.js | 54 ----------- src/index.js | 53 ----------- src/lib/NntpPool.js | 70 ++++++++++++++ src/lib/YencFile.js | 89 ++++++++++++++++++ src/{ => lib}/database.js | 0 src/{ => lib}/logger.js | 1 + src/nntp.pool.js | 70 -------------- src/workers/BodyWorker.js | 60 ++++++++++++ .../CollectionWorker.js} | 45 ++++----- src/{file.worker.js => workers/FileWorker.js} | 37 ++++---- src/workers/HeaderWorker.js | 54 +++++++++++ src/yenc.util.js | 91 ------------------- 19 files changed, 456 insertions(+), 406 deletions(-) create mode 100644 src/Application.js rename src/{ => bin}/download.js (51%) rename src/{ => bin}/nzb.js (94%) create mode 100644 src/bin/start.js delete mode 100644 src/body.worker.js delete mode 100644 src/header.worker.js delete mode 100644 src/index.js create mode 100644 src/lib/NntpPool.js create mode 100644 src/lib/YencFile.js rename src/{ => lib}/database.js (100%) rename src/{ => lib}/logger.js (92%) delete mode 100644 src/nntp.pool.js create mode 100644 src/workers/BodyWorker.js rename src/{collection.worker.js => workers/CollectionWorker.js} (65%) rename src/{file.worker.js => workers/FileWorker.js} (67%) create mode 100644 src/workers/HeaderWorker.js delete mode 100644 src/yenc.util.js diff --git a/package-lock.json b/package-lock.json index 1f7d0d8..ec4225c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,6 +10,7 @@ "license": "ISC", "dependencies": { "bullmq": "^5.77.3", + "crc": "^4.3.2", "dotenv": "^16.3.1", "ioredis": "^5.3.2", "log4js": "^6.9.1", @@ -225,7 +226,7 @@ "readable-stream": "^3.4.0" } }, - "node_modules/buffer": { + "node_modules/bl/node_modules/buffer": { "version": "5.7.1", "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", @@ -249,6 +250,32 @@ "ieee754": "^1.1.13" } }, + "node_modules/buffer": { + "version": "6.0.3", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-6.0.3.tgz", + "integrity": "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT", + "optional": true, + "peer": true, + "dependencies": { + "base64-js": "^1.3.1", + "ieee754": "^1.2.1" + } + }, "node_modules/bullmq": { "version": "5.77.3", "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.77.3.tgz", @@ -304,6 +331,23 @@ "node": ">=0.10.0" } }, + "node_modules/crc": { + "version": "4.3.2", + "resolved": "https://registry.npmjs.org/crc/-/crc-4.3.2.tgz", + "integrity": "sha512-uGDHf4KLLh2zsHa8D8hIQ1H/HtFQhyHrc0uhHBcoKGol/Xnb+MPYfUMw7cvON6ze/GUESTudKayDcJC5HnJv1A==", + "license": "MIT", + "engines": { + "node": ">=12" + }, + "peerDependencies": { + "buffer": ">=6.0.3" + }, + "peerDependenciesMeta": { + "buffer": { + "optional": true + } + } + }, "node_modules/cron-parser": { "version": "4.9.0", "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz", diff --git a/package.json b/package.json index 1d92c16..d0e62a7 100644 --- a/package.json +++ b/package.json @@ -2,11 +2,13 @@ "name": "usenet-indexer", "version": "1.0.0", "description": "", - "main": "index.js", + "main": "src/bin/start.js", "type": "module", "scripts": { "test": "echo \"Error: no test specified\" && exit 1", - "start": "node src/index.js" + "start": "node src/bin/start.js", + "download": "node src/bin/download.js", + "nzb": "node src/bin/nzb.js" }, "keywords": [], "author": "", @@ -22,6 +24,7 @@ "sqlite": "^5.1.1", "sqlite3": "^6.0.1", "xmlbuilder2": "^3.1.1", - "yencode": "^1.0.1" + "yencode": "^1.0.1", + "crc": "^4.3.2" } } diff --git a/src/Application.js b/src/Application.js new file mode 100644 index 0000000..e11f391 --- /dev/null +++ b/src/Application.js @@ -0,0 +1,57 @@ +import 'dotenv/config'; +import { NntpPool } from './lib/NntpPool.js'; +import { HeaderWorker } from './workers/HeaderWorker.js'; +import { FileWorker } from './workers/FileWorker.js'; +import { CollectionWorker } from './workers/CollectionWorker.js'; +import { BodyWorker } from './workers/BodyWorker.js'; +import log4js from './lib/logger.js'; + +const logger = log4js.getLogger(); + +export class Application { + constructor() { + this.pool = new NntpPool(); + const collectionWorker = new CollectionWorker(); + const fileWorker = new FileWorker(collectionWorker.queue); + const bodyWorker = new BodyWorker(null); // HeaderWorker is not yet initialized + this.headerWorker = new HeaderWorker(fileWorker.queue, bodyWorker.queue); + bodyWorker.headerQueue = this.headerWorker.queue; // Now we can set it + } + + async run() { + let conn; + try { + conn = await this.pool.acquire(); + logger.info('NNTP connection acquired from pool.'); + + logger.debug(`Server date: ${await conn.date()}`); + + const group = await conn.group('alt.binaries.test'); + logger.debug(`Group info: ${JSON.stringify(group)}`); + + const overview = await conn.xover(group.first, group.last); + logger.info(`Fetched ${overview.overviews.length} headers.`); + + for (const [id, header] of overview.overviews) { + await this.headerWorker.queue.add('process-header', header); + } + + if (overview.overviews.length > 0) { + const lastId = overview.overviews[overview.overviews.length - 1][0]; + logger.info(`Last header ID queued: ${lastId}`); + } + } catch (error) { + logger.error('Error in main execution:', error); + } finally { + if (conn) { + this.pool.release(conn); + } + } + } + + async shutdown() { + logger.info('Gracefully shutting down...'); + await this.pool.shutdown(); + process.exit(0); + } +} diff --git a/src/download.js b/src/bin/download.js similarity index 51% rename from src/download.js rename to src/bin/download.js index 6e671ea..6d0c54b 100644 --- a/src/download.js +++ b/src/bin/download.js @@ -1,20 +1,20 @@ import 'dotenv/config'; -import { getDb } from './database.js'; -import { acquire, release, shutdown, setPoolSize } from './nntp.pool.js'; -import { decodeYenc, parseYencMeta } from './yenc.util.js'; +import { getDb } from '../lib/database.js'; +import { NntpPool } from '../lib/NntpPool.js'; +import { YencFile } from '../lib/YencFile.js'; import fs from 'fs/promises'; -import log4js from './logger.js'; +import log4js from '../lib/logger.js'; const logger = log4js.getLogger('download'); -async function downloadPart(partNumber, segment, targetBuffer) { +async function downloadAndProcessPart(pool, partNumber, segment, yencFile) { let conn; try { - conn = await acquire(); + conn = await pool.acquire(); await conn.group('alt.binaries.test'); logger.debug(`Downloading part ${partNumber} with message ID: ${segment.id}`); const bodyBuffer = (await conn.body(`<${segment.id}>`)).data; - decodeYenc(bodyBuffer, targetBuffer); + yencFile.processPart(bodyBuffer); } catch (error) { if (error.code === 430) { logger.error(`Article not found for part ${partNumber} (Message ID: ${segment.id})`); @@ -23,13 +23,14 @@ async function downloadPart(partNumber, segment, targetBuffer) { } } finally { if (conn) { - release(conn); + pool.release(conn); } } } async function downloadFile(fileId, numConnections) { - setPoolSize(numConnections); + const pool = new NntpPool(numConnections); + const yencFile = new YencFile(); const db = await getDb(); const file = await db.get('SELECT * FROM files WHERE id = ?', fileId); @@ -44,37 +45,21 @@ async function downloadFile(fileId, numConnections) { const messageIds = JSON.parse(file.message_ids); const sortedParts = Object.entries(messageIds).sort(([a], [b]) => parseInt(a, 10) - parseInt(b, 10)); - // Download the first part to get the total file size - const [firstPartNumber, firstSegment] = sortedParts[0]; - let firstBodyBuffer; - let conn = await acquire(); - try { - await conn.group('alt.binaries.test'); - firstBodyBuffer = (await conn.body(`<${firstSegment.id}>`)).data; - } finally { - release(conn); - } - - const meta = parseYencMeta(firstBodyBuffer); - const totalSize = meta.total; - if (!totalSize) { - throw new Error('Could not determine total file size from yEnc metadata.'); - } - - const targetBuffer = Buffer.alloc(totalSize); - logger.info(`Allocated buffer of size ${totalSize} for file "${file.filename}".`); - - // Decode the first part and write it to the target buffer - decodeYenc(firstBodyBuffer, targetBuffer); - - const downloadPromises = sortedParts.slice(1).map(([partNumber, segment]) => - downloadPart(partNumber, segment, targetBuffer) + const downloadPromises = sortedParts.map(([partNumber, segment]) => + downloadAndProcessPart(pool, partNumber, segment, yencFile) ); await Promise.all(downloadPromises); - await fs.writeFile(file.filename, targetBuffer); - logger.info(`File "${file.filename}" downloaded successfully.`); + const completeFile = yencFile.getBuffer(); + if (completeFile) { + await fs.writeFile(file.filename, completeFile); + logger.info(`File "${file.filename}" downloaded successfully.`); + } else { + logger.error('Could not download all parts of the file.'); + } + + await pool.shutdown(); } const args = process.argv.slice(2); @@ -89,4 +74,4 @@ if (!fileId || isNaN(fileId)) { process.exit(1); } -downloadFile(fileId, numConnections).finally(() => shutdown()); +downloadFile(fileId, numConnections); diff --git a/src/nzb.js b/src/bin/nzb.js similarity index 94% rename from src/nzb.js rename to src/bin/nzb.js index 2127a68..d399223 100644 --- a/src/nzb.js +++ b/src/bin/nzb.js @@ -1,8 +1,8 @@ import 'dotenv/config'; -import { getDb } from './database.js'; +import { getDb } from '../lib/database.js'; import { create } from 'xmlbuilder2'; import fs from 'fs/promises'; -import log4js from './logger.js'; +import log4js from '../lib/logger.js'; const logger = log4js.getLogger('nzb'); diff --git a/src/bin/start.js b/src/bin/start.js new file mode 100644 index 0000000..f70ee33 --- /dev/null +++ b/src/bin/start.js @@ -0,0 +1,6 @@ +import { Application } from '../Application.js'; + +const app = new Application(); +app.run(); + +process.on('SIGINT', () => app.shutdown()); diff --git a/src/body.worker.js b/src/body.worker.js deleted file mode 100644 index f620dae..0000000 --- a/src/body.worker.js +++ /dev/null @@ -1,57 +0,0 @@ -import { Queue, Worker } from 'bullmq'; -import log4js from './logger.js'; -import { headerQueue } from './header.worker.js'; -import { acquire, release } from './nntp.pool.js'; -import { decodeYenc } from './yenc.util.js'; - -const bodyLogger = log4js.getLogger('body'); - -const connection = { - host: process.env.REDIS_HOST || 'localhost', - port: process.env.REDIS_PORT || 6379, -}; - -export const bodyQueue = new Queue('body-queue', { connection }); - -const YENC_REGEX = /=ybegin part=(\d+) total=(\d+) line=\d+ size=\d+ name=(.+)/; - -export const startBodyWorker = () => { - const bodyWorker = new Worker('body-queue', async job => { - const { header } = job.data; - bodyLogger.debug(`Processing header with unparsable subject: ${header.subject}`); - - let conn; - try { - conn = await acquire(); - const bodyBuffer = (await conn.body(header['message-id'])).data; - const decodedBuffer = decodeYenc(bodyBuffer); - const firstLine = decodedBuffer.toString().split('\\n')[0]; - - const match = firstLine.match(YENC_REGEX); - - if (match) { - const part = parseInt(match[1], 10); - const total = parseInt(match[2], 10); - const filename = match[3].trim(); - - const newSubject = `"${filename}" yEnc (${part}/${total})`; - header.subject = newSubject; - - bodyLogger.info(`Found yEnc metadata in body. New subject: ${newSubject}`); - await headerQueue.add('process-header', header); - } else { - bodyLogger.warn(`Could not find yEnc metadata in body for header: ${header.subject}`); - } - } catch (error) { - bodyLogger.error('Error in body worker:', error); - } finally { - if (conn) { - release(conn); - } - } - }, { connection }); - - bodyWorker.on('failed', (job, err) => { - bodyLogger.error(`Body job ${job.id} failed with error: ${err.message}`); - }); -}; diff --git a/src/header.worker.js b/src/header.worker.js deleted file mode 100644 index 14f1d5e..0000000 --- a/src/header.worker.js +++ /dev/null @@ -1,54 +0,0 @@ -import { Queue, Worker } from 'bullmq'; -import Redis from 'ioredis'; -import log4js from './logger.js'; -import { fileQueue } from './file.worker.js'; -import { bodyQueue } from './body.worker.js'; - -const logger = log4js.getLogger('header'); - -const connection = { - host: process.env.REDIS_HOST || 'localhost', - port: process.env.REDIS_PORT || 6379, -}; - -const redis = new Redis(connection); - -export const headerQueue = new Queue('header-queue', { connection }); - -const SUBJECT_REGEX = /"(.+)"(?: yEnc)? \((\d+)\/(\d+)\)/; - -export const startHeaderWorker = () => { - const headerWorker = new Worker('header-queue', async job => { - const header = job.data; - const subject = header.subject; - - const match = subject.match(SUBJECT_REGEX); - - if (match) { - const filename = match[1]; - const part = parseInt(match[2], 10); - const total = parseInt(match[3], 10); - - const fileKey = `file:${filename}`; - await redis.hset(fileKey, part, JSON.stringify(header)); - - const partCount = await redis.hlen(fileKey); - - if (partCount === total) { - const fileParts = await redis.hgetall(fileKey); - await fileQueue.add('process-file', { filename, parts: fileParts }); - await redis.del(fileKey); - logger.info(`File "${filename}" is complete and moved to file-queue.`, fileParts); - } else { - logger.info(`Stored part ${part}/${total} for file "${filename}"`); - } - } else { - logger.warn(`Could not parse subject: "${subject}". Moving to body-queue.`); - await bodyQueue.add('process-body', { header }); - } - }, { connection }); - - headerWorker.on('failed', (job, err) => { - logger.error(`Header job ${job.id} failed with error: ${err.message}`); - }); -}; diff --git a/src/index.js b/src/index.js deleted file mode 100644 index 8368104..0000000 --- a/src/index.js +++ /dev/null @@ -1,53 +0,0 @@ -import 'dotenv/config'; -import { headerQueue, startHeaderWorker } from './header.worker.js'; -import { startFileWorker } from './file.worker.js'; -import { startCollectionWorker } from './collection.worker.js'; -import { startBodyWorker } from './body.worker.js'; -import log4js from './logger.js'; -import { acquire, release, shutdown } from './nntp.pool.js'; - -const logger = log4js.getLogger(); - -async function main() { - let conn; - try { - conn = await acquire(); - logger.info('NNTP connection acquired from pool.'); - - logger.debug(`Server date: ${await conn.date()}`); - - const group = await conn.group('alt.binaries.test'); - logger.debug(`Group info: ${JSON.stringify(group)}`); - - const overview = await conn.xover(group.first, group.group); - logger.info(`Fetched ${overview.overviews.length} headers.`); - - for (const [id, header] of overview.overviews) { - await headerQueue.add('process-header', header); - } - - if (overview.overviews.length > 0) { - const lastId = overview.overviews[overview.overviews.length - 1][0]; - logger.info(`Last header ID queued: ${lastId}`); - } - } catch (error) { - logger.error('Error in main execution:', error); - } finally { - if (conn) { - release(conn); - logger.info('NNTP connection released back to the pool.'); - } - } -} - -startHeaderWorker(); -startFileWorker(); -startCollectionWorker(); -startBodyWorker(); -main(); - -process.on('SIGINT', async () => { - logger.info('Gracefully shutting down...'); - await shutdown(); - process.exit(0); -}); diff --git a/src/lib/NntpPool.js b/src/lib/NntpPool.js new file mode 100644 index 0000000..3d87d5f --- /dev/null +++ b/src/lib/NntpPool.js @@ -0,0 +1,70 @@ +import { NNTP } from 'nntp-js'; +import log4js from './logger.js'; + +const logger = log4js.getLogger('pool'); + +export class NntpPool { + constructor(poolSize = 10) { + this.poolSize = poolSize; + this.allConnections = new Set(); + this.idleConnections = []; + this.waiters = []; + this.createdCount = 0; + } + + async _createConnection() { + const config = { + host: process.env.NNTP_HOST, + user: process.env.NNTP_USER, + password: process.env.NNTP_PASS, + port: 443, + secure: true, + }; + + const conn = new NNTP(config.host, 119); + await conn.connect(); + await conn.login(config.user, config.password); + this.allConnections.add(conn); + return conn; + } + + async acquire() { + if (this.idleConnections.length > 0) { + logger.debug('Reusing existing connection from pool.'); + return this.idleConnections.pop(); + } + + if (this.createdCount < this.poolSize) { + this.createdCount++; + logger.info(`Creating new connection (${this.createdCount}/${this.poolSize}).`); + return this._createConnection(); + } + + logger.info(`Pool maxed out at ${this.poolSize}. Waiting for a connection to become available.`); + return new Promise(resolve => this.waiters.push(resolve)); + } + + release(conn) { + if (this.waiters.length > 0) { + logger.debug('Releasing connection directly to a waiting task.'); + const resolve = this.waiters.shift(); + resolve(conn); + } else { + logger.debug('Returning connection to the idle pool.'); + this.idleConnections.push(conn); + } + } + + async shutdown() { + logger.info('Shutting down all connections in the pool.'); + const shutdownPromises = []; + for (const conn of this.allConnections) { + shutdownPromises.push(conn.quit()); + } + await Promise.all(shutdownPromises); + this.allConnections.clear(); + this.idleConnections.length = 0; + this.waiters.length = 0; + this.createdCount = 0; + } +} diff --git a/src/lib/YencFile.js b/src/lib/YencFile.js new file mode 100644 index 0000000..77624a1 --- /dev/null +++ b/src/lib/YencFile.js @@ -0,0 +1,89 @@ +import yencode from 'yencode'; +import { crc32 } from 'crc'; +import log4js from './logger.js'; + +const logger = log4js.getLogger('yenc'); + +export class YencFile { + constructor() { + this.targetBuffer = null; + this.totalSize = 0; + } + + _parseMetaLine(line) { + const meta = {}; + line.split(' ').forEach(part => { + const eqIndex = part.indexOf('='); + if (eqIndex !== -1) { + const key = part.slice(0, eqIndex); + const value = part.slice(eqIndex + 1); + meta[key] = isNaN(value) ? value : parseInt(value, 10); + } + }); + return meta; + } + + processPart(encodedBuffer) { + const headerBeginMarker = Buffer.from('=ybegin'); + const headerPartMarker = Buffer.from('=ypart'); + const footerMarker = Buffer.from('\r\n=yend'); + + const headerBeginIndex = encodedBuffer.indexOf(headerBeginMarker); + const headerPartIndex = encodedBuffer.indexOf(headerPartMarker); + const footerIndex = encodedBuffer.indexOf(footerMarker, headerPartIndex); + + if ((headerBeginIndex === -1 && headerPartIndex === -1) || footerIndex === -1) { + throw new Error('Invalid yEnc part: missing header or footer.'); + } + + const isFirstPart = headerBeginIndex !== -1; + const headerIndex = isFirstPart ? headerBeginIndex : headerPartIndex; + + const headerLineEndIndex = encodedBuffer.indexOf('\r\n', headerIndex); + const headerLine = encodedBuffer.subarray(headerIndex, headerLineEndIndex).toString(); + const headerMeta = this._parseMetaLine(headerLine); + + const footerLineEndIndex = encodedBuffer.indexOf('\r\n', footerIndex + 2); + const footerLine = encodedBuffer.subarray(footerIndex + 2, footerLineEndIndex).toString(); + const footerMeta = this._parseMetaLine(footerLine); + + if (isFirstPart && !this.targetBuffer) { + this.totalSize = headerMeta.total; + if (!this.totalSize) { + throw new Error('Could not determine total file size from =ybegin header.'); + } + this.targetBuffer = Buffer.alloc(this.totalSize); + logger.info(`Allocated buffer of size ${this.totalSize} for file.`); + } + + if (!this.targetBuffer) { + throw new Error('Cannot process yEnc part without a target buffer. Process the first part (with =ybegin) first.'); + } + + const contentStartIndex = encodedBuffer.indexOf('\r\n', headerPartIndex) + 2; + const contentEndIndex = footerIndex; + const dataToDecode = encodedBuffer.subarray(contentStartIndex, contentEndIndex); + + const decoded = yencode.decode(dataToDecode); + + if (decoded.length !== footerMeta.size) { + throw new Error(`Decoded size (${decoded.length}) does not match expected part size (${footerMeta.size}).`); + } + logger.debug('Part size check passed.'); + + const calculatedCrc = crc32(decoded).toString(16); + const expectedCrc = footerMeta.pcrc32; + if (calculatedCrc !== expectedCrc) { + throw new Error(`CRC32 mismatch: expected ${expectedCrc}, but got ${calculatedCrc}.`); + } + logger.debug('CRC32 check passed.'); + + const offset = headerMeta.begin ? headerMeta.begin - 1 : 0; + decoded.copy(this.targetBuffer, offset); + logger.info(`Processed part ${headerMeta.part}/${headerMeta.total} and wrote to buffer at offset ${offset}.`); + } + + getBuffer() { + return this.targetBuffer; + } +} diff --git a/src/database.js b/src/lib/database.js similarity index 100% rename from src/database.js rename to src/lib/database.js diff --git a/src/logger.js b/src/lib/logger.js similarity index 92% rename from src/logger.js rename to src/lib/logger.js index 9485208..7d8584f 100644 --- a/src/logger.js +++ b/src/lib/logger.js @@ -20,6 +20,7 @@ log4js.configure({ collection: { appenders: ['console', 'file'], level: 'info' }, body: { appenders: ['console', 'file'], level: 'info' }, pool: { appenders: ['console', 'file'], level: 'info' }, + yenc: { appenders: ['console', 'file'], level: 'info' }, }, }); diff --git a/src/nntp.pool.js b/src/nntp.pool.js deleted file mode 100644 index f511b23..0000000 --- a/src/nntp.pool.js +++ /dev/null @@ -1,70 +0,0 @@ -import { NNTP } from 'nntp-js'; -import log4js from './logger.js'; - -const logger = log4js.getLogger('pool'); - -let POOL_SIZE = 10; -const allConnections = new Set(); // Tracks all connections for proper shutdown -const idleConnections = []; // Tracks available, ready-to-use connections -const waiters = []; // Queue of promises for tasks waiting for a connection -let createdCount = 0; // A counter for total connections created - -export const setPoolSize = (size) => { - POOL_SIZE = size; -}; - -const createConnection = async () => { - const config = { - host: process.env.NNTP_HOST, - user: process.env.NNTP_USER, - password: process.env.NNTP_PASS, - port: 443, - secure: true, - }; - - const conn = new NNTP(config.host, 119); - await conn.connect(); - await conn.login(config.user, config.password); - allConnections.add(conn); // Add to the set for shutdown tracking - return conn; -}; - -export const acquire = async () => { - if (idleConnections.length > 0) { - logger.debug('Reusing existing connection from pool.'); - return idleConnections.pop(); - } - - if (createdCount < POOL_SIZE) { - createdCount++; - logger.info(`Creating new connection (${createdCount}/${POOL_SIZE}).`); - return createConnection(); - } - - logger.info(`Pool maxed out at ${POOL_SIZE}. Waiting for a connection to become available.`); - return new Promise(resolve => waiters.push(resolve)); -}; - -export const release = conn => { - if (waiters.length > 0) { - logger.debug('Releasing connection directly to a waiting task.'); - const resolve = waiters.shift(); - resolve(conn); - } else { - logger.debug('Returning connection to the idle pool.'); - idleConnections.push(conn); - } -}; - -export const shutdown = async () => { - logger.info('Shutting down all connections in the pool.'); - const shutdownPromises = []; - for (const conn of allConnections) { - shutdownPromises.push(conn.quit()); - } - await Promise.all(shutdownPromises); - allConnections.clear(); - idleConnections.length = 0; - waiters.length = 0; - createdCount = 0; -}; diff --git a/src/workers/BodyWorker.js b/src/workers/BodyWorker.js new file mode 100644 index 0000000..e76ad7a --- /dev/null +++ b/src/workers/BodyWorker.js @@ -0,0 +1,60 @@ +import { Queue, Worker } from 'bullmq'; +import log4js from '../lib/logger.js'; +import { NntpPool } from '../lib/NntpPool.js'; +import { decodeYenc } from '../lib/yenc.util.js'; + +const logger = log4js.getLogger('body'); + +export class BodyWorker { + constructor(headerQueue) { + this.headerQueue = headerQueue; + this.queue = new Queue('body-queue', { + connection: { + host: process.env.REDIS_HOST || 'localhost', + port: process.env.REDIS_PORT || 6379, + }, + }); + this.pool = new NntpPool(); + this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection }); + + this.worker.on('failed', (job, err) => { + logger.error(`Body job ${job.id} failed with error: ${err.message}`); + }); + } + + async process(job) { + const { header } = job.data; + logger.debug(`Processing header with unparsable subject: ${header.subject}`); + + let conn; + try { + conn = await this.pool.acquire(); + const bodyBuffer = (await conn.body(header['message-id'])).data; + const decodedBuffer = decodeYenc(bodyBuffer); + const firstLine = decodedBuffer.toString().split('\\n')[0]; + + const YENC_REGEX = /=ybegin part=(\d+) total=(\d+) line=\d+ size=\d+ name=(.+)/; + const match = firstLine.match(YENC_REGEX); + + if (match) { + const part = parseInt(match[1], 10); + const total = parseInt(match[2], 10); + const filename = match[3].trim(); + + const newSubject = `"${filename}" yEnc (${part}/${total})`; + header.subject = newSubject; + + logger.info(`Found yEnc metadata in body. New subject: ${newSubject}`); + await this.headerQueue.add('process-header', header); + } else { + logger.warn(`Could not find yEnc metadata in body for header: ${header.subject}`); + } + } catch (error) { + logger.error('Error in body worker:', error); + } finally { + if (conn) { + this.pool.release(conn); + } + } + } +} diff --git a/src/collection.worker.js b/src/workers/CollectionWorker.js similarity index 65% rename from src/collection.worker.js rename to src/workers/CollectionWorker.js index 360474b..bd6c659 100644 --- a/src/collection.worker.js +++ b/src/workers/CollectionWorker.js @@ -1,23 +1,29 @@ import { Queue, Worker } from 'bullmq'; -import log4js from './logger.js'; -import { getDb } from './database.js'; -import { acquire, release } from './nntp.pool.js'; +import log4js from '../lib/logger.js'; +import { getDb } from '../lib/database.js'; +import { NntpPool } from '../lib/NntpPool.js'; import { createExtractorFromData } from 'node-unrar-js'; -import { decodeYenc } from './yenc.util.js'; +import { decodeYenc } from '../lib/yenc.util.js'; const logger = log4js.getLogger('collection'); -const connection = { - host: process.env.REDIS_HOST || 'localhost', - port: process.env.REDIS_PORT || 6379, -}; +export class CollectionWorker { + constructor() { + this.queue = new Queue('collection-queue', { + connection: { + host: process.env.REDIS_HOST || 'localhost', + port: process.env.REDIS_PORT || 6379, + }, + }); + this.pool = new NntpPool(); + this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection }); -export const collectionQueue = new Queue('collection-queue', { connection }); + this.worker.on('failed', (job, err) => { + logger.error(`Collection job ${job.id} failed with error: ${err.message}`); + }); + } -const RAR_REGEX = /\.part0*1\.rar$/; - -export const startCollectionWorker = () => { - const collectionWorker = new Worker('collection-queue', async job => { + async process(job) { const { fileId } = job.data; logger.debug(`Processing file ID ${fileId} for collection.`); @@ -29,6 +35,7 @@ export const startCollectionWorker = () => { return; } + const RAR_REGEX = /\.part0*1\.rar$/; if (RAR_REGEX.test(file.filename)) { logger.info(`File "${file.filename}" is the first part of a RAR set.`); @@ -42,7 +49,7 @@ export const startCollectionWorker = () => { let conn; try { - conn = await acquire(); + conn = await this.pool.acquire(); await conn.group('alt.binaries.test'); const bodyBuffer = (await conn.body(`<${firstPart.id}>`)).data; const decodedBuffer = decodeYenc(bodyBuffer); @@ -57,15 +64,11 @@ export const startCollectionWorker = () => { } } finally { if (conn) { - release(conn); + this.pool.release(conn); } } } else { logger.debug(`File "${file.filename}" is not the first part of a RAR set.`); } - }, { connection }); - - collectionWorker.on('failed', (job, err) => { - logger.error(`Collection job ${job.id} failed with error: ${err.message}`); - }); -}; + } +} diff --git a/src/file.worker.js b/src/workers/FileWorker.js similarity index 67% rename from src/file.worker.js rename to src/workers/FileWorker.js index 092a52f..6dbd63e 100644 --- a/src/file.worker.js +++ b/src/workers/FileWorker.js @@ -1,19 +1,26 @@ import { Queue, Worker } from 'bullmq'; -import log4js from './logger.js'; -import { getDb } from './database.js'; -import { collectionQueue } from './collection.worker.js'; +import log4js from '../lib/logger.js'; +import { getDb } from '../lib/database.js'; const logger = log4js.getLogger('file'); -const connection = { - host: process.env.REDIS_HOST || 'localhost', - port: process.env.REDIS_PORT || 6379, -}; +export class FileWorker { + constructor(collectionQueue) { + this.collectionQueue = collectionQueue; + this.queue = new Queue('file-queue', { + connection: { + host: process.env.REDIS_HOST || 'localhost', + port: process.env.REDIS_PORT || 6379, + }, + }); + this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection }); -export const fileQueue = new Queue('file-queue', { connection }); + this.worker.on('failed', (job, err) => { + logger.error(`File job ${job.id} failed with error: ${err.message}`); + }); + } -export const startFileWorker = () => { - const fileWorker = new Worker('file-queue', async job => { + async process(job) { const { filename, parts } = job.data; const partCount = Object.keys(parts).length; logger.debug(`Processing complete file: "${filename}" with ${partCount} parts.`); @@ -53,11 +60,7 @@ export const startFileWorker = () => { const fileId = result.lastID; logger.debug(`Saved file "${filename}" to database with ID: ${fileId}`); - await collectionQueue.add('process-collection', { fileId }); + await this.collectionQueue.add('process-collection', { fileId }); logger.debug(`Added file ID ${fileId} to collection queue.`); - }, { connection }); - - fileWorker.on('failed', (job, err) => { - logger.error(`File job ${job.id} failed with error: ${err.message}`); - }); -}; + } +} diff --git a/src/workers/HeaderWorker.js b/src/workers/HeaderWorker.js new file mode 100644 index 0000000..c55b83c --- /dev/null +++ b/src/workers/HeaderWorker.js @@ -0,0 +1,54 @@ +import { Queue, Worker } from 'bullmq'; +import Redis from 'ioredis'; +import log4js from '../lib/logger.js'; + +const logger = log4js.getLogger('header'); + +export class HeaderWorker { + constructor(fileQueue, bodyQueue) { + this.fileQueue = fileQueue; + this.bodyQueue = bodyQueue; + this.queue = new Queue('header-queue', { + connection: { + host: process.env.REDIS_HOST || 'localhost', + port: process.env.REDIS_PORT || 6379, + }, + }); + this.redis = new Redis(this.queue.opts.connection); + this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection }); + + this.worker.on('failed', (job, err) => { + logger.error(`Header job ${job.id} failed with error: ${err.message}`); + }); + } + + async process(job) { + const header = job.data; + const subject = header.subject; + const SUBJECT_REGEX = /"(.+)"(?: yEnc)? \((\d+)\/(\d+)\)/; + const match = subject.match(SUBJECT_REGEX); + + if (match) { + const filename = match[1]; + const part = parseInt(match[2], 10); + const total = parseInt(match[3], 10); + + const fileKey = `file:${filename}`; + await this.redis.hset(fileKey, part, JSON.stringify(header)); + + const partCount = await this.redis.hlen(fileKey); + + if (partCount === total) { + const fileParts = await this.redis.hgetall(fileKey); + await this.fileQueue.add('process-file', { filename, parts: fileParts }); + await this.redis.del(fileKey); + logger.info(`File "${filename}" is complete and moved to file-queue.`); + } else { + logger.info(`Stored part ${part}/${total} for file "${filename}"`); + } + } else { + logger.warn(`Could not parse subject: "${subject}". Moving to body-queue.`); + await this.bodyQueue.add('process-body', { header }); + } + } +} diff --git a/src/yenc.util.js b/src/yenc.util.js deleted file mode 100644 index 7639fd4..0000000 --- a/src/yenc.util.js +++ /dev/null @@ -1,91 +0,0 @@ -import yencode from 'yencode'; - -/** - * Parses yEnc metadata from a header or footer line. - * @param {string} line The line to parse. - * @returns {object} A key-value map of the metadata. - */ -function parseMetaLine(line) { - const meta = {}; - const parts = line.split(' '); - for (const part of parts) { - const eqIndex = part.indexOf('='); - if (eqIndex !== -1) { - meta[part.slice(0, eqIndex)] = part.slice(eqIndex + 1); - } - } - return meta; -} - -/** - * Extracts metadata from a yEnc-encoded buffer. - * @param {Buffer} encodedBuffer The yEnc-encoded buffer. - * @returns {object} An object containing all metadata from the header and footer. - */ -export function parseYencMeta(encodedBuffer) { - const headerBegin = Buffer.from('=ybegin'); - const headerPart = Buffer.from('=ypart'); - const footer = Buffer.from('\n=yend'); - - const headerStartIndex = encodedBuffer.indexOf(headerBegin); - const partStartIndex = encodedBuffer.indexOf(headerPart); - const footerStartIndex = encodedBuffer.lastIndexOf(footer); - - if ((headerStartIndex === -1 && partStartIndex === -1) || footerStartIndex === -1) { - throw new Error('Invalid yEnc data: missing header or footer.'); - } - - const headerIndex = headerStartIndex !== -1 ? headerStartIndex : partStartIndex; - const headerEndIndex = encodedBuffer.indexOf(Buffer.from('\\n'), headerIndex); - const headerLine = encodedBuffer.subarray(headerIndex, headerEndIndex).toString(); - - const footerEndIndex = encodedBuffer.indexOf(Buffer.from('\\n'), footerStartIndex); - const footerLine = encodedBuffer.subarray(footerStartIndex, footerEndIndex).toString(); - - const meta = { - ...parseMetaLine(headerLine), - ...parseMetaLine(footerLine), - }; - - // Convert numeric values - for (const key in meta) { - if (!isNaN(meta[key])) { - meta[key] = parseInt(meta[key], 10); - } - } - - return meta; -} - -/** - * Decodes a yEnc-encoded buffer and optionally writes it to a target buffer. - * @param {Buffer} encodedBuffer The yEnc-encoded buffer. - * @param {Buffer} [targetBuffer] An optional buffer to write the decoded data into. - * @returns {Buffer} The decoded data (or the target buffer if provided). - */ -export function decodeYenc(encodedBuffer, targetBuffer) { - const meta = parseYencMeta(encodedBuffer); - - const header = meta.part ? Buffer.from(`=ypart`) : Buffer.from(`=ybegin`); - const footer = Buffer.from(`\n=yend`); - - const contentStartIndex = encodedBuffer.indexOf('\n', encodedBuffer.indexOf(header)) + 1; - const contentEndIndex = encodedBuffer.lastIndexOf(footer); - const dataToDecode = encodedBuffer.subarray(contentStartIndex, contentEndIndex); - - const decoded = yencode.decode(dataToDecode); - - if (decoded.length !== meta.size) { - throw new Error(`Decoded size (${decoded.length}) does not match expected size (${meta.size}).`); - } - - if (targetBuffer) { - if (meta.begin === undefined) { - throw new Error('Cannot write to target buffer: missing "begin" offset in yEnc metadata.'); - } - decoded.copy(targetBuffer, meta.begin - 1); - return targetBuffer; - } - - return decoded; -}