application restructuring, added crc check when decoding yenc files

This commit is contained in:
Daan Meijer 2026-05-31 22:07:14 +02:00
parent efbc1237d6
commit fc947953e4
19 changed files with 456 additions and 406 deletions

46
package-lock.json generated
View File

@ -10,6 +10,7 @@
"license": "ISC", "license": "ISC",
"dependencies": { "dependencies": {
"bullmq": "^5.77.3", "bullmq": "^5.77.3",
"crc": "^4.3.2",
"dotenv": "^16.3.1", "dotenv": "^16.3.1",
"ioredis": "^5.3.2", "ioredis": "^5.3.2",
"log4js": "^6.9.1", "log4js": "^6.9.1",
@ -225,7 +226,7 @@
"readable-stream": "^3.4.0" "readable-stream": "^3.4.0"
} }
}, },
"node_modules/buffer": { "node_modules/bl/node_modules/buffer": {
"version": "5.7.1", "version": "5.7.1",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz",
"integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==",
@ -249,6 +250,32 @@
"ieee754": "^1.1.13" "ieee754": "^1.1.13"
} }
}, },
"node_modules/buffer": {
"version": "6.0.3",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-6.0.3.tgz",
"integrity": "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"license": "MIT",
"optional": true,
"peer": true,
"dependencies": {
"base64-js": "^1.3.1",
"ieee754": "^1.2.1"
}
},
"node_modules/bullmq": { "node_modules/bullmq": {
"version": "5.77.3", "version": "5.77.3",
"resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.77.3.tgz", "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.77.3.tgz",
@ -304,6 +331,23 @@
"node": ">=0.10.0" "node": ">=0.10.0"
} }
}, },
"node_modules/crc": {
"version": "4.3.2",
"resolved": "https://registry.npmjs.org/crc/-/crc-4.3.2.tgz",
"integrity": "sha512-uGDHf4KLLh2zsHa8D8hIQ1H/HtFQhyHrc0uhHBcoKGol/Xnb+MPYfUMw7cvON6ze/GUESTudKayDcJC5HnJv1A==",
"license": "MIT",
"engines": {
"node": ">=12"
},
"peerDependencies": {
"buffer": ">=6.0.3"
},
"peerDependenciesMeta": {
"buffer": {
"optional": true
}
}
},
"node_modules/cron-parser": { "node_modules/cron-parser": {
"version": "4.9.0", "version": "4.9.0",
"resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz", "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz",

View File

@ -2,11 +2,13 @@
"name": "usenet-indexer", "name": "usenet-indexer",
"version": "1.0.0", "version": "1.0.0",
"description": "", "description": "",
"main": "index.js", "main": "src/bin/start.js",
"type": "module", "type": "module",
"scripts": { "scripts": {
"test": "echo \"Error: no test specified\" && exit 1", "test": "echo \"Error: no test specified\" && exit 1",
"start": "node src/index.js" "start": "node src/bin/start.js",
"download": "node src/bin/download.js",
"nzb": "node src/bin/nzb.js"
}, },
"keywords": [], "keywords": [],
"author": "", "author": "",
@ -22,6 +24,7 @@
"sqlite": "^5.1.1", "sqlite": "^5.1.1",
"sqlite3": "^6.0.1", "sqlite3": "^6.0.1",
"xmlbuilder2": "^3.1.1", "xmlbuilder2": "^3.1.1",
"yencode": "^1.0.1" "yencode": "^1.0.1",
"crc": "^4.3.2"
} }
} }

57
src/Application.js Normal file
View File

@ -0,0 +1,57 @@
import 'dotenv/config';
import { NntpPool } from './lib/NntpPool.js';
import { HeaderWorker } from './workers/HeaderWorker.js';
import { FileWorker } from './workers/FileWorker.js';
import { CollectionWorker } from './workers/CollectionWorker.js';
import { BodyWorker } from './workers/BodyWorker.js';
import log4js from './lib/logger.js';
const logger = log4js.getLogger();
export class Application {
constructor() {
this.pool = new NntpPool();
const collectionWorker = new CollectionWorker();
const fileWorker = new FileWorker(collectionWorker.queue);
const bodyWorker = new BodyWorker(null); // HeaderWorker is not yet initialized
this.headerWorker = new HeaderWorker(fileWorker.queue, bodyWorker.queue);
bodyWorker.headerQueue = this.headerWorker.queue; // Now we can set it
}
async run() {
let conn;
try {
conn = await this.pool.acquire();
logger.info('NNTP connection acquired from pool.');
logger.debug(`Server date: ${await conn.date()}`);
const group = await conn.group('alt.binaries.test');
logger.debug(`Group info: ${JSON.stringify(group)}`);
const overview = await conn.xover(group.first, group.last);
logger.info(`Fetched ${overview.overviews.length} headers.`);
for (const [id, header] of overview.overviews) {
await this.headerWorker.queue.add('process-header', header);
}
if (overview.overviews.length > 0) {
const lastId = overview.overviews[overview.overviews.length - 1][0];
logger.info(`Last header ID queued: ${lastId}`);
}
} catch (error) {
logger.error('Error in main execution:', error);
} finally {
if (conn) {
this.pool.release(conn);
}
}
}
async shutdown() {
logger.info('Gracefully shutting down...');
await this.pool.shutdown();
process.exit(0);
}
}

View File

@ -1,20 +1,20 @@
import 'dotenv/config'; import 'dotenv/config';
import { getDb } from './database.js'; import { getDb } from '../lib/database.js';
import { acquire, release, shutdown, setPoolSize } from './nntp.pool.js'; import { NntpPool } from '../lib/NntpPool.js';
import { decodeYenc, parseYencMeta } from './yenc.util.js'; import { YencFile } from '../lib/YencFile.js';
import fs from 'fs/promises'; import fs from 'fs/promises';
import log4js from './logger.js'; import log4js from '../lib/logger.js';
const logger = log4js.getLogger('download'); const logger = log4js.getLogger('download');
async function downloadPart(partNumber, segment, targetBuffer) { async function downloadAndProcessPart(pool, partNumber, segment, yencFile) {
let conn; let conn;
try { try {
conn = await acquire(); conn = await pool.acquire();
await conn.group('alt.binaries.test'); await conn.group('alt.binaries.test');
logger.debug(`Downloading part ${partNumber} with message ID: ${segment.id}`); logger.debug(`Downloading part ${partNumber} with message ID: ${segment.id}`);
const bodyBuffer = (await conn.body(`<${segment.id}>`)).data; const bodyBuffer = (await conn.body(`<${segment.id}>`)).data;
decodeYenc(bodyBuffer, targetBuffer); yencFile.processPart(bodyBuffer);
} catch (error) { } catch (error) {
if (error.code === 430) { if (error.code === 430) {
logger.error(`Article not found for part ${partNumber} (Message ID: ${segment.id})`); logger.error(`Article not found for part ${partNumber} (Message ID: ${segment.id})`);
@ -23,13 +23,14 @@ async function downloadPart(partNumber, segment, targetBuffer) {
} }
} finally { } finally {
if (conn) { if (conn) {
release(conn); pool.release(conn);
} }
} }
} }
async function downloadFile(fileId, numConnections) { async function downloadFile(fileId, numConnections) {
setPoolSize(numConnections); const pool = new NntpPool(numConnections);
const yencFile = new YencFile();
const db = await getDb(); const db = await getDb();
const file = await db.get('SELECT * FROM files WHERE id = ?', fileId); const file = await db.get('SELECT * FROM files WHERE id = ?', fileId);
@ -44,37 +45,21 @@ async function downloadFile(fileId, numConnections) {
const messageIds = JSON.parse(file.message_ids); const messageIds = JSON.parse(file.message_ids);
const sortedParts = Object.entries(messageIds).sort(([a], [b]) => parseInt(a, 10) - parseInt(b, 10)); const sortedParts = Object.entries(messageIds).sort(([a], [b]) => parseInt(a, 10) - parseInt(b, 10));
// Download the first part to get the total file size const downloadPromises = sortedParts.map(([partNumber, segment]) =>
const [firstPartNumber, firstSegment] = sortedParts[0]; downloadAndProcessPart(pool, partNumber, segment, yencFile)
let firstBodyBuffer;
let conn = await acquire();
try {
await conn.group('alt.binaries.test');
firstBodyBuffer = (await conn.body(`<${firstSegment.id}>`)).data;
} finally {
release(conn);
}
const meta = parseYencMeta(firstBodyBuffer);
const totalSize = meta.total;
if (!totalSize) {
throw new Error('Could not determine total file size from yEnc metadata.');
}
const targetBuffer = Buffer.alloc(totalSize);
logger.info(`Allocated buffer of size ${totalSize} for file "${file.filename}".`);
// Decode the first part and write it to the target buffer
decodeYenc(firstBodyBuffer, targetBuffer);
const downloadPromises = sortedParts.slice(1).map(([partNumber, segment]) =>
downloadPart(partNumber, segment, targetBuffer)
); );
await Promise.all(downloadPromises); await Promise.all(downloadPromises);
await fs.writeFile(file.filename, targetBuffer); const completeFile = yencFile.getBuffer();
logger.info(`File "${file.filename}" downloaded successfully.`); if (completeFile) {
await fs.writeFile(file.filename, completeFile);
logger.info(`File "${file.filename}" downloaded successfully.`);
} else {
logger.error('Could not download all parts of the file.');
}
await pool.shutdown();
} }
const args = process.argv.slice(2); const args = process.argv.slice(2);
@ -89,4 +74,4 @@ if (!fileId || isNaN(fileId)) {
process.exit(1); process.exit(1);
} }
downloadFile(fileId, numConnections).finally(() => shutdown()); downloadFile(fileId, numConnections);

View File

@ -1,8 +1,8 @@
import 'dotenv/config'; import 'dotenv/config';
import { getDb } from './database.js'; import { getDb } from '../lib/database.js';
import { create } from 'xmlbuilder2'; import { create } from 'xmlbuilder2';
import fs from 'fs/promises'; import fs from 'fs/promises';
import log4js from './logger.js'; import log4js from '../lib/logger.js';
const logger = log4js.getLogger('nzb'); const logger = log4js.getLogger('nzb');

6
src/bin/start.js Normal file
View File

@ -0,0 +1,6 @@
import { Application } from '../Application.js';
const app = new Application();
app.run();
process.on('SIGINT', () => app.shutdown());

View File

@ -1,57 +0,0 @@
import { Queue, Worker } from 'bullmq';
import log4js from './logger.js';
import { headerQueue } from './header.worker.js';
import { acquire, release } from './nntp.pool.js';
import { decodeYenc } from './yenc.util.js';
const bodyLogger = log4js.getLogger('body');
const connection = {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
};
export const bodyQueue = new Queue('body-queue', { connection });
const YENC_REGEX = /=ybegin part=(\d+) total=(\d+) line=\d+ size=\d+ name=(.+)/;
export const startBodyWorker = () => {
const bodyWorker = new Worker('body-queue', async job => {
const { header } = job.data;
bodyLogger.debug(`Processing header with unparsable subject: ${header.subject}`);
let conn;
try {
conn = await acquire();
const bodyBuffer = (await conn.body(header['message-id'])).data;
const decodedBuffer = decodeYenc(bodyBuffer);
const firstLine = decodedBuffer.toString().split('\\n')[0];
const match = firstLine.match(YENC_REGEX);
if (match) {
const part = parseInt(match[1], 10);
const total = parseInt(match[2], 10);
const filename = match[3].trim();
const newSubject = `"${filename}" yEnc (${part}/${total})`;
header.subject = newSubject;
bodyLogger.info(`Found yEnc metadata in body. New subject: ${newSubject}`);
await headerQueue.add('process-header', header);
} else {
bodyLogger.warn(`Could not find yEnc metadata in body for header: ${header.subject}`);
}
} catch (error) {
bodyLogger.error('Error in body worker:', error);
} finally {
if (conn) {
release(conn);
}
}
}, { connection });
bodyWorker.on('failed', (job, err) => {
bodyLogger.error(`Body job ${job.id} failed with error: ${err.message}`);
});
};

View File

@ -1,54 +0,0 @@
import { Queue, Worker } from 'bullmq';
import Redis from 'ioredis';
import log4js from './logger.js';
import { fileQueue } from './file.worker.js';
import { bodyQueue } from './body.worker.js';
const logger = log4js.getLogger('header');
const connection = {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
};
const redis = new Redis(connection);
export const headerQueue = new Queue('header-queue', { connection });
const SUBJECT_REGEX = /"(.+)"(?: yEnc)? \((\d+)\/(\d+)\)/;
export const startHeaderWorker = () => {
const headerWorker = new Worker('header-queue', async job => {
const header = job.data;
const subject = header.subject;
const match = subject.match(SUBJECT_REGEX);
if (match) {
const filename = match[1];
const part = parseInt(match[2], 10);
const total = parseInt(match[3], 10);
const fileKey = `file:${filename}`;
await redis.hset(fileKey, part, JSON.stringify(header));
const partCount = await redis.hlen(fileKey);
if (partCount === total) {
const fileParts = await redis.hgetall(fileKey);
await fileQueue.add('process-file', { filename, parts: fileParts });
await redis.del(fileKey);
logger.info(`File "${filename}" is complete and moved to file-queue.`, fileParts);
} else {
logger.info(`Stored part ${part}/${total} for file "${filename}"`);
}
} else {
logger.warn(`Could not parse subject: "${subject}". Moving to body-queue.`);
await bodyQueue.add('process-body', { header });
}
}, { connection });
headerWorker.on('failed', (job, err) => {
logger.error(`Header job ${job.id} failed with error: ${err.message}`);
});
};

View File

@ -1,53 +0,0 @@
import 'dotenv/config';
import { headerQueue, startHeaderWorker } from './header.worker.js';
import { startFileWorker } from './file.worker.js';
import { startCollectionWorker } from './collection.worker.js';
import { startBodyWorker } from './body.worker.js';
import log4js from './logger.js';
import { acquire, release, shutdown } from './nntp.pool.js';
const logger = log4js.getLogger();
async function main() {
let conn;
try {
conn = await acquire();
logger.info('NNTP connection acquired from pool.');
logger.debug(`Server date: ${await conn.date()}`);
const group = await conn.group('alt.binaries.test');
logger.debug(`Group info: ${JSON.stringify(group)}`);
const overview = await conn.xover(group.first, group.group);
logger.info(`Fetched ${overview.overviews.length} headers.`);
for (const [id, header] of overview.overviews) {
await headerQueue.add('process-header', header);
}
if (overview.overviews.length > 0) {
const lastId = overview.overviews[overview.overviews.length - 1][0];
logger.info(`Last header ID queued: ${lastId}`);
}
} catch (error) {
logger.error('Error in main execution:', error);
} finally {
if (conn) {
release(conn);
logger.info('NNTP connection released back to the pool.');
}
}
}
startHeaderWorker();
startFileWorker();
startCollectionWorker();
startBodyWorker();
main();
process.on('SIGINT', async () => {
logger.info('Gracefully shutting down...');
await shutdown();
process.exit(0);
});

70
src/lib/NntpPool.js Normal file
View File

@ -0,0 +1,70 @@
import { NNTP } from 'nntp-js';
import log4js from './logger.js';
const logger = log4js.getLogger('pool');
export class NntpPool {
constructor(poolSize = 10) {
this.poolSize = poolSize;
this.allConnections = new Set();
this.idleConnections = [];
this.waiters = [];
this.createdCount = 0;
}
async _createConnection() {
const config = {
host: process.env.NNTP_HOST,
user: process.env.NNTP_USER,
password: process.env.NNTP_PASS,
port: 443,
secure: true,
};
const conn = new NNTP(config.host, 119);
await conn.connect();
await conn.login(config.user, config.password);
this.allConnections.add(conn);
return conn;
}
async acquire() {
if (this.idleConnections.length > 0) {
logger.debug('Reusing existing connection from pool.');
return this.idleConnections.pop();
}
if (this.createdCount < this.poolSize) {
this.createdCount++;
logger.info(`Creating new connection (${this.createdCount}/${this.poolSize}).`);
return this._createConnection();
}
logger.info(`Pool maxed out at ${this.poolSize}. Waiting for a connection to become available.`);
return new Promise(resolve => this.waiters.push(resolve));
}
release(conn) {
if (this.waiters.length > 0) {
logger.debug('Releasing connection directly to a waiting task.');
const resolve = this.waiters.shift();
resolve(conn);
} else {
logger.debug('Returning connection to the idle pool.');
this.idleConnections.push(conn);
}
}
async shutdown() {
logger.info('Shutting down all connections in the pool.');
const shutdownPromises = [];
for (const conn of this.allConnections) {
shutdownPromises.push(conn.quit());
}
await Promise.all(shutdownPromises);
this.allConnections.clear();
this.idleConnections.length = 0;
this.waiters.length = 0;
this.createdCount = 0;
}
}

89
src/lib/YencFile.js Normal file
View File

@ -0,0 +1,89 @@
import yencode from 'yencode';
import { crc32 } from 'crc';
import log4js from './logger.js';
const logger = log4js.getLogger('yenc');
export class YencFile {
constructor() {
this.targetBuffer = null;
this.totalSize = 0;
}
_parseMetaLine(line) {
const meta = {};
line.split(' ').forEach(part => {
const eqIndex = part.indexOf('=');
if (eqIndex !== -1) {
const key = part.slice(0, eqIndex);
const value = part.slice(eqIndex + 1);
meta[key] = isNaN(value) ? value : parseInt(value, 10);
}
});
return meta;
}
processPart(encodedBuffer) {
const headerBeginMarker = Buffer.from('=ybegin');
const headerPartMarker = Buffer.from('=ypart');
const footerMarker = Buffer.from('\r\n=yend');
const headerBeginIndex = encodedBuffer.indexOf(headerBeginMarker);
const headerPartIndex = encodedBuffer.indexOf(headerPartMarker);
const footerIndex = encodedBuffer.indexOf(footerMarker, headerPartIndex);
if ((headerBeginIndex === -1 && headerPartIndex === -1) || footerIndex === -1) {
throw new Error('Invalid yEnc part: missing header or footer.');
}
const isFirstPart = headerBeginIndex !== -1;
const headerIndex = isFirstPart ? headerBeginIndex : headerPartIndex;
const headerLineEndIndex = encodedBuffer.indexOf('\r\n', headerIndex);
const headerLine = encodedBuffer.subarray(headerIndex, headerLineEndIndex).toString();
const headerMeta = this._parseMetaLine(headerLine);
const footerLineEndIndex = encodedBuffer.indexOf('\r\n', footerIndex + 2);
const footerLine = encodedBuffer.subarray(footerIndex + 2, footerLineEndIndex).toString();
const footerMeta = this._parseMetaLine(footerLine);
if (isFirstPart && !this.targetBuffer) {
this.totalSize = headerMeta.total;
if (!this.totalSize) {
throw new Error('Could not determine total file size from =ybegin header.');
}
this.targetBuffer = Buffer.alloc(this.totalSize);
logger.info(`Allocated buffer of size ${this.totalSize} for file.`);
}
if (!this.targetBuffer) {
throw new Error('Cannot process yEnc part without a target buffer. Process the first part (with =ybegin) first.');
}
const contentStartIndex = encodedBuffer.indexOf('\r\n', headerPartIndex) + 2;
const contentEndIndex = footerIndex;
const dataToDecode = encodedBuffer.subarray(contentStartIndex, contentEndIndex);
const decoded = yencode.decode(dataToDecode);
if (decoded.length !== footerMeta.size) {
throw new Error(`Decoded size (${decoded.length}) does not match expected part size (${footerMeta.size}).`);
}
logger.debug('Part size check passed.');
const calculatedCrc = crc32(decoded).toString(16);
const expectedCrc = footerMeta.pcrc32;
if (calculatedCrc !== expectedCrc) {
throw new Error(`CRC32 mismatch: expected ${expectedCrc}, but got ${calculatedCrc}.`);
}
logger.debug('CRC32 check passed.');
const offset = headerMeta.begin ? headerMeta.begin - 1 : 0;
decoded.copy(this.targetBuffer, offset);
logger.info(`Processed part ${headerMeta.part}/${headerMeta.total} and wrote to buffer at offset ${offset}.`);
}
getBuffer() {
return this.targetBuffer;
}
}

View File

@ -20,6 +20,7 @@ log4js.configure({
collection: { appenders: ['console', 'file'], level: 'info' }, collection: { appenders: ['console', 'file'], level: 'info' },
body: { appenders: ['console', 'file'], level: 'info' }, body: { appenders: ['console', 'file'], level: 'info' },
pool: { appenders: ['console', 'file'], level: 'info' }, pool: { appenders: ['console', 'file'], level: 'info' },
yenc: { appenders: ['console', 'file'], level: 'info' },
}, },
}); });

View File

@ -1,70 +0,0 @@
import { NNTP } from 'nntp-js';
import log4js from './logger.js';
const logger = log4js.getLogger('pool');
let POOL_SIZE = 10;
const allConnections = new Set(); // Tracks all connections for proper shutdown
const idleConnections = []; // Tracks available, ready-to-use connections
const waiters = []; // Queue of promises for tasks waiting for a connection
let createdCount = 0; // A counter for total connections created
export const setPoolSize = (size) => {
POOL_SIZE = size;
};
const createConnection = async () => {
const config = {
host: process.env.NNTP_HOST,
user: process.env.NNTP_USER,
password: process.env.NNTP_PASS,
port: 443,
secure: true,
};
const conn = new NNTP(config.host, 119);
await conn.connect();
await conn.login(config.user, config.password);
allConnections.add(conn); // Add to the set for shutdown tracking
return conn;
};
export const acquire = async () => {
if (idleConnections.length > 0) {
logger.debug('Reusing existing connection from pool.');
return idleConnections.pop();
}
if (createdCount < POOL_SIZE) {
createdCount++;
logger.info(`Creating new connection (${createdCount}/${POOL_SIZE}).`);
return createConnection();
}
logger.info(`Pool maxed out at ${POOL_SIZE}. Waiting for a connection to become available.`);
return new Promise(resolve => waiters.push(resolve));
};
export const release = conn => {
if (waiters.length > 0) {
logger.debug('Releasing connection directly to a waiting task.');
const resolve = waiters.shift();
resolve(conn);
} else {
logger.debug('Returning connection to the idle pool.');
idleConnections.push(conn);
}
};
export const shutdown = async () => {
logger.info('Shutting down all connections in the pool.');
const shutdownPromises = [];
for (const conn of allConnections) {
shutdownPromises.push(conn.quit());
}
await Promise.all(shutdownPromises);
allConnections.clear();
idleConnections.length = 0;
waiters.length = 0;
createdCount = 0;
};

60
src/workers/BodyWorker.js Normal file
View File

@ -0,0 +1,60 @@
import { Queue, Worker } from 'bullmq';
import log4js from '../lib/logger.js';
import { NntpPool } from '../lib/NntpPool.js';
import { decodeYenc } from '../lib/yenc.util.js';
const logger = log4js.getLogger('body');
export class BodyWorker {
constructor(headerQueue) {
this.headerQueue = headerQueue;
this.queue = new Queue('body-queue', {
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
},
});
this.pool = new NntpPool();
this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection });
this.worker.on('failed', (job, err) => {
logger.error(`Body job ${job.id} failed with error: ${err.message}`);
});
}
async process(job) {
const { header } = job.data;
logger.debug(`Processing header with unparsable subject: ${header.subject}`);
let conn;
try {
conn = await this.pool.acquire();
const bodyBuffer = (await conn.body(header['message-id'])).data;
const decodedBuffer = decodeYenc(bodyBuffer);
const firstLine = decodedBuffer.toString().split('\\n')[0];
const YENC_REGEX = /=ybegin part=(\d+) total=(\d+) line=\d+ size=\d+ name=(.+)/;
const match = firstLine.match(YENC_REGEX);
if (match) {
const part = parseInt(match[1], 10);
const total = parseInt(match[2], 10);
const filename = match[3].trim();
const newSubject = `"${filename}" yEnc (${part}/${total})`;
header.subject = newSubject;
logger.info(`Found yEnc metadata in body. New subject: ${newSubject}`);
await this.headerQueue.add('process-header', header);
} else {
logger.warn(`Could not find yEnc metadata in body for header: ${header.subject}`);
}
} catch (error) {
logger.error('Error in body worker:', error);
} finally {
if (conn) {
this.pool.release(conn);
}
}
}
}

View File

@ -1,23 +1,29 @@
import { Queue, Worker } from 'bullmq'; import { Queue, Worker } from 'bullmq';
import log4js from './logger.js'; import log4js from '../lib/logger.js';
import { getDb } from './database.js'; import { getDb } from '../lib/database.js';
import { acquire, release } from './nntp.pool.js'; import { NntpPool } from '../lib/NntpPool.js';
import { createExtractorFromData } from 'node-unrar-js'; import { createExtractorFromData } from 'node-unrar-js';
import { decodeYenc } from './yenc.util.js'; import { decodeYenc } from '../lib/yenc.util.js';
const logger = log4js.getLogger('collection'); const logger = log4js.getLogger('collection');
const connection = { export class CollectionWorker {
host: process.env.REDIS_HOST || 'localhost', constructor() {
port: process.env.REDIS_PORT || 6379, this.queue = new Queue('collection-queue', {
}; connection: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
},
});
this.pool = new NntpPool();
this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection });
export const collectionQueue = new Queue('collection-queue', { connection }); this.worker.on('failed', (job, err) => {
logger.error(`Collection job ${job.id} failed with error: ${err.message}`);
});
}
const RAR_REGEX = /\.part0*1\.rar$/; async process(job) {
export const startCollectionWorker = () => {
const collectionWorker = new Worker('collection-queue', async job => {
const { fileId } = job.data; const { fileId } = job.data;
logger.debug(`Processing file ID ${fileId} for collection.`); logger.debug(`Processing file ID ${fileId} for collection.`);
@ -29,6 +35,7 @@ export const startCollectionWorker = () => {
return; return;
} }
const RAR_REGEX = /\.part0*1\.rar$/;
if (RAR_REGEX.test(file.filename)) { if (RAR_REGEX.test(file.filename)) {
logger.info(`File "${file.filename}" is the first part of a RAR set.`); logger.info(`File "${file.filename}" is the first part of a RAR set.`);
@ -42,7 +49,7 @@ export const startCollectionWorker = () => {
let conn; let conn;
try { try {
conn = await acquire(); conn = await this.pool.acquire();
await conn.group('alt.binaries.test'); await conn.group('alt.binaries.test');
const bodyBuffer = (await conn.body(`<${firstPart.id}>`)).data; const bodyBuffer = (await conn.body(`<${firstPart.id}>`)).data;
const decodedBuffer = decodeYenc(bodyBuffer); const decodedBuffer = decodeYenc(bodyBuffer);
@ -57,15 +64,11 @@ export const startCollectionWorker = () => {
} }
} finally { } finally {
if (conn) { if (conn) {
release(conn); this.pool.release(conn);
} }
} }
} else { } else {
logger.debug(`File "${file.filename}" is not the first part of a RAR set.`); logger.debug(`File "${file.filename}" is not the first part of a RAR set.`);
} }
}, { connection }); }
}
collectionWorker.on('failed', (job, err) => {
logger.error(`Collection job ${job.id} failed with error: ${err.message}`);
});
};

View File

@ -1,19 +1,26 @@
import { Queue, Worker } from 'bullmq'; import { Queue, Worker } from 'bullmq';
import log4js from './logger.js'; import log4js from '../lib/logger.js';
import { getDb } from './database.js'; import { getDb } from '../lib/database.js';
import { collectionQueue } from './collection.worker.js';
const logger = log4js.getLogger('file'); const logger = log4js.getLogger('file');
const connection = { export class FileWorker {
host: process.env.REDIS_HOST || 'localhost', constructor(collectionQueue) {
port: process.env.REDIS_PORT || 6379, this.collectionQueue = collectionQueue;
}; this.queue = new Queue('file-queue', {
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
},
});
this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection });
export const fileQueue = new Queue('file-queue', { connection }); this.worker.on('failed', (job, err) => {
logger.error(`File job ${job.id} failed with error: ${err.message}`);
});
}
export const startFileWorker = () => { async process(job) {
const fileWorker = new Worker('file-queue', async job => {
const { filename, parts } = job.data; const { filename, parts } = job.data;
const partCount = Object.keys(parts).length; const partCount = Object.keys(parts).length;
logger.debug(`Processing complete file: "${filename}" with ${partCount} parts.`); logger.debug(`Processing complete file: "${filename}" with ${partCount} parts.`);
@ -53,11 +60,7 @@ export const startFileWorker = () => {
const fileId = result.lastID; const fileId = result.lastID;
logger.debug(`Saved file "${filename}" to database with ID: ${fileId}`); logger.debug(`Saved file "${filename}" to database with ID: ${fileId}`);
await collectionQueue.add('process-collection', { fileId }); await this.collectionQueue.add('process-collection', { fileId });
logger.debug(`Added file ID ${fileId} to collection queue.`); logger.debug(`Added file ID ${fileId} to collection queue.`);
}, { connection }); }
}
fileWorker.on('failed', (job, err) => {
logger.error(`File job ${job.id} failed with error: ${err.message}`);
});
};

View File

@ -0,0 +1,54 @@
import { Queue, Worker } from 'bullmq';
import Redis from 'ioredis';
import log4js from '../lib/logger.js';
const logger = log4js.getLogger('header');
export class HeaderWorker {
constructor(fileQueue, bodyQueue) {
this.fileQueue = fileQueue;
this.bodyQueue = bodyQueue;
this.queue = new Queue('header-queue', {
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
},
});
this.redis = new Redis(this.queue.opts.connection);
this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection });
this.worker.on('failed', (job, err) => {
logger.error(`Header job ${job.id} failed with error: ${err.message}`);
});
}
async process(job) {
const header = job.data;
const subject = header.subject;
const SUBJECT_REGEX = /"(.+)"(?: yEnc)? \((\d+)\/(\d+)\)/;
const match = subject.match(SUBJECT_REGEX);
if (match) {
const filename = match[1];
const part = parseInt(match[2], 10);
const total = parseInt(match[3], 10);
const fileKey = `file:${filename}`;
await this.redis.hset(fileKey, part, JSON.stringify(header));
const partCount = await this.redis.hlen(fileKey);
if (partCount === total) {
const fileParts = await this.redis.hgetall(fileKey);
await this.fileQueue.add('process-file', { filename, parts: fileParts });
await this.redis.del(fileKey);
logger.info(`File "${filename}" is complete and moved to file-queue.`);
} else {
logger.info(`Stored part ${part}/${total} for file "${filename}"`);
}
} else {
logger.warn(`Could not parse subject: "${subject}". Moving to body-queue.`);
await this.bodyQueue.add('process-body', { header });
}
}
}

View File

@ -1,91 +0,0 @@
import yencode from 'yencode';
/**
* Parses yEnc metadata from a header or footer line.
* @param {string} line The line to parse.
* @returns {object} A key-value map of the metadata.
*/
function parseMetaLine(line) {
const meta = {};
const parts = line.split(' ');
for (const part of parts) {
const eqIndex = part.indexOf('=');
if (eqIndex !== -1) {
meta[part.slice(0, eqIndex)] = part.slice(eqIndex + 1);
}
}
return meta;
}
/**
* Extracts metadata from a yEnc-encoded buffer.
* @param {Buffer} encodedBuffer The yEnc-encoded buffer.
* @returns {object} An object containing all metadata from the header and footer.
*/
export function parseYencMeta(encodedBuffer) {
const headerBegin = Buffer.from('=ybegin');
const headerPart = Buffer.from('=ypart');
const footer = Buffer.from('\n=yend');
const headerStartIndex = encodedBuffer.indexOf(headerBegin);
const partStartIndex = encodedBuffer.indexOf(headerPart);
const footerStartIndex = encodedBuffer.lastIndexOf(footer);
if ((headerStartIndex === -1 && partStartIndex === -1) || footerStartIndex === -1) {
throw new Error('Invalid yEnc data: missing header or footer.');
}
const headerIndex = headerStartIndex !== -1 ? headerStartIndex : partStartIndex;
const headerEndIndex = encodedBuffer.indexOf(Buffer.from('\\n'), headerIndex);
const headerLine = encodedBuffer.subarray(headerIndex, headerEndIndex).toString();
const footerEndIndex = encodedBuffer.indexOf(Buffer.from('\\n'), footerStartIndex);
const footerLine = encodedBuffer.subarray(footerStartIndex, footerEndIndex).toString();
const meta = {
...parseMetaLine(headerLine),
...parseMetaLine(footerLine),
};
// Convert numeric values
for (const key in meta) {
if (!isNaN(meta[key])) {
meta[key] = parseInt(meta[key], 10);
}
}
return meta;
}
/**
* Decodes a yEnc-encoded buffer and optionally writes it to a target buffer.
* @param {Buffer} encodedBuffer The yEnc-encoded buffer.
* @param {Buffer} [targetBuffer] An optional buffer to write the decoded data into.
* @returns {Buffer} The decoded data (or the target buffer if provided).
*/
export function decodeYenc(encodedBuffer, targetBuffer) {
const meta = parseYencMeta(encodedBuffer);
const header = meta.part ? Buffer.from(`=ypart`) : Buffer.from(`=ybegin`);
const footer = Buffer.from(`\n=yend`);
const contentStartIndex = encodedBuffer.indexOf('\n', encodedBuffer.indexOf(header)) + 1;
const contentEndIndex = encodedBuffer.lastIndexOf(footer);
const dataToDecode = encodedBuffer.subarray(contentStartIndex, contentEndIndex);
const decoded = yencode.decode(dataToDecode);
if (decoded.length !== meta.size) {
throw new Error(`Decoded size (${decoded.length}) does not match expected size (${meta.size}).`);
}
if (targetBuffer) {
if (meta.begin === undefined) {
throw new Error('Cannot write to target buffer: missing "begin" offset in yEnc metadata.');
}
decoded.copy(targetBuffer, meta.begin - 1);
return targetBuffer;
}
return decoded;
}