Compare commits

...

5 Commits

Author SHA1 Message Date
Daan Meijer
dfd3336447 download seems to work, par2 verification worked at least once 2026-06-01 00:36:34 +02:00
Daan Meijer
ba6cb101a6 crc checks work 2026-05-31 22:48:00 +02:00
Daan Meijer
fc947953e4 application restructuring, added crc check when decoding yenc files 2026-05-31 22:07:14 +02:00
Daan Meijer
efbc1237d6 downloader can handle concurrent connections 2026-05-31 21:23:18 +02:00
Daan Meijer
b5647521fd yenc functions decode 1 part files correctly 2026-05-31 21:12:05 +02:00
21 changed files with 586 additions and 350 deletions

57
package-lock.json generated
View File

@ -10,11 +10,13 @@
"license": "ISC",
"dependencies": {
"bullmq": "^5.77.3",
"crc": "^4.3.2",
"dotenv": "^16.3.1",
"ioredis": "^5.3.2",
"log4js": "^6.9.1",
"nntp-js": "^1.0.4",
"node-unrar-js": "^2.0.0",
"simple-yenc": "^1.0.4",
"sqlite": "^5.1.1",
"sqlite3": "^6.0.1",
"xmlbuilder2": "^3.1.1",
@ -224,7 +226,7 @@
"readable-stream": "^3.4.0"
}
},
"node_modules/buffer": {
"node_modules/bl/node_modules/buffer": {
"version": "5.7.1",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz",
"integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==",
@ -248,6 +250,32 @@
"ieee754": "^1.1.13"
}
},
"node_modules/buffer": {
"version": "6.0.3",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-6.0.3.tgz",
"integrity": "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"license": "MIT",
"optional": true,
"peer": true,
"dependencies": {
"base64-js": "^1.3.1",
"ieee754": "^1.2.1"
}
},
"node_modules/bullmq": {
"version": "5.77.3",
"resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.77.3.tgz",
@ -303,6 +331,23 @@
"node": ">=0.10.0"
}
},
"node_modules/crc": {
"version": "4.3.2",
"resolved": "https://registry.npmjs.org/crc/-/crc-4.3.2.tgz",
"integrity": "sha512-uGDHf4KLLh2zsHa8D8hIQ1H/HtFQhyHrc0uhHBcoKGol/Xnb+MPYfUMw7cvON6ze/GUESTudKayDcJC5HnJv1A==",
"license": "MIT",
"engines": {
"node": ">=12"
},
"peerDependencies": {
"buffer": ">=6.0.3"
},
"peerDependenciesMeta": {
"buffer": {
"optional": true
}
}
},
"node_modules/cron-parser": {
"version": "4.9.0",
"resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz",
@ -1019,6 +1064,16 @@
"simple-concat": "^1.0.0"
}
},
"node_modules/simple-yenc": {
"version": "1.0.4",
"resolved": "https://registry.npmjs.org/simple-yenc/-/simple-yenc-1.0.4.tgz",
"integrity": "sha512-5gvxpSd79e9a3V4QDYUqnqxeD4HGlhCakVpb6gMnDD7lexJggSBJRBO5h52y/iJrdXRilX9UCuDaIJhSWm5OWw==",
"license": "MIT",
"funding": {
"type": "individual",
"url": "https://github.com/sponsors/eshaz"
}
},
"node_modules/sprintf-js": {
"version": "1.0.3",
"resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.0.3.tgz",

View File

@ -2,11 +2,13 @@
"name": "usenet-indexer",
"version": "1.0.0",
"description": "",
"main": "index.js",
"main": "src/bin/start.js",
"type": "module",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node src/index.js"
"start": "node src/bin/start.js",
"download": "node src/bin/download.js",
"nzb": "node src/bin/nzb.js"
},
"keywords": [],
"author": "",
@ -18,9 +20,11 @@
"log4js": "^6.9.1",
"nntp-js": "^1.0.4",
"node-unrar-js": "^2.0.0",
"simple-yenc": "^1.0.4",
"sqlite": "^5.1.1",
"sqlite3": "^6.0.1",
"xmlbuilder2": "^3.1.1",
"yencode": "^1.0.1"
"yencode": "^1.0.1",
"crc": "^4.3.2"
}
}

57
src/Application.js Normal file
View File

@ -0,0 +1,57 @@
import 'dotenv/config';
import { NntpPool } from './lib/NntpPool.js';
import { HeaderWorker } from './workers/HeaderWorker.js';
import { FileWorker } from './workers/FileWorker.js';
import { CollectionWorker } from './workers/CollectionWorker.js';
import { BodyWorker } from './workers/BodyWorker.js';
import log4js from './lib/logger.js';
const logger = log4js.getLogger();
export class Application {
constructor() {
this.pool = new NntpPool();
const collectionWorker = new CollectionWorker();
const fileWorker = new FileWorker(collectionWorker.queue);
const bodyWorker = new BodyWorker(null); // HeaderWorker is not yet initialized
this.headerWorker = new HeaderWorker(fileWorker.queue, bodyWorker.queue);
bodyWorker.headerQueue = this.headerWorker.queue; // Now we can set it
}
async run() {
let conn;
try {
conn = await this.pool.acquire();
logger.info('NNTP connection acquired from pool.');
logger.debug(`Server date: ${await conn.date()}`);
const group = await conn.group('alt.binaries.test');
logger.debug(`Group info: ${JSON.stringify(group)}`);
const overview = await conn.xover(group.first, group.last);
logger.info(`Fetched ${overview.overviews.length} headers.`);
for (const [id, header] of overview.overviews) {
await this.headerWorker.queue.add('process-header', header);
}
if (overview.overviews.length > 0) {
const lastId = overview.overviews[overview.overviews.length - 1][0];
logger.info(`Last header ID queued: ${lastId}`);
}
} catch (error) {
logger.error('Error in main execution:', error);
} finally {
if (conn) {
this.pool.release(conn);
}
}
}
async shutdown() {
logger.info('Gracefully shutting down...');
await this.pool.shutdown();
process.exit(0);
}
}

84
src/bin/download.js Normal file
View File

@ -0,0 +1,84 @@
import 'dotenv/config';
import { getDb } from '../lib/database.js';
import { NntpPool } from '../lib/NntpPool.js';
import { YencFile } from '../lib/YencFile.js';
import fs from 'fs/promises';
import log4js from '../lib/logger.js';
const logger = log4js.getLogger('download');
async function downloadAndProcessPart(pool, partNumber, segment, yencFile) {
let conn;
try {
conn = await pool.acquire();
await conn.group('alt.binaries.test');
logger.debug(`Downloading part ${partNumber} with message ID: ${segment.id}`);
const bodyBuffer = (await conn.body(`<${segment.id}>`)).data;
yencFile.processPart(bodyBuffer);
} catch (error) {
if (error.code === 430) {
logger.error(`Article not found for part ${partNumber} (Message ID: ${segment.id})`);
} else {
throw error;
}
} finally {
if (conn) {
pool.release(conn);
}
}
}
async function downloadFile(fileId, numConnections) {
const pool = new NntpPool(numConnections);
const yencFile = new YencFile();
const db = await getDb();
const file = await db.get('SELECT * FROM files WHERE id = ?', fileId);
if (!file) {
logger.error(`File with ID ${fileId} not found.`);
await pool.shutdown();
return;
}
logger.info(`Downloading file: ${file.filename} with ${numConnections} connections.`);
const messageIds = JSON.parse(file.message_ids);
const sortedParts = Object.entries(messageIds).sort(([a], [b]) => parseInt(a, 10) - parseInt(b, 10));
// Sequentially process the first part to initialize the YencFile
const [firstPartNumber, firstSegment] = sortedParts[0];
await downloadAndProcessPart(pool, firstPartNumber, firstSegment, yencFile);
// Concurrently process the rest of the parts
const remainingParts = sortedParts.slice(1);
const downloadPromises = remainingParts.map(([partNumber, segment]) =>
downloadAndProcessPart(pool, partNumber, segment, yencFile)
);
await Promise.all(downloadPromises);
const completeFile = yencFile.getBuffer();
if (completeFile) {
await fs.writeFile(file.filename, completeFile);
logger.info(`File "${file.filename}" downloaded successfully.`);
} else {
logger.error('Could not assemble the final file.');
}
await pool.shutdown();
}
const args = process.argv.slice(2);
const fileIdArg = args.find(arg => !arg.startsWith('--'));
const connectionsArg = args.find(arg => arg.startsWith('--connections='));
const fileId = fileIdArg ? parseInt(fileIdArg, 10) : null;
const numConnections = connectionsArg ? parseInt(connectionsArg.split('=')[1], 10) : 10;
if (!fileId || isNaN(fileId)) {
logger.error('Please provide a valid file ID as a command-line argument.');
process.exit(1);
}
downloadFile(fileId, numConnections);

View File

@ -1,8 +1,8 @@
import 'dotenv/config';
import { getDb } from './database.js';
import { getDb } from '../lib/database.js';
import { create } from 'xmlbuilder2';
import fs from 'fs/promises';
import log4js from './logger.js';
import log4js from '../lib/logger.js';
const logger = log4js.getLogger('nzb');

6
src/bin/start.js Normal file
View File

@ -0,0 +1,6 @@
import { Application } from '../Application.js';
const app = new Application();
app.run();
process.on('SIGINT', () => app.shutdown());

View File

@ -1,58 +0,0 @@
import { Queue, Worker } from 'bullmq';
import log4js from './logger.js';
import { headerQueue } from './header.worker.js';
import { acquire, release } from './nntp.pool.js';
const bodyLogger = log4js.getLogger('body');
const connection = {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
};
export const bodyQueue = new Queue('body-queue', { connection });
const YENC_REGEX = /=ybegin part=(\d+) total=(\d+) line=\d+ size=\d+ name=(.+)/;
export const startBodyWorker = () => {
const bodyWorker = new Worker('body-queue', async job => {
const { header } = job.data;
bodyLogger.debug(`Processing header with unparsable subject: ${header.subject}`);
let conn;
try {
conn = await acquire();
const bodyBuffer = (await conn.body(header['message-id'])).data;
const firstNewlineIndex = bodyBuffer.indexOf('\\n');
const firstLineBuffer = (firstNewlineIndex !== -1) ? bodyBuffer.slice(0, firstNewlineIndex) : bodyBuffer;
const firstLine = firstLineBuffer.toString();
const match = firstLine.match(YENC_REGEX);
if (match) {
const part = parseInt(match[1], 10);
const total = parseInt(match[2], 10);
const filename = match[3].trim();
const newSubject = `"${filename}" yEnc (${part}/${total})`;
header.subject = newSubject;
bodyLogger.info(`Found yEnc metadata in body. New subject: ${newSubject}`);
await headerQueue.add('process-header', header);
} else {
bodyLogger.warn(`Could not find yEnc metadata in body for header: ${header.subject}`);
}
} catch (error) {
bodyLogger.error('Error in body worker:', error);
} finally {
if (conn) {
release(conn);
}
}
}, { connection });
bodyWorker.on('failed', (job, err) => {
bodyLogger.error(`Body job ${job.id} failed with error: ${err.message}`);
});
};

View File

@ -1,72 +0,0 @@
import 'dotenv/config';
import { getDb } from './database.js';
import { acquire, release, shutdown } from './nntp.pool.js';
import * as yEnc from 'simple-yenc';
import fs from 'fs/promises';
import log4js from './logger.js';
const logger = log4js.getLogger('download');
async function downloadFile(fileId) {
const db = await getDb();
const file = await db.get('SELECT * FROM files WHERE id = ?', fileId);
if (!file) {
logger.error(`File with ID ${fileId} not found.`);
return;
}
logger.info(`Downloading file: ${file.filename}`);
const messageIds = JSON.parse(file.message_ids);
const sortedParts = Object.entries(messageIds).sort(([a], [b]) => parseInt(a, 10) - parseInt(b, 10));
const parts = [];
let conn;
try {
conn = await acquire();
await conn.group('alt.binaries.test');
for (const [partNumber, segment] of sortedParts) {
if (!segment || !segment.id) {
logger.error(`Message ID for part ${partNumber} not found.`);
continue;
}
try {
logger.debug(`Downloading part ${partNumber}/${file.parts} with message ID: ${segment.id}`);
const bodyBuffer = (await conn.body(`<${segment.id}>`)).data;
const decodedUint8Array = yEnc.decode(bodyBuffer.toString('latin1'));
const buffer = Buffer.from(decodedUint8Array);
parts.push(buffer);
} catch (error) {
if (error.code === 430) {
logger.error(`Article not found for part ${partNumber} (Message ID: ${segment.id})`);
} else {
throw error;
}
}
}
} catch (error) {
logger.error('Error downloading file parts:', error);
} finally {
if (conn) {
release(conn);
}
}
if (parts.length === file.parts) {
const completeFile = Buffer.concat(parts);
await fs.writeFile(file.filename, completeFile);
logger.info(`File "${file.filename}" downloaded successfully.`);
} else {
logger.error('Could not download all parts of the file.');
}
}
const fileId = parseInt(process.argv[2], 10);
if (isNaN(fileId)) {
logger.error('Please provide a valid file ID as a command-line argument.');
process.exit(1);
}
downloadFile(fileId).finally(() => shutdown());

View File

@ -1,54 +0,0 @@
import { Queue, Worker } from 'bullmq';
import Redis from 'ioredis';
import log4js from './logger.js';
import { fileQueue } from './file.worker.js';
import { bodyQueue } from './body.worker.js';
const logger = log4js.getLogger('header');
const connection = {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
};
const redis = new Redis(connection);
export const headerQueue = new Queue('header-queue', { connection });
const SUBJECT_REGEX = /"(.+)"(?: yEnc)? \((\d+)\/(\d+)\)/;
export const startHeaderWorker = () => {
const headerWorker = new Worker('header-queue', async job => {
const header = job.data;
const subject = header.subject;
const match = subject.match(SUBJECT_REGEX);
if (match) {
const filename = match[1];
const part = parseInt(match[2], 10);
const total = parseInt(match[3], 10);
const fileKey = `file:${filename}`;
await redis.hset(fileKey, part, JSON.stringify(header));
const partCount = await redis.hlen(fileKey);
if (partCount === total) {
const fileParts = await redis.hgetall(fileKey);
await fileQueue.add('process-file', { filename, parts: fileParts });
await redis.del(fileKey);
logger.info(`File "${filename}" is complete and moved to file-queue.`, fileParts);
} else {
logger.info(`Stored part ${part}/${total} for file "${filename}"`);
}
} else {
logger.warn(`Could not parse subject: "${subject}". Moving to body-queue.`);
await bodyQueue.add('process-body', { header });
}
}, { connection });
headerWorker.on('failed', (job, err) => {
logger.error(`Header job ${job.id} failed with error: ${err.message}`);
});
};

View File

@ -1,53 +0,0 @@
import 'dotenv/config';
import { headerQueue, startHeaderWorker } from './header.worker.js';
import { startFileWorker } from './file.worker.js';
import { startCollectionWorker } from './collection.worker.js';
import { startBodyWorker } from './body.worker.js';
import log4js from './logger.js';
import { acquire, release, shutdown } from './nntp.pool.js';
const logger = log4js.getLogger();
async function main() {
let conn;
try {
conn = await acquire();
logger.info('NNTP connection acquired from pool.');
logger.debug(`Server date: ${await conn.date()}`);
const group = await conn.group('alt.binaries.test');
logger.debug(`Group info: ${JSON.stringify(group)}`);
const overview = await conn.xover(group.first, group.group);
logger.info(`Fetched ${overview.overviews.length} headers.`);
for (const [id, header] of overview.overviews) {
await headerQueue.add('process-header', header);
}
if (overview.overviews.length > 0) {
const lastId = overview.overviews[overview.overviews.length - 1][0];
logger.info(`Last header ID queued: ${lastId}`);
}
} catch (error) {
logger.error('Error in main execution:', error);
} finally {
if (conn) {
release(conn);
logger.info('NNTP connection released back to the pool.');
}
}
}
startHeaderWorker();
startFileWorker();
startCollectionWorker();
startBodyWorker();
main();
process.on('SIGINT', async () => {
logger.info('Gracefully shutting down...');
await shutdown();
process.exit(0);
});

70
src/lib/NntpPool.js Normal file
View File

@ -0,0 +1,70 @@
import { NNTP } from 'nntp-js';
import log4js from './logger.js';
const logger = log4js.getLogger('pool');
export class NntpPool {
constructor(poolSize = 10) {
this.poolSize = poolSize;
this.allConnections = new Set();
this.idleConnections = [];
this.waiters = [];
this.createdCount = 0;
}
async _createConnection() {
const config = {
host: process.env.NNTP_HOST,
user: process.env.NNTP_USER,
password: process.env.NNTP_PASS,
port: 443,
secure: true,
};
const conn = new NNTP(config.host, 119);
await conn.connect();
await conn.login(config.user, config.password);
this.allConnections.add(conn);
return conn;
}
async acquire() {
if (this.idleConnections.length > 0) {
logger.debug('Reusing existing connection from pool.');
return this.idleConnections.pop();
}
if (this.createdCount < this.poolSize) {
this.createdCount++;
logger.info(`Creating new connection (${this.createdCount}/${this.poolSize}).`);
return this._createConnection();
}
logger.info(`Pool maxed out at ${this.poolSize}. Waiting for a connection to become available.`);
return new Promise(resolve => this.waiters.push(resolve));
}
release(conn) {
if (this.waiters.length > 0) {
logger.debug('Releasing connection directly to a waiting task.');
const resolve = this.waiters.shift();
resolve(conn);
} else {
logger.debug('Returning connection to the idle pool.');
this.idleConnections.push(conn);
}
}
async shutdown() {
logger.info('Shutting down all connections in the pool.');
const shutdownPromises = [];
for (const conn of this.allConnections) {
shutdownPromises.push(conn.quit());
}
await Promise.all(shutdownPromises);
this.allConnections.clear();
this.idleConnections.length = 0;
this.waiters.length = 0;
this.createdCount = 0;
}
}

67
src/lib/YencFile.js Normal file
View File

@ -0,0 +1,67 @@
import yencode from 'yencode';
import { crc32 } from 'crc';
import log4js from './logger.js';
import { parseYencMeta } from './yenc.util.js';
const logger = log4js.getLogger('yenc');
export class YencFile {
constructor() {
this.targetBuffer = null;
this.totalSize = 0;
}
processPart(encodedBuffer) {
const meta = parseYencMeta(encodedBuffer);
// Initialize buffer on the first part that has total size info
if (!this.targetBuffer && meta.header?.size) {
this.totalSize = parseInt(meta.header.size, 10);
if (!this.totalSize) {
throw new Error('Could not determine total file size from yEnc metadata.');
}
this.targetBuffer = Buffer.alloc(this.totalSize);
logger.info(`Allocated buffer of size ${this.totalSize} for file.`);
}
if (!this.targetBuffer) {
throw new Error('Cannot process yEnc part: target buffer not initialized. The first part must contain total file size.');
}
const headerPartMarker = Buffer.from('=ypart');
const headerBeginMarker = Buffer.from('=ybegin');
const footerMarker = Buffer.from('\r\n=yend');
// The content starts after the LAST header line.
const partHeaderIndex = encodedBuffer.indexOf(headerPartMarker);
const beginHeaderIndex = encodedBuffer.indexOf(headerBeginMarker);
const contentHeaderIndex = partHeaderIndex !== -1 ? partHeaderIndex : beginHeaderIndex;
const contentStartIndex = encodedBuffer.indexOf('\r\n', contentHeaderIndex) + 2;
const contentEndIndex = encodedBuffer.lastIndexOf(footerMarker);
const dataToDecode = encodedBuffer.subarray(contentStartIndex, contentEndIndex);
const decoded = yencode.decode(dataToDecode);
const expectedSize = parseInt(meta.footer?.size, 10);
if (decoded.length !== expectedSize) {
throw new Error(`Decoded size (${decoded.length}) does not match expected part size (${expectedSize}).`);
}
logger.debug('Part size check passed.');
const calculatedCrc = crc32(decoded);
const expectedCrc = parseInt(meta.footer?.pcrc32, 16);
if (calculatedCrc !== expectedCrc) {
throw new Error(`CRC32 mismatch: expected ${expectedCrc.toString(16)}, but got ${calculatedCrc.toString(16)}.`);
}
logger.debug('CRC32 check passed.');
const offset = parseInt(meta.part.begin, 10) - 1;
decoded.copy(this.targetBuffer, offset);
logger.info(`Processed part ${meta.header.part}/${meta.header.total || 'N/A'} and wrote to buffer at offset ${offset}.`);
}
getBuffer() {
return this.targetBuffer;
}
}

View File

@ -20,6 +20,7 @@ log4js.configure({
collection: { appenders: ['console', 'file'], level: 'info' },
body: { appenders: ['console', 'file'], level: 'info' },
pool: { appenders: ['console', 'file'], level: 'info' },
yenc: { appenders: ['console', 'file'], level: 'info' },
},
});

55
src/lib/yenc.util.js Normal file
View File

@ -0,0 +1,55 @@
/**
* Parses a single line of yEnc metadata.
* @param {string} line The metadata line.
* @returns {object} A key-value map of the metadata.
*/
function parseMetaLine(line) {
if (!line) return {};
const meta = {};
line.split(' ').forEach(part => {
const eqIndex = part.indexOf('=');
if (eqIndex !== -1) {
meta[part.slice(0, eqIndex)] = part.slice(eqIndex + 1);
}
});
return meta;
}
/**
* Finds a line in a buffer that starts with a specific marker.
* @param {Buffer} buffer The buffer to search.
* @param {string} marker The marker to find (e.g., '=ybegin').
* @returns {string|null} The found line, or null.
*/
function findLine(buffer, marker) {
const markerBuffer = Buffer.from(marker);
const index = buffer.indexOf(markerBuffer);
if (index === -1) return null;
const lineEndIndex = buffer.indexOf(Buffer.from('\r\n'), index);
return buffer.subarray(index, (lineEndIndex !== -1) ? lineEndIndex : buffer.length).toString();
}
/**
* Extracts and merges metadata from all yEnc headers and footers.
* @param {Buffer} encodedBuffer The yEnc-encoded buffer.
* @returns {object} A single, merged object of all metadata.
*/
export function parseYencMeta(encodedBuffer) {
const beginLine = findLine(encodedBuffer, '=ybegin');
const partLine = findLine(encodedBuffer, '=ypart');
const endLine = findLine(encodedBuffer, '=yend');
if (!endLine || (!beginLine && !partLine)) {
throw new Error('Invalid yEnc data: missing required headers or footers.');
}
// Merge metadata, with more specific lines overwriting general ones.
const meta = {
header: parseMetaLine(beginLine),
part: parseMetaLine(partLine),
footer: parseMetaLine(endLine),
};
return meta;
}

View File

@ -1,59 +0,0 @@
import { NNTP } from 'nntp-js';
import log4js from './logger.js';
const logger = log4js.getLogger('pool');
const POOL_SIZE = 5;
const connections = [];
const queue = [];
const createConnection = async () => {
const config = {
host: process.env.NNTP_HOST,
user: process.env.NNTP_USER,
password: process.env.NNTP_PASS,
port: 443,
secure: true,
};
const conn = new NNTP(config.host, 119);
await conn.connect();
await conn.login(config.user, config.password);
return conn;
};
export const acquire = async () => {
if (connections.length > 0) {
logger.debug('Reusing existing connection from pool.');
return connections.pop();
}
if (connections.length + queue.length < POOL_SIZE) {
logger.info('Creating new connection.');
return createConnection();
}
logger.info('Waiting for a connection to become available.');
return new Promise(resolve => queue.push(resolve));
};
export const release = conn => {
if (queue.length > 0) {
logger.info('Releasing connection to a waiting consumer.');
const resolve = queue.shift();
resolve(conn);
} else {
logger.debug('Returning connection to the pool.');
connections.push(conn);
}
};
export const shutdown = async () => {
logger.info('Shutting down all connections in the pool.');
const allConns = [...connections];
connections.length = 0; // Clear the pool
for (const conn of allConns) {
await conn.quit();
}
};

71
src/workers/BodyWorker.js Normal file
View File

@ -0,0 +1,71 @@
import { Queue, Worker } from 'bullmq';
import log4js from '../lib/logger.js';
import { NntpPool } from '../lib/NntpPool.js';
import { parseYencMeta } from '../lib/yenc.util.js';
import fs from 'fs/promises';
import path from 'path';
const logger = log4js.getLogger('body');
export class BodyWorker {
constructor(headerQueue) {
this.headerQueue = headerQueue;
this.queue = new Queue('body-queue', {
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
},
});
this.pool = new NntpPool();
this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection });
this.worker.on('failed', (job, err) => {
logger.error(`Body job ${job.id} failed with error: ${err.message}`);
});
}
async process(job) {
const { header } = job.data;
const messageId = header['message-id'];
logger.debug(`Processing header with unparsable subject: ${header.subject}`);
let conn;
try {
conn = await this.pool.acquire();
const bodyBuffer = (await conn.body(messageId)).data;
try {
const meta = parseYencMeta(bodyBuffer);
if (meta.header.name) {
const { name, part, total } = meta.header;
const newSubject = `"${name}" yEnc (${part}/${total})`;
header.subject = newSubject;
logger.info(`Found yEnc metadata in body. New subject: ${newSubject}`);
await this.headerQueue.add('process-header', header);
} else {
logger.warn(`Could not find yEnc metadata in body for header: ${header.subject}`);
}
} catch (parseError) {
if (parseError.message.includes('Invalid yEnc data')) {
logger.error(`Failed to parse yEnc data for message ID ${messageId}. Dumping buffer for inspection.`);
const debugDir = path.join(process.cwd(), 'debug');
await fs.mkdir(debugDir, { recursive: true });
const timestamp = new Date().toISOString().replace(/:/g, '-');
const dumpFile = path.join(debugDir, `body-error-${timestamp}-${messageId.replace(/[<>]/g, '')}.bin`);
await fs.writeFile(dumpFile, bodyBuffer);
logger.error(`Problematic body buffer saved to: ${dumpFile}`);
}
// Re-throw the original parsing error to fail the job
throw parseError;
}
} catch (error) {
logger.error(`Error in body worker for message ID ${messageId}:`, error);
throw error; // Ensure the job fails if any other error occurs
} finally {
if (conn) {
this.pool.release(conn);
}
}
}
}

View File

@ -1,23 +1,29 @@
import { Queue, Worker } from 'bullmq';
import log4js from './logger.js';
import { getDb } from './database.js';
import { acquire, release } from './nntp.pool.js';
import log4js from '../lib/logger.js';
import { getDb } from '../lib/database.js';
import { NntpPool } from '../lib/NntpPool.js';
import { createExtractorFromData } from 'node-unrar-js';
import * as yEnc from 'simple-yenc';
import { YencFile } from '../lib/YencFile.js';
const logger = log4js.getLogger('collection');
const connection = {
export class CollectionWorker {
constructor() {
this.queue = new Queue('collection-queue', {
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
};
},
});
this.pool = new NntpPool();
this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection });
export const collectionQueue = new Queue('collection-queue', { connection });
this.worker.on('failed', (job, err) => {
logger.error(`Collection job ${job.id} failed with error: ${err.message}`);
});
}
const RAR_REGEX = /\.part0*1\.rar$/;
export const startCollectionWorker = () => {
const collectionWorker = new Worker('collection-queue', async job => {
async process(job) {
const { fileId } = job.data;
logger.debug(`Processing file ID ${fileId} for collection.`);
@ -29,6 +35,7 @@ export const startCollectionWorker = () => {
return;
}
const RAR_REGEX = /\.part0*1\.rar$/;
if (RAR_REGEX.test(file.filename)) {
logger.info(`File "${file.filename}" is the first part of a RAR set.`);
@ -42,12 +49,15 @@ export const startCollectionWorker = () => {
let conn;
try {
conn = await acquire();
conn = await this.pool.acquire();
await conn.group('alt.binaries.test');
const bodyBuffer = (await conn.body(`<${firstPart.id}>`)).data;
const decodedUint8Array = yEnc.decode(bodyBuffer.toString('latin1'));
const buffer = Buffer.from(decodedUint8Array);
const extractor = await createExtractorFromData({ data: buffer });
const yencFile = new YencFile();
yencFile.processPart(bodyBuffer);
const decodedBuffer = yencFile.getBuffer();
const extractor = await createExtractorFromData({ data: decodedBuffer });
const fileList = extractor.getFileList();
logger.info(`Files in "${file.filename}":`, fileList);
} catch (error) {
@ -58,15 +68,11 @@ export const startCollectionWorker = () => {
}
} finally {
if (conn) {
release(conn);
this.pool.release(conn);
}
}
} else {
logger.debug(`File "${file.filename}" is not the first part of a RAR set.`);
}
}, { connection });
collectionWorker.on('failed', (job, err) => {
logger.error(`Collection job ${job.id} failed with error: ${err.message}`);
});
};
}
}

View File

@ -1,19 +1,26 @@
import { Queue, Worker } from 'bullmq';
import log4js from './logger.js';
import { getDb } from './database.js';
import { collectionQueue } from './collection.worker.js';
import log4js from '../lib/logger.js';
import { getDb } from '../lib/database.js';
const logger = log4js.getLogger('file');
const connection = {
export class FileWorker {
constructor(collectionQueue) {
this.collectionQueue = collectionQueue;
this.queue = new Queue('file-queue', {
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
};
},
});
this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection });
export const fileQueue = new Queue('file-queue', { connection });
this.worker.on('failed', (job, err) => {
logger.error(`File job ${job.id} failed with error: ${err.message}`);
});
}
export const startFileWorker = () => {
const fileWorker = new Worker('file-queue', async job => {
async process(job) {
const { filename, parts } = job.data;
const partCount = Object.keys(parts).length;
logger.debug(`Processing complete file: "${filename}" with ${partCount} parts.`);
@ -53,11 +60,7 @@ export const startFileWorker = () => {
const fileId = result.lastID;
logger.debug(`Saved file "${filename}" to database with ID: ${fileId}`);
await collectionQueue.add('process-collection', { fileId });
await this.collectionQueue.add('process-collection', { fileId });
logger.debug(`Added file ID ${fileId} to collection queue.`);
}, { connection });
fileWorker.on('failed', (job, err) => {
logger.error(`File job ${job.id} failed with error: ${err.message}`);
});
};
}
}

View File

@ -0,0 +1,54 @@
import { Queue, Worker } from 'bullmq';
import Redis from 'ioredis';
import log4js from '../lib/logger.js';
const logger = log4js.getLogger('header');
export class HeaderWorker {
constructor(fileQueue, bodyQueue) {
this.fileQueue = fileQueue;
this.bodyQueue = bodyQueue;
this.queue = new Queue('header-queue', {
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
},
});
this.redis = new Redis(this.queue.opts.connection);
this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection });
this.worker.on('failed', (job, err) => {
logger.error(`Header job ${job.id} failed with error: ${err.message}`);
});
}
async process(job) {
const header = job.data;
const subject = header.subject;
const SUBJECT_REGEX = /"(.+)"(?: yEnc)? \((\d+)\/(\d+)\)/;
const match = subject.match(SUBJECT_REGEX);
if (match) {
const filename = match[1];
const part = parseInt(match[2], 10);
const total = parseInt(match[3], 10);
const fileKey = `file:${filename}`;
await this.redis.hset(fileKey, part, JSON.stringify(header));
const partCount = await this.redis.hlen(fileKey);
if (partCount === total) {
const fileParts = await this.redis.hgetall(fileKey);
await this.fileQueue.add('process-file', { filename, parts: fileParts });
await this.redis.del(fileKey);
logger.info(`File "${filename}" is complete and moved to file-queue.`);
} else {
logger.info(`Stored part ${part}/${total} for file "${filename}"`);
}
} else {
logger.warn(`Could not parse subject: "${subject}". Moving to body-queue.`);
await this.bodyQueue.add('process-body', { header });
}
}
}

View File

@ -1,13 +1,12 @@
import yencode from 'yencode';
import { decodeYenc } from './yenc.util.js';
import fs from 'fs/promises';
import { Buffer } from 'buffer';
async function runTest() {
const encodedData = await fs.readFile('HjVfQlWmHdUrQeQkRiLkTwEj-1779830864932@nyuu.bin');
const correctlyDecodedData = await fs.readFile('Dragon.Ball.S01E119.MULTI.BDRip.REMASTERED.1080p.x264.DTS-LILAS.par2');
const encodedData = await fs.readFile('files/HjVfQlWmHdUrQeQkRiLkTwEj-1779830864932@nyuu.bin');
const correctlyDecodedData = await fs.readFile('files/Dragon.Ball.S01E119.MULTI.BDRip.REMASTERED.1080p.x264.DTS-LILAS.par2-good');
const decodedBuffer = yencode.decode(encodedData);
await fs.writeFile('decoded.bin', decodedBuffer)
const decodedBuffer = decodeYenc(encodedData);
if (Buffer.compare(decodedBuffer, correctlyDecodedData) === 0) {
console.log('Test passed: Decoded data matches the correctly decoded file.');
@ -15,7 +14,7 @@ async function runTest() {
console.error('Test failed: Decoded data does not match the correctly decoded file.');
console.error('Decoded buffer length:', decodedBuffer.length);
console.error('Correct buffer length:', correctlyDecodedData.length);
await fs.writeFile('test-decoded-output.bin', decodedBuffer);
await fs.writeFile('files/test-decoded-output.bin', decodedBuffer);
}
}