first commit: decoding doesn't work well yet

This commit is contained in:
Daan Meijer 2026-05-27 07:31:40 +02:00
commit d5b359191c
14 changed files with 1965 additions and 0 deletions

113
.gitignore vendored Normal file
View File

@ -0,0 +1,113 @@
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
lerna-debug.log*
# Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
# Runtime data
pids
*.pid
*.seed
*.pid.lock
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
*.lcov
# nyc test coverage
.nyc_output
# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-temporary-files)
.grunt
# Bower dependency directory (https://bower.io/)
bower_components
# node-waf configuration
.lock-wscript
# Compiled binary addons (https://nodejs.org/api/addons.html)
build/Release
# Dependency directories
node_modules/
jspm_packages/
# TypeScript v1 declaration files
typings/
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# Microbundle cache
.rpt2_cache/
.rts2_cache_cjs/
.rts2_cache_es/
.rts2_cache_umd/
# Optional REPL history
.node_repl_history
# Output of 'npm pack'
*.tgz
# Yarn Integrity file
.yarn-integrity
# dotenv environment variables file
.env
.env.test
# parcel-bundler cache (https://parceljs.org/)
.cache
# Next.js build output
.next
# Nuxt.js build output
.nuxt
# Gatsby files
.cache/
# Uncomment the "public" line below if your project uses Gatsby and not Next.js
# https://nextjs.org/blog/next-9-1#public-directory-support
# public
# vuepress build output
.vuepress/dist
# Serverless directories
.serverless/
# FuseBox cache
.fusebox/
# DynamoDB Local files
.dynamodb/
# TernJS port file
.tern-port
# Stores VSCode versions used for testing VSCode extensions
.vscode-test
# IDE files
.idea/
.vscode/
*.swp
database.sqlite
*.par2
*.nzb
files
*.bin

1266
package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

26
package.json Normal file
View File

@ -0,0 +1,26 @@
{
"name": "usenet-indexer",
"version": "1.0.0",
"description": "",
"main": "index.js",
"type": "module",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node src/index.js"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"bullmq": "^5.77.3",
"dotenv": "^16.3.1",
"ioredis": "^5.3.2",
"log4js": "^6.9.1",
"nntp-js": "^1.0.4",
"node-unrar-js": "^2.0.0",
"sqlite": "^5.1.1",
"sqlite3": "^6.0.1",
"xmlbuilder2": "^3.1.1",
"yencode": "^1.0.1"
}
}

58
src/body.worker.js Normal file
View File

@ -0,0 +1,58 @@
import { Queue, Worker } from 'bullmq';
import log4js from './logger.js';
import { headerQueue } from './header.worker.js';
import { acquire, release } from './nntp.pool.js';
// Logger for the 'body' category configured in logger.js.
const bodyLogger = log4js.getLogger('body');
// Redis connection options shared by the queue and the worker in this module;
// falls back to localhost:6379 when REDIS_HOST / REDIS_PORT are unset.
const connection = {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
};
// Queue of `{ header }` jobs for headers whose subject could not be parsed;
// header.worker.js adds to it.
export const bodyQueue = new Queue('body-queue', { connection });
const YENC_REGEX = /=ybegin part=(\d+) total=(\d+) line=\d+ size=\d+ name=(.+)/;
export const startBodyWorker = () => {
const bodyWorker = new Worker('body-queue', async job => {
const { header } = job.data;
bodyLogger.debug(`Processing header with unparsable subject: ${header.subject}`);
let conn;
try {
conn = await acquire();
const bodyBuffer = (await conn.body(header['message-id'])).data;
const firstNewlineIndex = bodyBuffer.indexOf('\\n');
const firstLineBuffer = (firstNewlineIndex !== -1) ? bodyBuffer.slice(0, firstNewlineIndex) : bodyBuffer;
const firstLine = firstLineBuffer.toString();
const match = firstLine.match(YENC_REGEX);
if (match) {
const part = parseInt(match[1], 10);
const total = parseInt(match[2], 10);
const filename = match[3].trim();
const newSubject = `"${filename}" yEnc (${part}/${total})`;
header.subject = newSubject;
bodyLogger.info(`Found yEnc metadata in body. New subject: ${newSubject}`);
await headerQueue.add('process-header', header);
} else {
bodyLogger.warn(`Could not find yEnc metadata in body for header: ${header.subject}`);
}
} catch (error) {
bodyLogger.error('Error in body worker:', error);
} finally {
if (conn) {
release(conn);
}
}
}, { connection });
bodyWorker.on('failed', (job, err) => {
bodyLogger.error(`Body job ${job.id} failed with error: ${err.message}`);
});
};

72
src/collection.worker.js Normal file
View File

@ -0,0 +1,72 @@
import { Queue, Worker } from 'bullmq';
import log4js from './logger.js';
import { getDb } from './database.js';
import { acquire, release } from './nntp.pool.js';
import { createExtractorFromData } from 'node-unrar-js';
import * as yEnc from 'simple-yenc';
// Logger for the 'collection' category configured in logger.js.
const logger = log4js.getLogger('collection');
// Redis connection options shared by the queue and the worker in this module;
// falls back to localhost:6379 when REDIS_HOST / REDIS_PORT are unset.
const connection = {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
};
// Queue of `{ fileId }` jobs; file.worker.js adds one for every file it stores.
export const collectionQueue = new Queue('collection-queue', { connection });
const RAR_REGEX = /\.part0*1\.rar$/;
export const startCollectionWorker = () => {
const collectionWorker = new Worker('collection-queue', async job => {
const { fileId } = job.data;
logger.debug(`Processing file ID ${fileId} for collection.`);
const db = await getDb();
const file = await db.get('SELECT * FROM files WHERE id = ?', fileId);
if (!file) {
logger.error(`File with ID ${fileId} not found in the database.`);
return;
}
if (RAR_REGEX.test(file.filename)) {
logger.info(`File "${file.filename}" is the first part of a RAR set.`);
const messageIds = JSON.parse(file.message_ids);
const firstPart = messageIds['1'];
if (!firstPart || !firstPart.id) {
logger.error(`Could not find message ID for the first part of file "${file.filename}".`);
return;
}
let conn;
try {
conn = await acquire();
await conn.group('alt.binaries.test');
const bodyBuffer = (await conn.body(`<${firstPart.id}>`)).data;
const decodedUint8Array = yEnc.decode(bodyBuffer.toString('latin1'));
const buffer = Buffer.from(decodedUint8Array);
const extractor = await createExtractorFromData({ data: buffer });
const fileList = extractor.getFileList();
logger.info(`Files in "${file.filename}":`, fileList);
} catch (error) {
if (error.code === 430) {
logger.error(`Article not found for first part of RAR set (Message ID: ${firstPart.id})`);
} else {
logger.error('Error processing RAR file:', error);
}
} finally {
if (conn) {
release(conn);
}
}
} else {
logger.debug(`File "${file.filename}" is not the first part of a RAR set.`);
}
}, { connection });
collectionWorker.on('failed', (job, err) => {
logger.error(`Collection job ${job.id} failed with error: ${err.message}`);
});
};

26
src/database.js Normal file
View File

@ -0,0 +1,26 @@
import { open } from 'sqlite';
import sqlite3 from 'sqlite3';
// Promise of the opened and initialised database, shared by every caller.
let dbPromise;

// Opens ./database.sqlite and makes sure the `files` table exists.
const openDatabase = async () => {
  const db = await open({
    filename: './database.sqlite',
    driver: sqlite3.Database,
  });
  await db.exec(`
    CREATE TABLE IF NOT EXISTS files (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      filename TEXT NOT NULL,
      poster TEXT NOT NULL,
      date INTEGER NOT NULL,
      parts INTEGER NOT NULL,
      message_ids TEXT NOT NULL,
      created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    );
  `);
  return db;
};

/**
 * Returns the shared database handle, opening it on first use.
 *
 * The initialisation promise itself is cached, so concurrent first calls
 * share one connection and nobody receives the handle before the schema
 * statement has completed. A failed attempt is forgotten so that a later call
 * can retry.
 *
 * @returns {Promise<object>} The database handle returned by `open()`.
 */
export const getDb = () => {
  if (!dbPromise) {
    dbPromise = openDatabase().catch(error => {
      dbPromise = undefined;
      throw error;
    });
  }
  return dbPromise;
};

72
src/download.js Normal file
View File

@ -0,0 +1,72 @@
import 'dotenv/config';
import { getDb } from './database.js';
import { acquire, release, shutdown } from './nntp.pool.js';
import * as yEnc from 'simple-yenc';
import fs from 'fs/promises';
import log4js from './logger.js';
// Logger for this script. 'download' is not among the categories listed in
// logger.js, so log4js is expected to fall back to the 'default' category.
const logger = log4js.getLogger('download');
/**
 * Downloads every part of a stored file, yEnc-decodes the parts and writes the
 * concatenated result into the current working directory.
 *
 * Missing articles (NNTP code 430) are logged and skipped; any other NNTP
 * error aborts the remaining parts. The file is only written when every part
 * was downloaded.
 *
 * @param {number} fileId - Primary key of the row in the `files` table.
 * @returns {Promise<void>}
 */
async function downloadFile(fileId) {
  const db = await getDb();
  const file = await db.get('SELECT * FROM files WHERE id = ?', fileId);
  if (!file) {
    logger.error(`File with ID ${fileId} not found.`);
    return;
  }
  // The stored name originates from article subjects. Keep only its last path
  // component so the output cannot be written outside the working directory.
  const outputName = String(file.filename).split(/[\\/]/).pop();
  if (!outputName || outputName === '.' || outputName === '..') {
    logger.error(`File with ID ${fileId} has no usable filename.`);
    return;
  }
  logger.info(`Downloading file: ${file.filename}`);
  const messageIds = JSON.parse(file.message_ids);
  // Keys are part numbers stored as strings; order them numerically.
  const sortedParts = Object.entries(messageIds)
    .sort(([a], [b]) => parseInt(a, 10) - parseInt(b, 10));
  const parts = [];
  let conn;
  try {
    conn = await acquire();
    // NOTE(review): the group is hard-coded; it matches the one used in index.js.
    await conn.group('alt.binaries.test');
    for (const [partNumber, segment] of sortedParts) {
      if (!segment || !segment.id) {
        logger.error(`Message ID for part ${partNumber} not found.`);
        continue;
      }
      try {
        logger.debug(`Downloading part ${partNumber}/${file.parts} with message ID: ${segment.id}`);
        const bodyBuffer = (await conn.body(`<${segment.id}>`)).data;
        // NOTE(review): `simple-yenc` is imported by this module but is not
        // listed in package.json (only `yencode` is) — confirm it is installed.
        const decodedUint8Array = yEnc.decode(bodyBuffer.toString('latin1'));
        parts.push(Buffer.from(decodedUint8Array));
      } catch (error) {
        if (error.code === 430) {
          logger.error(`Article not found for part ${partNumber} (Message ID: ${segment.id})`);
        } else {
          throw error;
        }
      }
    }
  } catch (error) {
    logger.error('Error downloading file parts:', error);
  } finally {
    if (conn) {
      release(conn);
    }
  }
  if (parts.length === file.parts) {
    await fs.writeFile(outputName, Buffer.concat(parts));
    logger.info(`File "${outputName}" downloaded successfully.`);
  } else {
    logger.error('Could not download all parts of the file.');
  }
}
// Script entry point: the file ID is taken from the first command-line argument.
const requestedFileId = Number.parseInt(process.argv[2], 10);
if (Number.isNaN(requestedFileId)) {
  logger.error('Please provide a valid file ID as a command-line argument.');
  process.exit(1);
}
// Shut the NNTP pool down once the download has settled, successful or not.
downloadFile(requestedFileId).finally(() => {
  return shutdown();
});

63
src/file.worker.js Normal file
View File

@ -0,0 +1,63 @@
import { Queue, Worker } from 'bullmq';
import log4js from './logger.js';
import { getDb } from './database.js';
import { collectionQueue } from './collection.worker.js';
// Logger for the 'file' category configured in logger.js.
const logger = log4js.getLogger('file');
// Redis connection options shared by the queue and the worker in this module;
// falls back to localhost:6379 when REDIS_HOST / REDIS_PORT are unset.
const connection = {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
};
// Queue of `{ filename, parts }` jobs; header.worker.js adds one as soon as
// every part of a file has been seen.
export const fileQueue = new Queue('file-queue', { connection });
export const startFileWorker = () => {
const fileWorker = new Worker('file-queue', async job => {
const { filename, parts } = job.data;
const partCount = Object.keys(parts).length;
logger.debug(`Processing complete file: "${filename}" with ${partCount} parts.`);
const firstPart = JSON.parse(Object.values(parts)[0]);
const poster = firstPart.from;
const date = new Date(firstPart.date).getTime();
const messageIds = Object.entries(parts).reduce((acc, [partNumber, partData]) => {
const part = JSON.parse(partData);
const messageId = part['message-id'];
if (messageId) {
acc[partNumber] = {
id: messageId.replace(/[<>]/g, ''),
size: part[':bytes'],
};
} else {
logger.warn(`Message ID not found for part ${partNumber} of file "${filename}"`);
}
return acc;
}, {});
if (Object.keys(messageIds).length !== partCount) {
throw new Error(`Could not process all parts for file "${filename}" due to missing message IDs.`);
}
const db = await getDb();
const result = await db.run(
'INSERT INTO files (filename, poster, date, parts, message_ids) VALUES (?, ?, ?, ?, ?)',
filename,
poster,
date,
partCount,
JSON.stringify(messageIds)
);
const fileId = result.lastID;
logger.debug(`Saved file "${filename}" to database with ID: ${fileId}`);
await collectionQueue.add('process-collection', { fileId });
logger.debug(`Added file ID ${fileId} to collection queue.`);
}, { connection });
fileWorker.on('failed', (job, err) => {
logger.error(`File job ${job.id} failed with error: ${err.message}`);
});
};

54
src/header.worker.js Normal file
View File

@ -0,0 +1,54 @@
import { Queue, Worker } from 'bullmq';
import Redis from 'ioredis';
import log4js from './logger.js';
import { fileQueue } from './file.worker.js';
import { bodyQueue } from './body.worker.js';
// Logger for the 'header' category configured in logger.js.
const logger = log4js.getLogger('header');
// Redis connection options shared by the Redis client, the queue and the
// worker in this module; falls back to localhost:6379 when REDIS_HOST /
// REDIS_PORT are unset.
const connection = {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
};
// Plain Redis client used by the worker to collect the parts of each file in a
// hash keyed `file:<filename>`.
const redis = new Redis(connection);
// Queue of overview headers; index.js and body.worker.js add to it.
export const headerQueue = new Queue('header-queue', { connection });
const SUBJECT_REGEX = /"(.+)"(?: yEnc)? \((\d+)\/(\d+)\)/;
export const startHeaderWorker = () => {
const headerWorker = new Worker('header-queue', async job => {
const header = job.data;
const subject = header.subject;
const match = subject.match(SUBJECT_REGEX);
if (match) {
const filename = match[1];
const part = parseInt(match[2], 10);
const total = parseInt(match[3], 10);
const fileKey = `file:${filename}`;
await redis.hset(fileKey, part, JSON.stringify(header));
const partCount = await redis.hlen(fileKey);
if (partCount === total) {
const fileParts = await redis.hgetall(fileKey);
await fileQueue.add('process-file', { filename, parts: fileParts });
await redis.del(fileKey);
logger.info(`File "${filename}" is complete and moved to file-queue.`, fileParts);
} else {
logger.info(`Stored part ${part}/${total} for file "${filename}"`);
}
} else {
logger.warn(`Could not parse subject: "${subject}". Moving to body-queue.`);
await bodyQueue.add('process-body', { header });
}
}, { connection });
headerWorker.on('failed', (job, err) => {
logger.error(`Header job ${job.id} failed with error: ${err.message}`);
});
};

53
src/index.js Normal file
View File

@ -0,0 +1,53 @@
import 'dotenv/config';
import { headerQueue, startHeaderWorker } from './header.worker.js';
import { startFileWorker } from './file.worker.js';
import { startCollectionWorker } from './collection.worker.js';
import { startBodyWorker } from './body.worker.js';
import log4js from './logger.js';
import { acquire, release, shutdown } from './nntp.pool.js';
// Logger without an explicit category name (log4js default category).
const logger = log4js.getLogger();
/**
 * Fetches the overview of 'alt.binaries.test' over a pooled NNTP connection
 * and puts every returned header on the header queue.
 *
 * Errors are logged rather than rethrown, and the connection is handed back to
 * the pool in every case.
 *
 * @returns {Promise<void>}
 */
async function main() {
  let conn;
  try {
    conn = await acquire();
    logger.info('NNTP connection acquired from pool.');
    const serverDate = await conn.date();
    logger.debug(`Server date: ${serverDate}`);

    const group = await conn.group('alt.binaries.test');
    logger.debug(`Group info: ${JSON.stringify(group)}`);

    // NOTE(review): the second argument is `group.group`, not e.g. `group.last`
    // — confirm against the nntp-js `xover` signature.
    const overview = await conn.xover(group.first, group.group);
    const entries = overview.overviews;
    logger.info(`Fetched ${entries.length} headers.`);

    // Each entry is an [articleId, header] pair; only the header is queued.
    for (const [, header] of entries) {
      await headerQueue.add('process-header', header);
    }

    if (entries.length > 0) {
      const lastEntry = entries[entries.length - 1];
      logger.info(`Last header ID queued: ${lastEntry[0]}`);
    }
  } catch (error) {
    logger.error('Error in main execution:', error);
  } finally {
    if (conn) {
      release(conn);
      logger.info('NNTP connection released back to the pool.');
    }
  }
}
// Start all queue workers, then fetch and enqueue headers.
startHeaderWorker();
startFileWorker();
startCollectionWorker();
startBodyWorker();
// main() logs its own errors, so the returned promise is not awaited here.
main();
// On SIGINT call the pool's shutdown() and exit.
// NOTE(review): the BullMQ workers and queues are not closed here — confirm
// whether that is intended.
process.on('SIGINT', async () => {
logger.info('Gracefully shutting down...');
await shutdown();
process.exit(0);
});

26
src/logger.js Normal file
View File

@ -0,0 +1,26 @@
import log4js from 'log4js';
import fs from 'fs';
// Directory that receives one log file per process start.
const logsDir = './logs';
// With `recursive: true` the call succeeds when the directory already exists,
// so no separate existence check is needed and two processes starting at the
// same time cannot race between check and creation.
fs.mkdirSync(logsDir, { recursive: true });

// ISO timestamp with ':' replaced by '-' for use in the log file name
// (presumably because ':' is not allowed in Windows file names).
const timestamp = new Date().toISOString().replace(/:/g, '-');

// Every category writes to the console and to this run's log file; only the
// default category logs at debug level, the named ones at info level.
log4js.configure({
  appenders: {
    console: { type: 'console' },
    file: { type: 'file', filename: `${logsDir}/${timestamp}.log` },
  },
  categories: {
    default: { appenders: ['console', 'file'], level: 'debug' },
    header: { appenders: ['console', 'file'], level: 'info' },
    file: { appenders: ['console', 'file'], level: 'info' },
    collection: { appenders: ['console', 'file'], level: 'info' },
    body: { appenders: ['console', 'file'], level: 'info' },
    pool: { appenders: ['console', 'file'], level: 'info' },
  },
});

export default log4js;

59
src/nntp.pool.js Normal file
View File

@ -0,0 +1,59 @@
import { NNTP } from 'nntp-js';
import log4js from './logger.js';
// Logger for the 'pool' category configured in logger.js.
const logger = log4js.getLogger('pool');
// Maximum number of NNTP connections that may be open at the same time.
const POOL_SIZE = 5;
// Idle connections that can be handed out immediately.
const connections = [];
// Resolve callbacks of acquire() calls waiting for a connection.
const queue = [];
// Connections that are open or being opened (idle plus handed out). The limit
// must be checked against this number: the idle list is empty exactly when
// all connections are in use.
let openCount = 0;

// Opens and authenticates a new NNTP connection using NNTP_HOST, NNTP_USER
// and NNTP_PASS from the environment.
const createConnection = async () => {
  const config = {
    host: process.env.NNTP_HOST,
    user: process.env.NNTP_USER,
    password: process.env.NNTP_PASS,
  };
  // NOTE(review): this block previously also declared `port: 443` and
  // `secure: true`, but never passed them to the client; the connection is
  // made to port 119. Confirm whether a TLS connection was intended.
  const conn = new NNTP(config.host, 119);
  await conn.connect();
  await conn.login(config.user, config.password);
  return conn;
};

/**
 * Hands out an NNTP connection.
 *
 * Reuses an idle connection when one exists, opens a new one while fewer than
 * POOL_SIZE connections are open, and otherwise waits until release() passes
 * one on.
 *
 * @returns {Promise<object>} An authenticated connection; give it back with release().
 * @throws Whatever connecting or logging in throws when a new connection fails.
 */
export const acquire = async () => {
  if (connections.length > 0) {
    logger.debug('Reusing existing connection from pool.');
    return connections.pop();
  }
  if (openCount < POOL_SIZE) {
    logger.info('Creating new connection.');
    // Reserve the slot before awaiting so concurrent callers cannot overshoot.
    openCount += 1;
    try {
      return await createConnection();
    } catch (error) {
      openCount -= 1;
      throw error;
    }
  }
  logger.info('Waiting for a connection to become available.');
  return new Promise(resolve => queue.push(resolve));
};

/**
 * Gives a connection back: directly to the longest-waiting acquire() call if
 * there is one, otherwise to the idle list.
 *
 * @param {object} conn - A connection previously obtained from acquire().
 * @returns {void}
 */
export const release = conn => {
  if (queue.length > 0) {
    logger.info('Releasing connection to a waiting consumer.');
    const resolve = queue.shift();
    resolve(conn);
  } else {
    logger.debug('Returning connection to the pool.');
    connections.push(conn);
  }
};

/**
 * Closes every idle connection. Connections that are currently handed out are
 * not touched. A failing quit() is logged and does not keep the remaining
 * connections from being closed.
 *
 * @returns {Promise<void>}
 */
export const shutdown = async () => {
  logger.info('Shutting down all connections in the pool.');
  // Empty the idle list first so nothing is handed out while closing.
  const idleConnections = connections.splice(0, connections.length);
  for (const conn of idleConnections) {
    try {
      await conn.quit();
    } catch (error) {
      logger.error('Error closing NNTP connection:', error);
    } finally {
      openCount -= 1;
    }
  }
};

55
src/nzb.js Normal file
View File

@ -0,0 +1,55 @@
import 'dotenv/config';
import { getDb } from './database.js';
import { create } from 'xmlbuilder2';
import fs from 'fs/promises';
import log4js from './logger.js';
// Logger for this script. 'nzb' is not among the categories listed in
// logger.js, so log4js is expected to fall back to the 'default' category.
const logger = log4js.getLogger('nzb');
/**
 * Builds an NZB document for one stored file and writes it to
 * `<filename>.nzb` in the current working directory.
 *
 * @param {number} fileId - Primary key of the row in the `files` table.
 * @returns {Promise<void>} Resolves after the NZB was written, or after an
 *   error was logged because the row is missing or has no usable file name.
 */
async function createNzb(fileId) {
  const db = await getDb();
  const file = await db.get('SELECT * FROM files WHERE id = ?', fileId);
  if (!file) {
    logger.error(`File with ID ${fileId} not found.`);
    return;
  }
  // The stored name originates from article subjects. Keep only its last path
  // component so the NZB cannot be written outside the working directory.
  const baseName = String(file.filename).split(/[\\/]/).pop();
  if (!baseName || baseName === '.' || baseName === '..') {
    logger.error(`File with ID ${fileId} has no usable filename.`);
    return;
  }
  logger.info(`Creating NZB for file: ${file.filename}`);
  const messageIds = JSON.parse(file.message_ids);

  const root = create({ version: '1.0', encoding: 'UTF-8' })
    .dtd({ pubID: '-//newzBin//DTD NZB 1.1//EN', sysID: 'http://www.newzbin.com/DTD/nzb/nzb-1.1.dtd' })
    .ele('nzb', { xmlns: 'http://www.newzbin.com/DTD/2003/nzb' });
  const nzbFile = root.ele('file', {
    poster: file.poster,
    // file.worker.js stores Date#getTime() (milliseconds); NZB expects a Unix
    // timestamp in seconds.
    date: String(Math.floor(file.date / 1000)),
    subject: file.filename,
  });
  const groups = nzbFile.ele('groups');
  // This should be dynamic in a real application
  groups.ele('group').txt('alt.binaries.test');

  const segments = nzbFile.ele('segments');
  // Walk the parts that are actually stored, in numeric order, instead of
  // assuming the keys are exactly 1..file.parts.
  const numberedSegments = Object.entries(messageIds)
    .sort(([a], [b]) => parseInt(a, 10) - parseInt(b, 10));
  if (numberedSegments.length !== file.parts) {
    logger.warn(`Expected ${file.parts} segments for "${file.filename}" but found ${numberedSegments.length}.`);
  }
  for (const [number, segment] of numberedSegments) {
    if (!segment || !segment.id) {
      logger.warn(`Skipping part ${number} of "${file.filename}": no message ID stored.`);
      continue;
    }
    // The size is absent when the header carried no ':bytes' value.
    const bytes = segment.size == null ? 0 : segment.size;
    segments.ele('segment', { bytes: String(bytes), number }).txt(segment.id);
  }

  const xml = root.end({ prettyPrint: true });
  const nzbFilename = `${baseName}.nzb`;
  await fs.writeFile(nzbFilename, xml);
  logger.info(`NZB file created: ${nzbFilename}`);
}
// Script entry point: the file ID is taken from the first command-line argument.
const requestedFileId = Number.parseInt(process.argv[2], 10);
if (Number.isNaN(requestedFileId)) {
  logger.error('Please provide a valid file ID as a command-line argument.');
  process.exit(1);
}
createNzb(requestedFileId);

22
src/yenc.test.js Normal file
View File

@ -0,0 +1,22 @@
import yencode from 'yencode';
import fs from 'fs/promises';
import { Buffer } from 'buffer';
/**
 * Decodes the captured yEnc fixture with `yencode` and compares the result
 * with the reference file.
 *
 * The decoder output is always written to 'decoded.bin'. On a mismatch the
 * buffer lengths are printed, the output is also written to
 * 'test-decoded-output.bin', and the process exit code is set to 1 so that a
 * shell or CI job can detect the failure.
 *
 * @returns {Promise<void>}
 */
async function runTest() {
  const encodedData = await fs.readFile('HjVfQlWmHdUrQeQkRiLkTwEj-1779830864932@nyuu.bin');
  const correctlyDecodedData = await fs.readFile('Dragon.Ball.S01E119.MULTI.BDRip.REMASTERED.1080p.x264.DTS-LILAS.par2');
  const decodedBuffer = yencode.decode(encodedData);
  await fs.writeFile('decoded.bin', decodedBuffer);

  if (Buffer.compare(decodedBuffer, correctlyDecodedData) === 0) {
    console.log('Test passed: Decoded data matches the correctly decoded file.');
    return;
  }

  console.error('Test failed: Decoded data does not match the correctly decoded file.');
  console.error('Decoded buffer length:', decodedBuffer.length);
  console.error('Correct buffer length:', correctlyDecodedData.length);
  await fs.writeFile('test-decoded-output.bin', decodedBuffer);
  // Without this the script would exit with status 0 even though it failed.
  process.exitCode = 1;
}
// Run the check immediately when this module is loaded.
runTest();