Compare commits
No commits in common. "dfd3336447f85790ea583cb5c7046abfafede993" and "d5b359191c42e28b10269541995b58a66ec03ab1" have entirely different histories.
dfd3336447
...
d5b359191c
57
package-lock.json
generated
57
package-lock.json
generated
@ -10,13 +10,11 @@
|
|||||||
"license": "ISC",
|
"license": "ISC",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"bullmq": "^5.77.3",
|
"bullmq": "^5.77.3",
|
||||||
"crc": "^4.3.2",
|
|
||||||
"dotenv": "^16.3.1",
|
"dotenv": "^16.3.1",
|
||||||
"ioredis": "^5.3.2",
|
"ioredis": "^5.3.2",
|
||||||
"log4js": "^6.9.1",
|
"log4js": "^6.9.1",
|
||||||
"nntp-js": "^1.0.4",
|
"nntp-js": "^1.0.4",
|
||||||
"node-unrar-js": "^2.0.0",
|
"node-unrar-js": "^2.0.0",
|
||||||
"simple-yenc": "^1.0.4",
|
|
||||||
"sqlite": "^5.1.1",
|
"sqlite": "^5.1.1",
|
||||||
"sqlite3": "^6.0.1",
|
"sqlite3": "^6.0.1",
|
||||||
"xmlbuilder2": "^3.1.1",
|
"xmlbuilder2": "^3.1.1",
|
||||||
@ -226,7 +224,7 @@
|
|||||||
"readable-stream": "^3.4.0"
|
"readable-stream": "^3.4.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/bl/node_modules/buffer": {
|
"node_modules/buffer": {
|
||||||
"version": "5.7.1",
|
"version": "5.7.1",
|
||||||
"resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz",
|
"resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz",
|
||||||
"integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==",
|
"integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==",
|
||||||
@ -250,32 +248,6 @@
|
|||||||
"ieee754": "^1.1.13"
|
"ieee754": "^1.1.13"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/buffer": {
|
|
||||||
"version": "6.0.3",
|
|
||||||
"resolved": "https://registry.npmjs.org/buffer/-/buffer-6.0.3.tgz",
|
|
||||||
"integrity": "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==",
|
|
||||||
"funding": [
|
|
||||||
{
|
|
||||||
"type": "github",
|
|
||||||
"url": "https://github.com/sponsors/feross"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"type": "patreon",
|
|
||||||
"url": "https://www.patreon.com/feross"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"type": "consulting",
|
|
||||||
"url": "https://feross.org/support"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"license": "MIT",
|
|
||||||
"optional": true,
|
|
||||||
"peer": true,
|
|
||||||
"dependencies": {
|
|
||||||
"base64-js": "^1.3.1",
|
|
||||||
"ieee754": "^1.2.1"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"node_modules/bullmq": {
|
"node_modules/bullmq": {
|
||||||
"version": "5.77.3",
|
"version": "5.77.3",
|
||||||
"resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.77.3.tgz",
|
"resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.77.3.tgz",
|
||||||
@ -331,23 +303,6 @@
|
|||||||
"node": ">=0.10.0"
|
"node": ">=0.10.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/crc": {
|
|
||||||
"version": "4.3.2",
|
|
||||||
"resolved": "https://registry.npmjs.org/crc/-/crc-4.3.2.tgz",
|
|
||||||
"integrity": "sha512-uGDHf4KLLh2zsHa8D8hIQ1H/HtFQhyHrc0uhHBcoKGol/Xnb+MPYfUMw7cvON6ze/GUESTudKayDcJC5HnJv1A==",
|
|
||||||
"license": "MIT",
|
|
||||||
"engines": {
|
|
||||||
"node": ">=12"
|
|
||||||
},
|
|
||||||
"peerDependencies": {
|
|
||||||
"buffer": ">=6.0.3"
|
|
||||||
},
|
|
||||||
"peerDependenciesMeta": {
|
|
||||||
"buffer": {
|
|
||||||
"optional": true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"node_modules/cron-parser": {
|
"node_modules/cron-parser": {
|
||||||
"version": "4.9.0",
|
"version": "4.9.0",
|
||||||
"resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz",
|
"resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz",
|
||||||
@ -1064,16 +1019,6 @@
|
|||||||
"simple-concat": "^1.0.0"
|
"simple-concat": "^1.0.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/simple-yenc": {
|
|
||||||
"version": "1.0.4",
|
|
||||||
"resolved": "https://registry.npmjs.org/simple-yenc/-/simple-yenc-1.0.4.tgz",
|
|
||||||
"integrity": "sha512-5gvxpSd79e9a3V4QDYUqnqxeD4HGlhCakVpb6gMnDD7lexJggSBJRBO5h52y/iJrdXRilX9UCuDaIJhSWm5OWw==",
|
|
||||||
"license": "MIT",
|
|
||||||
"funding": {
|
|
||||||
"type": "individual",
|
|
||||||
"url": "https://github.com/sponsors/eshaz"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"node_modules/sprintf-js": {
|
"node_modules/sprintf-js": {
|
||||||
"version": "1.0.3",
|
"version": "1.0.3",
|
||||||
"resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.0.3.tgz",
|
"resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.0.3.tgz",
|
||||||
|
|||||||
10
package.json
10
package.json
@ -2,13 +2,11 @@
|
|||||||
"name": "usenet-indexer",
|
"name": "usenet-indexer",
|
||||||
"version": "1.0.0",
|
"version": "1.0.0",
|
||||||
"description": "",
|
"description": "",
|
||||||
"main": "src/bin/start.js",
|
"main": "index.js",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"test": "echo \"Error: no test specified\" && exit 1",
|
"test": "echo \"Error: no test specified\" && exit 1",
|
||||||
"start": "node src/bin/start.js",
|
"start": "node src/index.js"
|
||||||
"download": "node src/bin/download.js",
|
|
||||||
"nzb": "node src/bin/nzb.js"
|
|
||||||
},
|
},
|
||||||
"keywords": [],
|
"keywords": [],
|
||||||
"author": "",
|
"author": "",
|
||||||
@ -20,11 +18,9 @@
|
|||||||
"log4js": "^6.9.1",
|
"log4js": "^6.9.1",
|
||||||
"nntp-js": "^1.0.4",
|
"nntp-js": "^1.0.4",
|
||||||
"node-unrar-js": "^2.0.0",
|
"node-unrar-js": "^2.0.0",
|
||||||
"simple-yenc": "^1.0.4",
|
|
||||||
"sqlite": "^5.1.1",
|
"sqlite": "^5.1.1",
|
||||||
"sqlite3": "^6.0.1",
|
"sqlite3": "^6.0.1",
|
||||||
"xmlbuilder2": "^3.1.1",
|
"xmlbuilder2": "^3.1.1",
|
||||||
"yencode": "^1.0.1",
|
"yencode": "^1.0.1"
|
||||||
"crc": "^4.3.2"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1,57 +0,0 @@
|
|||||||
import 'dotenv/config';
|
|
||||||
import { NntpPool } from './lib/NntpPool.js';
|
|
||||||
import { HeaderWorker } from './workers/HeaderWorker.js';
|
|
||||||
import { FileWorker } from './workers/FileWorker.js';
|
|
||||||
import { CollectionWorker } from './workers/CollectionWorker.js';
|
|
||||||
import { BodyWorker } from './workers/BodyWorker.js';
|
|
||||||
import log4js from './lib/logger.js';
|
|
||||||
|
|
||||||
const logger = log4js.getLogger();
|
|
||||||
|
|
||||||
export class Application {
|
|
||||||
constructor() {
|
|
||||||
this.pool = new NntpPool();
|
|
||||||
const collectionWorker = new CollectionWorker();
|
|
||||||
const fileWorker = new FileWorker(collectionWorker.queue);
|
|
||||||
const bodyWorker = new BodyWorker(null); // HeaderWorker is not yet initialized
|
|
||||||
this.headerWorker = new HeaderWorker(fileWorker.queue, bodyWorker.queue);
|
|
||||||
bodyWorker.headerQueue = this.headerWorker.queue; // Now we can set it
|
|
||||||
}
|
|
||||||
|
|
||||||
async run() {
|
|
||||||
let conn;
|
|
||||||
try {
|
|
||||||
conn = await this.pool.acquire();
|
|
||||||
logger.info('NNTP connection acquired from pool.');
|
|
||||||
|
|
||||||
logger.debug(`Server date: ${await conn.date()}`);
|
|
||||||
|
|
||||||
const group = await conn.group('alt.binaries.test');
|
|
||||||
logger.debug(`Group info: ${JSON.stringify(group)}`);
|
|
||||||
|
|
||||||
const overview = await conn.xover(group.first, group.last);
|
|
||||||
logger.info(`Fetched ${overview.overviews.length} headers.`);
|
|
||||||
|
|
||||||
for (const [id, header] of overview.overviews) {
|
|
||||||
await this.headerWorker.queue.add('process-header', header);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (overview.overviews.length > 0) {
|
|
||||||
const lastId = overview.overviews[overview.overviews.length - 1][0];
|
|
||||||
logger.info(`Last header ID queued: ${lastId}`);
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
logger.error('Error in main execution:', error);
|
|
||||||
} finally {
|
|
||||||
if (conn) {
|
|
||||||
this.pool.release(conn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async shutdown() {
|
|
||||||
logger.info('Gracefully shutting down...');
|
|
||||||
await this.pool.shutdown();
|
|
||||||
process.exit(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,84 +0,0 @@
|
|||||||
import 'dotenv/config';
|
|
||||||
import { getDb } from '../lib/database.js';
|
|
||||||
import { NntpPool } from '../lib/NntpPool.js';
|
|
||||||
import { YencFile } from '../lib/YencFile.js';
|
|
||||||
import fs from 'fs/promises';
|
|
||||||
import log4js from '../lib/logger.js';
|
|
||||||
|
|
||||||
const logger = log4js.getLogger('download');
|
|
||||||
|
|
||||||
async function downloadAndProcessPart(pool, partNumber, segment, yencFile) {
|
|
||||||
let conn;
|
|
||||||
try {
|
|
||||||
conn = await pool.acquire();
|
|
||||||
await conn.group('alt.binaries.test');
|
|
||||||
logger.debug(`Downloading part ${partNumber} with message ID: ${segment.id}`);
|
|
||||||
const bodyBuffer = (await conn.body(`<${segment.id}>`)).data;
|
|
||||||
yencFile.processPart(bodyBuffer);
|
|
||||||
} catch (error) {
|
|
||||||
if (error.code === 430) {
|
|
||||||
logger.error(`Article not found for part ${partNumber} (Message ID: ${segment.id})`);
|
|
||||||
} else {
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (conn) {
|
|
||||||
pool.release(conn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function downloadFile(fileId, numConnections) {
|
|
||||||
const pool = new NntpPool(numConnections);
|
|
||||||
const yencFile = new YencFile();
|
|
||||||
|
|
||||||
const db = await getDb();
|
|
||||||
const file = await db.get('SELECT * FROM files WHERE id = ?', fileId);
|
|
||||||
|
|
||||||
if (!file) {
|
|
||||||
logger.error(`File with ID ${fileId} not found.`);
|
|
||||||
await pool.shutdown();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info(`Downloading file: ${file.filename} with ${numConnections} connections.`);
|
|
||||||
|
|
||||||
const messageIds = JSON.parse(file.message_ids);
|
|
||||||
const sortedParts = Object.entries(messageIds).sort(([a], [b]) => parseInt(a, 10) - parseInt(b, 10));
|
|
||||||
|
|
||||||
// Sequentially process the first part to initialize the YencFile
|
|
||||||
const [firstPartNumber, firstSegment] = sortedParts[0];
|
|
||||||
await downloadAndProcessPart(pool, firstPartNumber, firstSegment, yencFile);
|
|
||||||
|
|
||||||
// Concurrently process the rest of the parts
|
|
||||||
const remainingParts = sortedParts.slice(1);
|
|
||||||
const downloadPromises = remainingParts.map(([partNumber, segment]) =>
|
|
||||||
downloadAndProcessPart(pool, partNumber, segment, yencFile)
|
|
||||||
);
|
|
||||||
|
|
||||||
await Promise.all(downloadPromises);
|
|
||||||
|
|
||||||
const completeFile = yencFile.getBuffer();
|
|
||||||
if (completeFile) {
|
|
||||||
await fs.writeFile(file.filename, completeFile);
|
|
||||||
logger.info(`File "${file.filename}" downloaded successfully.`);
|
|
||||||
} else {
|
|
||||||
logger.error('Could not assemble the final file.');
|
|
||||||
}
|
|
||||||
|
|
||||||
await pool.shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
const args = process.argv.slice(2);
|
|
||||||
const fileIdArg = args.find(arg => !arg.startsWith('--'));
|
|
||||||
const connectionsArg = args.find(arg => arg.startsWith('--connections='));
|
|
||||||
|
|
||||||
const fileId = fileIdArg ? parseInt(fileIdArg, 10) : null;
|
|
||||||
const numConnections = connectionsArg ? parseInt(connectionsArg.split('=')[1], 10) : 10;
|
|
||||||
|
|
||||||
if (!fileId || isNaN(fileId)) {
|
|
||||||
logger.error('Please provide a valid file ID as a command-line argument.');
|
|
||||||
process.exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
downloadFile(fileId, numConnections);
|
|
||||||
@ -1,6 +0,0 @@
|
|||||||
import { Application } from '../Application.js';
|
|
||||||
|
|
||||||
const app = new Application();
|
|
||||||
app.run();
|
|
||||||
|
|
||||||
process.on('SIGINT', () => app.shutdown());
|
|
||||||
58
src/body.worker.js
Normal file
58
src/body.worker.js
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
import { Queue, Worker } from 'bullmq';
|
||||||
|
import log4js from './logger.js';
|
||||||
|
import { headerQueue } from './header.worker.js';
|
||||||
|
import { acquire, release } from './nntp.pool.js';
|
||||||
|
|
||||||
|
const bodyLogger = log4js.getLogger('body');
|
||||||
|
|
||||||
|
const connection = {
|
||||||
|
host: process.env.REDIS_HOST || 'localhost',
|
||||||
|
port: process.env.REDIS_PORT || 6379,
|
||||||
|
};
|
||||||
|
|
||||||
|
export const bodyQueue = new Queue('body-queue', { connection });
|
||||||
|
|
||||||
|
const YENC_REGEX = /=ybegin part=(\d+) total=(\d+) line=\d+ size=\d+ name=(.+)/;
|
||||||
|
|
||||||
|
export const startBodyWorker = () => {
|
||||||
|
const bodyWorker = new Worker('body-queue', async job => {
|
||||||
|
const { header } = job.data;
|
||||||
|
bodyLogger.debug(`Processing header with unparsable subject: ${header.subject}`);
|
||||||
|
|
||||||
|
let conn;
|
||||||
|
try {
|
||||||
|
conn = await acquire();
|
||||||
|
const bodyBuffer = (await conn.body(header['message-id'])).data;
|
||||||
|
|
||||||
|
const firstNewlineIndex = bodyBuffer.indexOf('\\n');
|
||||||
|
const firstLineBuffer = (firstNewlineIndex !== -1) ? bodyBuffer.slice(0, firstNewlineIndex) : bodyBuffer;
|
||||||
|
const firstLine = firstLineBuffer.toString();
|
||||||
|
|
||||||
|
const match = firstLine.match(YENC_REGEX);
|
||||||
|
|
||||||
|
if (match) {
|
||||||
|
const part = parseInt(match[1], 10);
|
||||||
|
const total = parseInt(match[2], 10);
|
||||||
|
const filename = match[3].trim();
|
||||||
|
|
||||||
|
const newSubject = `"${filename}" yEnc (${part}/${total})`;
|
||||||
|
header.subject = newSubject;
|
||||||
|
|
||||||
|
bodyLogger.info(`Found yEnc metadata in body. New subject: ${newSubject}`);
|
||||||
|
await headerQueue.add('process-header', header);
|
||||||
|
} else {
|
||||||
|
bodyLogger.warn(`Could not find yEnc metadata in body for header: ${header.subject}`);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
bodyLogger.error('Error in body worker:', error);
|
||||||
|
} finally {
|
||||||
|
if (conn) {
|
||||||
|
release(conn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, { connection });
|
||||||
|
|
||||||
|
bodyWorker.on('failed', (job, err) => {
|
||||||
|
bodyLogger.error(`Body job ${job.id} failed with error: ${err.message}`);
|
||||||
|
});
|
||||||
|
};
|
||||||
@ -1,29 +1,23 @@
|
|||||||
import { Queue, Worker } from 'bullmq';
|
import { Queue, Worker } from 'bullmq';
|
||||||
import log4js from '../lib/logger.js';
|
import log4js from './logger.js';
|
||||||
import { getDb } from '../lib/database.js';
|
import { getDb } from './database.js';
|
||||||
import { NntpPool } from '../lib/NntpPool.js';
|
import { acquire, release } from './nntp.pool.js';
|
||||||
import { createExtractorFromData } from 'node-unrar-js';
|
import { createExtractorFromData } from 'node-unrar-js';
|
||||||
import { YencFile } from '../lib/YencFile.js';
|
import * as yEnc from 'simple-yenc';
|
||||||
|
|
||||||
const logger = log4js.getLogger('collection');
|
const logger = log4js.getLogger('collection');
|
||||||
|
|
||||||
export class CollectionWorker {
|
const connection = {
|
||||||
constructor() {
|
host: process.env.REDIS_HOST || 'localhost',
|
||||||
this.queue = new Queue('collection-queue', {
|
port: process.env.REDIS_PORT || 6379,
|
||||||
connection: {
|
};
|
||||||
host: process.env.REDIS_HOST || 'localhost',
|
|
||||||
port: process.env.REDIS_PORT || 6379,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
this.pool = new NntpPool();
|
|
||||||
this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection });
|
|
||||||
|
|
||||||
this.worker.on('failed', (job, err) => {
|
export const collectionQueue = new Queue('collection-queue', { connection });
|
||||||
logger.error(`Collection job ${job.id} failed with error: ${err.message}`);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async process(job) {
|
const RAR_REGEX = /\.part0*1\.rar$/;
|
||||||
|
|
||||||
|
export const startCollectionWorker = () => {
|
||||||
|
const collectionWorker = new Worker('collection-queue', async job => {
|
||||||
const { fileId } = job.data;
|
const { fileId } = job.data;
|
||||||
logger.debug(`Processing file ID ${fileId} for collection.`);
|
logger.debug(`Processing file ID ${fileId} for collection.`);
|
||||||
|
|
||||||
@ -35,7 +29,6 @@ export class CollectionWorker {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const RAR_REGEX = /\.part0*1\.rar$/;
|
|
||||||
if (RAR_REGEX.test(file.filename)) {
|
if (RAR_REGEX.test(file.filename)) {
|
||||||
logger.info(`File "${file.filename}" is the first part of a RAR set.`);
|
logger.info(`File "${file.filename}" is the first part of a RAR set.`);
|
||||||
|
|
||||||
@ -49,15 +42,12 @@ export class CollectionWorker {
|
|||||||
|
|
||||||
let conn;
|
let conn;
|
||||||
try {
|
try {
|
||||||
conn = await this.pool.acquire();
|
conn = await acquire();
|
||||||
await conn.group('alt.binaries.test');
|
await conn.group('alt.binaries.test');
|
||||||
const bodyBuffer = (await conn.body(`<${firstPart.id}>`)).data;
|
const bodyBuffer = (await conn.body(`<${firstPart.id}>`)).data;
|
||||||
|
const decodedUint8Array = yEnc.decode(bodyBuffer.toString('latin1'));
|
||||||
const yencFile = new YencFile();
|
const buffer = Buffer.from(decodedUint8Array);
|
||||||
yencFile.processPart(bodyBuffer);
|
const extractor = await createExtractorFromData({ data: buffer });
|
||||||
const decodedBuffer = yencFile.getBuffer();
|
|
||||||
|
|
||||||
const extractor = await createExtractorFromData({ data: decodedBuffer });
|
|
||||||
const fileList = extractor.getFileList();
|
const fileList = extractor.getFileList();
|
||||||
logger.info(`Files in "${file.filename}":`, fileList);
|
logger.info(`Files in "${file.filename}":`, fileList);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
@ -68,11 +58,15 @@ export class CollectionWorker {
|
|||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (conn) {
|
if (conn) {
|
||||||
this.pool.release(conn);
|
release(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.debug(`File "${file.filename}" is not the first part of a RAR set.`);
|
logger.debug(`File "${file.filename}" is not the first part of a RAR set.`);
|
||||||
}
|
}
|
||||||
}
|
}, { connection });
|
||||||
}
|
|
||||||
|
collectionWorker.on('failed', (job, err) => {
|
||||||
|
logger.error(`Collection job ${job.id} failed with error: ${err.message}`);
|
||||||
|
});
|
||||||
|
};
|
||||||
72
src/download.js
Normal file
72
src/download.js
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
import 'dotenv/config';
|
||||||
|
import { getDb } from './database.js';
|
||||||
|
import { acquire, release, shutdown } from './nntp.pool.js';
|
||||||
|
import * as yEnc from 'simple-yenc';
|
||||||
|
import fs from 'fs/promises';
|
||||||
|
import log4js from './logger.js';
|
||||||
|
|
||||||
|
const logger = log4js.getLogger('download');
|
||||||
|
|
||||||
|
async function downloadFile(fileId) {
|
||||||
|
const db = await getDb();
|
||||||
|
const file = await db.get('SELECT * FROM files WHERE id = ?', fileId);
|
||||||
|
|
||||||
|
if (!file) {
|
||||||
|
logger.error(`File with ID ${fileId} not found.`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(`Downloading file: ${file.filename}`);
|
||||||
|
|
||||||
|
const messageIds = JSON.parse(file.message_ids);
|
||||||
|
const sortedParts = Object.entries(messageIds).sort(([a], [b]) => parseInt(a, 10) - parseInt(b, 10));
|
||||||
|
|
||||||
|
const parts = [];
|
||||||
|
let conn;
|
||||||
|
|
||||||
|
try {
|
||||||
|
conn = await acquire();
|
||||||
|
await conn.group('alt.binaries.test');
|
||||||
|
for (const [partNumber, segment] of sortedParts) {
|
||||||
|
if (!segment || !segment.id) {
|
||||||
|
logger.error(`Message ID for part ${partNumber} not found.`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
logger.debug(`Downloading part ${partNumber}/${file.parts} with message ID: ${segment.id}`);
|
||||||
|
const bodyBuffer = (await conn.body(`<${segment.id}>`)).data;
|
||||||
|
const decodedUint8Array = yEnc.decode(bodyBuffer.toString('latin1'));
|
||||||
|
const buffer = Buffer.from(decodedUint8Array);
|
||||||
|
parts.push(buffer);
|
||||||
|
} catch (error) {
|
||||||
|
if (error.code === 430) {
|
||||||
|
logger.error(`Article not found for part ${partNumber} (Message ID: ${segment.id})`);
|
||||||
|
} else {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Error downloading file parts:', error);
|
||||||
|
} finally {
|
||||||
|
if (conn) {
|
||||||
|
release(conn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (parts.length === file.parts) {
|
||||||
|
const completeFile = Buffer.concat(parts);
|
||||||
|
await fs.writeFile(file.filename, completeFile);
|
||||||
|
logger.info(`File "${file.filename}" downloaded successfully.`);
|
||||||
|
} else {
|
||||||
|
logger.error('Could not download all parts of the file.');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const fileId = parseInt(process.argv[2], 10);
|
||||||
|
if (isNaN(fileId)) {
|
||||||
|
logger.error('Please provide a valid file ID as a command-line argument.');
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
downloadFile(fileId).finally(() => shutdown());
|
||||||
@ -1,26 +1,19 @@
|
|||||||
import { Queue, Worker } from 'bullmq';
|
import { Queue, Worker } from 'bullmq';
|
||||||
import log4js from '../lib/logger.js';
|
import log4js from './logger.js';
|
||||||
import { getDb } from '../lib/database.js';
|
import { getDb } from './database.js';
|
||||||
|
import { collectionQueue } from './collection.worker.js';
|
||||||
|
|
||||||
const logger = log4js.getLogger('file');
|
const logger = log4js.getLogger('file');
|
||||||
|
|
||||||
export class FileWorker {
|
const connection = {
|
||||||
constructor(collectionQueue) {
|
host: process.env.REDIS_HOST || 'localhost',
|
||||||
this.collectionQueue = collectionQueue;
|
port: process.env.REDIS_PORT || 6379,
|
||||||
this.queue = new Queue('file-queue', {
|
};
|
||||||
connection: {
|
|
||||||
host: process.env.REDIS_HOST || 'localhost',
|
|
||||||
port: process.env.REDIS_PORT || 6379,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection });
|
|
||||||
|
|
||||||
this.worker.on('failed', (job, err) => {
|
export const fileQueue = new Queue('file-queue', { connection });
|
||||||
logger.error(`File job ${job.id} failed with error: ${err.message}`);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async process(job) {
|
export const startFileWorker = () => {
|
||||||
|
const fileWorker = new Worker('file-queue', async job => {
|
||||||
const { filename, parts } = job.data;
|
const { filename, parts } = job.data;
|
||||||
const partCount = Object.keys(parts).length;
|
const partCount = Object.keys(parts).length;
|
||||||
logger.debug(`Processing complete file: "${filename}" with ${partCount} parts.`);
|
logger.debug(`Processing complete file: "${filename}" with ${partCount} parts.`);
|
||||||
@ -60,7 +53,11 @@ export class FileWorker {
|
|||||||
const fileId = result.lastID;
|
const fileId = result.lastID;
|
||||||
logger.debug(`Saved file "${filename}" to database with ID: ${fileId}`);
|
logger.debug(`Saved file "${filename}" to database with ID: ${fileId}`);
|
||||||
|
|
||||||
await this.collectionQueue.add('process-collection', { fileId });
|
await collectionQueue.add('process-collection', { fileId });
|
||||||
logger.debug(`Added file ID ${fileId} to collection queue.`);
|
logger.debug(`Added file ID ${fileId} to collection queue.`);
|
||||||
}
|
}, { connection });
|
||||||
}
|
|
||||||
|
fileWorker.on('failed', (job, err) => {
|
||||||
|
logger.error(`File job ${job.id} failed with error: ${err.message}`);
|
||||||
|
});
|
||||||
|
};
|
||||||
54
src/header.worker.js
Normal file
54
src/header.worker.js
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
import { Queue, Worker } from 'bullmq';
|
||||||
|
import Redis from 'ioredis';
|
||||||
|
import log4js from './logger.js';
|
||||||
|
import { fileQueue } from './file.worker.js';
|
||||||
|
import { bodyQueue } from './body.worker.js';
|
||||||
|
|
||||||
|
const logger = log4js.getLogger('header');
|
||||||
|
|
||||||
|
const connection = {
|
||||||
|
host: process.env.REDIS_HOST || 'localhost',
|
||||||
|
port: process.env.REDIS_PORT || 6379,
|
||||||
|
};
|
||||||
|
|
||||||
|
const redis = new Redis(connection);
|
||||||
|
|
||||||
|
export const headerQueue = new Queue('header-queue', { connection });
|
||||||
|
|
||||||
|
const SUBJECT_REGEX = /"(.+)"(?: yEnc)? \((\d+)\/(\d+)\)/;
|
||||||
|
|
||||||
|
export const startHeaderWorker = () => {
|
||||||
|
const headerWorker = new Worker('header-queue', async job => {
|
||||||
|
const header = job.data;
|
||||||
|
const subject = header.subject;
|
||||||
|
|
||||||
|
const match = subject.match(SUBJECT_REGEX);
|
||||||
|
|
||||||
|
if (match) {
|
||||||
|
const filename = match[1];
|
||||||
|
const part = parseInt(match[2], 10);
|
||||||
|
const total = parseInt(match[3], 10);
|
||||||
|
|
||||||
|
const fileKey = `file:${filename}`;
|
||||||
|
await redis.hset(fileKey, part, JSON.stringify(header));
|
||||||
|
|
||||||
|
const partCount = await redis.hlen(fileKey);
|
||||||
|
|
||||||
|
if (partCount === total) {
|
||||||
|
const fileParts = await redis.hgetall(fileKey);
|
||||||
|
await fileQueue.add('process-file', { filename, parts: fileParts });
|
||||||
|
await redis.del(fileKey);
|
||||||
|
logger.info(`File "${filename}" is complete and moved to file-queue.`, fileParts);
|
||||||
|
} else {
|
||||||
|
logger.info(`Stored part ${part}/${total} for file "${filename}"`);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.warn(`Could not parse subject: "${subject}". Moving to body-queue.`);
|
||||||
|
await bodyQueue.add('process-body', { header });
|
||||||
|
}
|
||||||
|
}, { connection });
|
||||||
|
|
||||||
|
headerWorker.on('failed', (job, err) => {
|
||||||
|
logger.error(`Header job ${job.id} failed with error: ${err.message}`);
|
||||||
|
});
|
||||||
|
};
|
||||||
53
src/index.js
Normal file
53
src/index.js
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
import 'dotenv/config';
|
||||||
|
import { headerQueue, startHeaderWorker } from './header.worker.js';
|
||||||
|
import { startFileWorker } from './file.worker.js';
|
||||||
|
import { startCollectionWorker } from './collection.worker.js';
|
||||||
|
import { startBodyWorker } from './body.worker.js';
|
||||||
|
import log4js from './logger.js';
|
||||||
|
import { acquire, release, shutdown } from './nntp.pool.js';
|
||||||
|
|
||||||
|
const logger = log4js.getLogger();
|
||||||
|
|
||||||
|
async function main() {
|
||||||
|
let conn;
|
||||||
|
try {
|
||||||
|
conn = await acquire();
|
||||||
|
logger.info('NNTP connection acquired from pool.');
|
||||||
|
|
||||||
|
logger.debug(`Server date: ${await conn.date()}`);
|
||||||
|
|
||||||
|
const group = await conn.group('alt.binaries.test');
|
||||||
|
logger.debug(`Group info: ${JSON.stringify(group)}`);
|
||||||
|
|
||||||
|
const overview = await conn.xover(group.first, group.group);
|
||||||
|
logger.info(`Fetched ${overview.overviews.length} headers.`);
|
||||||
|
|
||||||
|
for (const [id, header] of overview.overviews) {
|
||||||
|
await headerQueue.add('process-header', header);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (overview.overviews.length > 0) {
|
||||||
|
const lastId = overview.overviews[overview.overviews.length - 1][0];
|
||||||
|
logger.info(`Last header ID queued: ${lastId}`);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Error in main execution:', error);
|
||||||
|
} finally {
|
||||||
|
if (conn) {
|
||||||
|
release(conn);
|
||||||
|
logger.info('NNTP connection released back to the pool.');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
startHeaderWorker();
|
||||||
|
startFileWorker();
|
||||||
|
startCollectionWorker();
|
||||||
|
startBodyWorker();
|
||||||
|
main();
|
||||||
|
|
||||||
|
process.on('SIGINT', async () => {
|
||||||
|
logger.info('Gracefully shutting down...');
|
||||||
|
await shutdown();
|
||||||
|
process.exit(0);
|
||||||
|
});
|
||||||
@ -1,70 +0,0 @@
|
|||||||
import { NNTP } from 'nntp-js';
|
|
||||||
import log4js from './logger.js';
|
|
||||||
|
|
||||||
const logger = log4js.getLogger('pool');
|
|
||||||
|
|
||||||
export class NntpPool {
|
|
||||||
constructor(poolSize = 10) {
|
|
||||||
this.poolSize = poolSize;
|
|
||||||
this.allConnections = new Set();
|
|
||||||
this.idleConnections = [];
|
|
||||||
this.waiters = [];
|
|
||||||
this.createdCount = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
async _createConnection() {
|
|
||||||
const config = {
|
|
||||||
host: process.env.NNTP_HOST,
|
|
||||||
user: process.env.NNTP_USER,
|
|
||||||
password: process.env.NNTP_PASS,
|
|
||||||
port: 443,
|
|
||||||
secure: true,
|
|
||||||
};
|
|
||||||
|
|
||||||
const conn = new NNTP(config.host, 119);
|
|
||||||
await conn.connect();
|
|
||||||
await conn.login(config.user, config.password);
|
|
||||||
this.allConnections.add(conn);
|
|
||||||
return conn;
|
|
||||||
}
|
|
||||||
|
|
||||||
async acquire() {
|
|
||||||
if (this.idleConnections.length > 0) {
|
|
||||||
logger.debug('Reusing existing connection from pool.');
|
|
||||||
return this.idleConnections.pop();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.createdCount < this.poolSize) {
|
|
||||||
this.createdCount++;
|
|
||||||
logger.info(`Creating new connection (${this.createdCount}/${this.poolSize}).`);
|
|
||||||
return this._createConnection();
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info(`Pool maxed out at ${this.poolSize}. Waiting for a connection to become available.`);
|
|
||||||
return new Promise(resolve => this.waiters.push(resolve));
|
|
||||||
}
|
|
||||||
|
|
||||||
release(conn) {
|
|
||||||
if (this.waiters.length > 0) {
|
|
||||||
logger.debug('Releasing connection directly to a waiting task.');
|
|
||||||
const resolve = this.waiters.shift();
|
|
||||||
resolve(conn);
|
|
||||||
} else {
|
|
||||||
logger.debug('Returning connection to the idle pool.');
|
|
||||||
this.idleConnections.push(conn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async shutdown() {
|
|
||||||
logger.info('Shutting down all connections in the pool.');
|
|
||||||
const shutdownPromises = [];
|
|
||||||
for (const conn of this.allConnections) {
|
|
||||||
shutdownPromises.push(conn.quit());
|
|
||||||
}
|
|
||||||
await Promise.all(shutdownPromises);
|
|
||||||
this.allConnections.clear();
|
|
||||||
this.idleConnections.length = 0;
|
|
||||||
this.waiters.length = 0;
|
|
||||||
this.createdCount = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,67 +0,0 @@
|
|||||||
import yencode from 'yencode';
|
|
||||||
import { crc32 } from 'crc';
|
|
||||||
import log4js from './logger.js';
|
|
||||||
import { parseYencMeta } from './yenc.util.js';
|
|
||||||
|
|
||||||
const logger = log4js.getLogger('yenc');
|
|
||||||
|
|
||||||
export class YencFile {
|
|
||||||
constructor() {
|
|
||||||
this.targetBuffer = null;
|
|
||||||
this.totalSize = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
processPart(encodedBuffer) {
|
|
||||||
const meta = parseYencMeta(encodedBuffer);
|
|
||||||
|
|
||||||
// Initialize buffer on the first part that has total size info
|
|
||||||
if (!this.targetBuffer && meta.header?.size) {
|
|
||||||
this.totalSize = parseInt(meta.header.size, 10);
|
|
||||||
if (!this.totalSize) {
|
|
||||||
throw new Error('Could not determine total file size from yEnc metadata.');
|
|
||||||
}
|
|
||||||
this.targetBuffer = Buffer.alloc(this.totalSize);
|
|
||||||
logger.info(`Allocated buffer of size ${this.totalSize} for file.`);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!this.targetBuffer) {
|
|
||||||
throw new Error('Cannot process yEnc part: target buffer not initialized. The first part must contain total file size.');
|
|
||||||
}
|
|
||||||
|
|
||||||
const headerPartMarker = Buffer.from('=ypart');
|
|
||||||
const headerBeginMarker = Buffer.from('=ybegin');
|
|
||||||
const footerMarker = Buffer.from('\r\n=yend');
|
|
||||||
|
|
||||||
// The content starts after the LAST header line.
|
|
||||||
const partHeaderIndex = encodedBuffer.indexOf(headerPartMarker);
|
|
||||||
const beginHeaderIndex = encodedBuffer.indexOf(headerBeginMarker);
|
|
||||||
const contentHeaderIndex = partHeaderIndex !== -1 ? partHeaderIndex : beginHeaderIndex;
|
|
||||||
|
|
||||||
const contentStartIndex = encodedBuffer.indexOf('\r\n', contentHeaderIndex) + 2;
|
|
||||||
const contentEndIndex = encodedBuffer.lastIndexOf(footerMarker);
|
|
||||||
const dataToDecode = encodedBuffer.subarray(contentStartIndex, contentEndIndex);
|
|
||||||
|
|
||||||
const decoded = yencode.decode(dataToDecode);
|
|
||||||
|
|
||||||
const expectedSize = parseInt(meta.footer?.size, 10);
|
|
||||||
if (decoded.length !== expectedSize) {
|
|
||||||
throw new Error(`Decoded size (${decoded.length}) does not match expected part size (${expectedSize}).`);
|
|
||||||
}
|
|
||||||
logger.debug('Part size check passed.');
|
|
||||||
|
|
||||||
const calculatedCrc = crc32(decoded);
|
|
||||||
const expectedCrc = parseInt(meta.footer?.pcrc32, 16);
|
|
||||||
if (calculatedCrc !== expectedCrc) {
|
|
||||||
throw new Error(`CRC32 mismatch: expected ${expectedCrc.toString(16)}, but got ${calculatedCrc.toString(16)}.`);
|
|
||||||
}
|
|
||||||
logger.debug('CRC32 check passed.');
|
|
||||||
|
|
||||||
const offset = parseInt(meta.part.begin, 10) - 1;
|
|
||||||
decoded.copy(this.targetBuffer, offset);
|
|
||||||
logger.info(`Processed part ${meta.header.part}/${meta.header.total || 'N/A'} and wrote to buffer at offset ${offset}.`);
|
|
||||||
}
|
|
||||||
|
|
||||||
getBuffer() {
|
|
||||||
return this.targetBuffer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,55 +0,0 @@
|
|||||||
/**
|
|
||||||
* Parses a single line of yEnc metadata.
|
|
||||||
* @param {string} line The metadata line.
|
|
||||||
* @returns {object} A key-value map of the metadata.
|
|
||||||
*/
|
|
||||||
function parseMetaLine(line) {
|
|
||||||
if (!line) return {};
|
|
||||||
const meta = {};
|
|
||||||
line.split(' ').forEach(part => {
|
|
||||||
const eqIndex = part.indexOf('=');
|
|
||||||
if (eqIndex !== -1) {
|
|
||||||
meta[part.slice(0, eqIndex)] = part.slice(eqIndex + 1);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return meta;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Finds a line in a buffer that starts with a specific marker.
|
|
||||||
* @param {Buffer} buffer The buffer to search.
|
|
||||||
* @param {string} marker The marker to find (e.g., '=ybegin').
|
|
||||||
* @returns {string|null} The found line, or null.
|
|
||||||
*/
|
|
||||||
function findLine(buffer, marker) {
|
|
||||||
const markerBuffer = Buffer.from(marker);
|
|
||||||
const index = buffer.indexOf(markerBuffer);
|
|
||||||
if (index === -1) return null;
|
|
||||||
|
|
||||||
const lineEndIndex = buffer.indexOf(Buffer.from('\r\n'), index);
|
|
||||||
return buffer.subarray(index, (lineEndIndex !== -1) ? lineEndIndex : buffer.length).toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Extracts and merges metadata from all yEnc headers and footers.
|
|
||||||
* @param {Buffer} encodedBuffer The yEnc-encoded buffer.
|
|
||||||
* @returns {object} A single, merged object of all metadata.
|
|
||||||
*/
|
|
||||||
export function parseYencMeta(encodedBuffer) {
|
|
||||||
const beginLine = findLine(encodedBuffer, '=ybegin');
|
|
||||||
const partLine = findLine(encodedBuffer, '=ypart');
|
|
||||||
const endLine = findLine(encodedBuffer, '=yend');
|
|
||||||
|
|
||||||
if (!endLine || (!beginLine && !partLine)) {
|
|
||||||
throw new Error('Invalid yEnc data: missing required headers or footers.');
|
|
||||||
}
|
|
||||||
|
|
||||||
// Merge metadata, with more specific lines overwriting general ones.
|
|
||||||
const meta = {
|
|
||||||
header: parseMetaLine(beginLine),
|
|
||||||
part: parseMetaLine(partLine),
|
|
||||||
footer: parseMetaLine(endLine),
|
|
||||||
};
|
|
||||||
|
|
||||||
return meta;
|
|
||||||
}
|
|
||||||
@ -20,7 +20,6 @@ log4js.configure({
|
|||||||
collection: { appenders: ['console', 'file'], level: 'info' },
|
collection: { appenders: ['console', 'file'], level: 'info' },
|
||||||
body: { appenders: ['console', 'file'], level: 'info' },
|
body: { appenders: ['console', 'file'], level: 'info' },
|
||||||
pool: { appenders: ['console', 'file'], level: 'info' },
|
pool: { appenders: ['console', 'file'], level: 'info' },
|
||||||
yenc: { appenders: ['console', 'file'], level: 'info' },
|
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
59
src/nntp.pool.js
Normal file
59
src/nntp.pool.js
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
import { NNTP } from 'nntp-js';
|
||||||
|
import log4js from './logger.js';
|
||||||
|
|
||||||
|
const logger = log4js.getLogger('pool');
|
||||||
|
|
||||||
|
const POOL_SIZE = 5;
|
||||||
|
const connections = [];
|
||||||
|
const queue = [];
|
||||||
|
|
||||||
|
const createConnection = async () => {
|
||||||
|
const config = {
|
||||||
|
host: process.env.NNTP_HOST,
|
||||||
|
user: process.env.NNTP_USER,
|
||||||
|
password: process.env.NNTP_PASS,
|
||||||
|
port: 443,
|
||||||
|
secure: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
const conn = new NNTP(config.host, 119);
|
||||||
|
await conn.connect();
|
||||||
|
await conn.login(config.user, config.password);
|
||||||
|
return conn;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const acquire = async () => {
|
||||||
|
if (connections.length > 0) {
|
||||||
|
logger.debug('Reusing existing connection from pool.');
|
||||||
|
return connections.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (connections.length + queue.length < POOL_SIZE) {
|
||||||
|
logger.info('Creating new connection.');
|
||||||
|
return createConnection();
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info('Waiting for a connection to become available.');
|
||||||
|
return new Promise(resolve => queue.push(resolve));
|
||||||
|
};
|
||||||
|
|
||||||
|
export const release = conn => {
|
||||||
|
if (queue.length > 0) {
|
||||||
|
logger.info('Releasing connection to a waiting consumer.');
|
||||||
|
const resolve = queue.shift();
|
||||||
|
resolve(conn);
|
||||||
|
} else {
|
||||||
|
logger.debug('Returning connection to the pool.');
|
||||||
|
connections.push(conn);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
export const shutdown = async () => {
|
||||||
|
logger.info('Shutting down all connections in the pool.');
|
||||||
|
const allConns = [...connections];
|
||||||
|
connections.length = 0; // Clear the pool
|
||||||
|
|
||||||
|
for (const conn of allConns) {
|
||||||
|
await conn.quit();
|
||||||
|
}
|
||||||
|
};
|
||||||
@ -1,8 +1,8 @@
|
|||||||
import 'dotenv/config';
|
import 'dotenv/config';
|
||||||
import { getDb } from '../lib/database.js';
|
import { getDb } from './database.js';
|
||||||
import { create } from 'xmlbuilder2';
|
import { create } from 'xmlbuilder2';
|
||||||
import fs from 'fs/promises';
|
import fs from 'fs/promises';
|
||||||
import log4js from '../lib/logger.js';
|
import log4js from './logger.js';
|
||||||
|
|
||||||
const logger = log4js.getLogger('nzb');
|
const logger = log4js.getLogger('nzb');
|
||||||
|
|
||||||
@ -1,71 +0,0 @@
|
|||||||
import { Queue, Worker } from 'bullmq';
|
|
||||||
import log4js from '../lib/logger.js';
|
|
||||||
import { NntpPool } from '../lib/NntpPool.js';
|
|
||||||
import { parseYencMeta } from '../lib/yenc.util.js';
|
|
||||||
import fs from 'fs/promises';
|
|
||||||
import path from 'path';
|
|
||||||
|
|
||||||
const logger = log4js.getLogger('body');
|
|
||||||
|
|
||||||
export class BodyWorker {
|
|
||||||
constructor(headerQueue) {
|
|
||||||
this.headerQueue = headerQueue;
|
|
||||||
this.queue = new Queue('body-queue', {
|
|
||||||
connection: {
|
|
||||||
host: process.env.REDIS_HOST || 'localhost',
|
|
||||||
port: process.env.REDIS_PORT || 6379,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
this.pool = new NntpPool();
|
|
||||||
this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection });
|
|
||||||
|
|
||||||
this.worker.on('failed', (job, err) => {
|
|
||||||
logger.error(`Body job ${job.id} failed with error: ${err.message}`);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async process(job) {
|
|
||||||
const { header } = job.data;
|
|
||||||
const messageId = header['message-id'];
|
|
||||||
logger.debug(`Processing header with unparsable subject: ${header.subject}`);
|
|
||||||
|
|
||||||
let conn;
|
|
||||||
try {
|
|
||||||
conn = await this.pool.acquire();
|
|
||||||
const bodyBuffer = (await conn.body(messageId)).data;
|
|
||||||
|
|
||||||
try {
|
|
||||||
const meta = parseYencMeta(bodyBuffer);
|
|
||||||
if (meta.header.name) {
|
|
||||||
const { name, part, total } = meta.header;
|
|
||||||
const newSubject = `"${name}" yEnc (${part}/${total})`;
|
|
||||||
header.subject = newSubject;
|
|
||||||
|
|
||||||
logger.info(`Found yEnc metadata in body. New subject: ${newSubject}`);
|
|
||||||
await this.headerQueue.add('process-header', header);
|
|
||||||
} else {
|
|
||||||
logger.warn(`Could not find yEnc metadata in body for header: ${header.subject}`);
|
|
||||||
}
|
|
||||||
} catch (parseError) {
|
|
||||||
if (parseError.message.includes('Invalid yEnc data')) {
|
|
||||||
logger.error(`Failed to parse yEnc data for message ID ${messageId}. Dumping buffer for inspection.`);
|
|
||||||
const debugDir = path.join(process.cwd(), 'debug');
|
|
||||||
await fs.mkdir(debugDir, { recursive: true });
|
|
||||||
const timestamp = new Date().toISOString().replace(/:/g, '-');
|
|
||||||
const dumpFile = path.join(debugDir, `body-error-${timestamp}-${messageId.replace(/[<>]/g, '')}.bin`);
|
|
||||||
await fs.writeFile(dumpFile, bodyBuffer);
|
|
||||||
logger.error(`Problematic body buffer saved to: ${dumpFile}`);
|
|
||||||
}
|
|
||||||
// Re-throw the original parsing error to fail the job
|
|
||||||
throw parseError;
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(`Error in body worker for message ID ${messageId}:`, error);
|
|
||||||
throw error; // Ensure the job fails if any other error occurs
|
|
||||||
} finally {
|
|
||||||
if (conn) {
|
|
||||||
this.pool.release(conn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,54 +0,0 @@
|
|||||||
import { Queue, Worker } from 'bullmq';
|
|
||||||
import Redis from 'ioredis';
|
|
||||||
import log4js from '../lib/logger.js';
|
|
||||||
|
|
||||||
const logger = log4js.getLogger('header');
|
|
||||||
|
|
||||||
export class HeaderWorker {
|
|
||||||
constructor(fileQueue, bodyQueue) {
|
|
||||||
this.fileQueue = fileQueue;
|
|
||||||
this.bodyQueue = bodyQueue;
|
|
||||||
this.queue = new Queue('header-queue', {
|
|
||||||
connection: {
|
|
||||||
host: process.env.REDIS_HOST || 'localhost',
|
|
||||||
port: process.env.REDIS_PORT || 6379,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
this.redis = new Redis(this.queue.opts.connection);
|
|
||||||
this.worker = new Worker(this.queue.name, this.process.bind(this), { connection: this.queue.opts.connection });
|
|
||||||
|
|
||||||
this.worker.on('failed', (job, err) => {
|
|
||||||
logger.error(`Header job ${job.id} failed with error: ${err.message}`);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async process(job) {
|
|
||||||
const header = job.data;
|
|
||||||
const subject = header.subject;
|
|
||||||
const SUBJECT_REGEX = /"(.+)"(?: yEnc)? \((\d+)\/(\d+)\)/;
|
|
||||||
const match = subject.match(SUBJECT_REGEX);
|
|
||||||
|
|
||||||
if (match) {
|
|
||||||
const filename = match[1];
|
|
||||||
const part = parseInt(match[2], 10);
|
|
||||||
const total = parseInt(match[3], 10);
|
|
||||||
|
|
||||||
const fileKey = `file:${filename}`;
|
|
||||||
await this.redis.hset(fileKey, part, JSON.stringify(header));
|
|
||||||
|
|
||||||
const partCount = await this.redis.hlen(fileKey);
|
|
||||||
|
|
||||||
if (partCount === total) {
|
|
||||||
const fileParts = await this.redis.hgetall(fileKey);
|
|
||||||
await this.fileQueue.add('process-file', { filename, parts: fileParts });
|
|
||||||
await this.redis.del(fileKey);
|
|
||||||
logger.info(`File "${filename}" is complete and moved to file-queue.`);
|
|
||||||
} else {
|
|
||||||
logger.info(`Stored part ${part}/${total} for file "${filename}"`);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logger.warn(`Could not parse subject: "${subject}". Moving to body-queue.`);
|
|
||||||
await this.bodyQueue.add('process-body', { header });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,12 +1,13 @@
|
|||||||
import { decodeYenc } from './yenc.util.js';
|
import yencode from 'yencode';
|
||||||
import fs from 'fs/promises';
|
import fs from 'fs/promises';
|
||||||
import { Buffer } from 'buffer';
|
import { Buffer } from 'buffer';
|
||||||
|
|
||||||
async function runTest() {
|
async function runTest() {
|
||||||
const encodedData = await fs.readFile('files/HjVfQlWmHdUrQeQkRiLkTwEj-1779830864932@nyuu.bin');
|
const encodedData = await fs.readFile('HjVfQlWmHdUrQeQkRiLkTwEj-1779830864932@nyuu.bin');
|
||||||
const correctlyDecodedData = await fs.readFile('files/Dragon.Ball.S01E119.MULTI.BDRip.REMASTERED.1080p.x264.DTS-LILAS.par2-good');
|
const correctlyDecodedData = await fs.readFile('Dragon.Ball.S01E119.MULTI.BDRip.REMASTERED.1080p.x264.DTS-LILAS.par2');
|
||||||
|
|
||||||
const decodedBuffer = decodeYenc(encodedData);
|
const decodedBuffer = yencode.decode(encodedData);
|
||||||
|
await fs.writeFile('decoded.bin', decodedBuffer)
|
||||||
|
|
||||||
if (Buffer.compare(decodedBuffer, correctlyDecodedData) === 0) {
|
if (Buffer.compare(decodedBuffer, correctlyDecodedData) === 0) {
|
||||||
console.log('Test passed: Decoded data matches the correctly decoded file.');
|
console.log('Test passed: Decoded data matches the correctly decoded file.');
|
||||||
@ -14,7 +15,7 @@ async function runTest() {
|
|||||||
console.error('Test failed: Decoded data does not match the correctly decoded file.');
|
console.error('Test failed: Decoded data does not match the correctly decoded file.');
|
||||||
console.error('Decoded buffer length:', decodedBuffer.length);
|
console.error('Decoded buffer length:', decodedBuffer.length);
|
||||||
console.error('Correct buffer length:', correctlyDecodedData.length);
|
console.error('Correct buffer length:', correctlyDecodedData.length);
|
||||||
await fs.writeFile('files/test-decoded-output.bin', decodedBuffer);
|
await fs.writeFile('test-decoded-output.bin', decodedBuffer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user