download seems to work, par2 verification worked at least once

This commit is contained in:
Daan Meijer 2026-06-01 00:36:34 +02:00
parent ba6cb101a6
commit dfd3336447
3 changed files with 84 additions and 88 deletions

View File

@@ -1,7 +1,7 @@
import yencode from 'yencode'; import yencode from 'yencode';
import { crc32 } from 'crc'; import { crc32 } from 'crc';
import log4js from './logger.js'; import log4js from './logger.js';
import fs from "fs"; import { parseYencMeta } from './yenc.util.js';
const logger = log4js.getLogger('yenc'); const logger = log4js.getLogger('yenc');
@@ -11,80 +11,54 @@ export class YencFile {
this.totalSize = 0; this.totalSize = 0;
} }
// This function now ONLY returns strings. The caller is responsible for parsing.
_parseMetaLine(line) {
const meta = {};
line.split(' ').forEach(part => {
const eqIndex = part.indexOf('=');
if (eqIndex !== -1) {
meta[part.slice(0, eqIndex)] = part.slice(eqIndex + 1);
}
});
return meta;
}
processPart(encodedBuffer) { processPart(encodedBuffer) {
const headerBeginMarker = Buffer.from('=ybegin'); const meta = parseYencMeta(encodedBuffer);
const headerPartMarker = Buffer.from('=ypart');
const footerMarker = Buffer.from('\r\n=yend');
const headerBeginIndex = encodedBuffer.indexOf(headerBeginMarker); // Initialize buffer on the first part that has total size info
const headerPartIndex = encodedBuffer.indexOf(headerPartMarker); if (!this.targetBuffer && meta.header?.size) {
const footerIndex = encodedBuffer.lastIndexOf(footerMarker); this.totalSize = parseInt(meta.header.size, 10);
if ((headerBeginIndex === -1 && headerPartIndex === -1) || footerIndex === -1) {
throw new Error('Invalid yEnc part: missing header or footer.');
}
const isFirstPart = headerBeginIndex !== -1;
const contentHeaderIndex = headerPartIndex !== -1 ? headerPartIndex : headerBeginIndex;
const headerLineEndIndex = encodedBuffer.indexOf('\r\n', contentHeaderIndex);
const headerLine = encodedBuffer.subarray(contentHeaderIndex, headerLineEndIndex).toString();
const headerMeta = this._parseMetaLine(headerLine);
const footerLineEndIndex = encodedBuffer.indexOf('\r\n', footerIndex + 2);
const footerLine = encodedBuffer.subarray(footerIndex + 2).toString();
const footerMeta = this._parseMetaLine(footerLine);
if (isFirstPart && !this.targetBuffer) {
const beginHeaderLine = encodedBuffer.subarray(headerBeginIndex, encodedBuffer.indexOf('\r\n', headerBeginIndex)).toString();
const beginMeta = this._parseMetaLine(beginHeaderLine);
this.totalSize = parseInt(beginMeta.total, 10);
if (!this.totalSize) { if (!this.totalSize) {
throw new Error('Could not determine total file size from =ybegin header.'); throw new Error('Could not determine total file size from yEnc metadata.');
} }
this.targetBuffer = Buffer.alloc(this.totalSize); this.targetBuffer = Buffer.alloc(this.totalSize);
logger.info(`Allocated buffer of size ${this.totalSize} for file.`); logger.info(`Allocated buffer of size ${this.totalSize} for file.`);
} }
if (!this.targetBuffer) { if (!this.targetBuffer) {
throw new Error('Cannot process yEnc part without a target buffer. Process the first part (with =ybegin) first.'); throw new Error('Cannot process yEnc part: target buffer not initialized. The first part must contain total file size.');
} }
const contentStartIndex = headerLineEndIndex + 2; const headerPartMarker = Buffer.from('=ypart');
const contentEndIndex = footerIndex; const headerBeginMarker = Buffer.from('=ybegin');
const footerMarker = Buffer.from('\r\n=yend');
// The content starts after the LAST header line.
const partHeaderIndex = encodedBuffer.indexOf(headerPartMarker);
const beginHeaderIndex = encodedBuffer.indexOf(headerBeginMarker);
const contentHeaderIndex = partHeaderIndex !== -1 ? partHeaderIndex : beginHeaderIndex;
const contentStartIndex = encodedBuffer.indexOf('\r\n', contentHeaderIndex) + 2;
const contentEndIndex = encodedBuffer.lastIndexOf(footerMarker);
const dataToDecode = encodedBuffer.subarray(contentStartIndex, contentEndIndex); const dataToDecode = encodedBuffer.subarray(contentStartIndex, contentEndIndex);
const decoded = yencode.decode(dataToDecode); const decoded = yencode.decode(dataToDecode);
const expectedSize = parseInt(footerMeta.size, 10); const expectedSize = parseInt(meta.footer?.size, 10);
if (decoded.length !== expectedSize) { if (decoded.length !== expectedSize) {
throw new Error(`Decoded size (${decoded.length}) does not match expected part size (${expectedSize}).`); throw new Error(`Decoded size (${decoded.length}) does not match expected part size (${expectedSize}).`);
} }
logger.debug('Part size check passed.'); logger.debug('Part size check passed.');
const calculatedCrc = crc32(decoded); const calculatedCrc = crc32(decoded);
const expectedCrc = parseInt(footerMeta.pcrc32, 16); const expectedCrc = parseInt(meta.footer?.pcrc32, 16);
if (calculatedCrc !== expectedCrc) { if (calculatedCrc !== expectedCrc) {
throw new Error(`CRC32 mismatch: expected ${expectedCrc.toString(16)}, but got ${calculatedCrc.toString(16)}.`); throw new Error(`CRC32 mismatch: expected ${expectedCrc.toString(16)}, but got ${calculatedCrc.toString(16)}.`);
} }
logger.debug('CRC32 check passed.'); logger.debug('CRC32 check passed.');
const offset = parseInt(headerMeta.begin, 10) - 1; const offset = parseInt(meta.part.begin, 10) - 1;
decoded.copy(this.targetBuffer, offset); decoded.copy(this.targetBuffer, offset);
logger.info(`Processed part ${headerMeta.part}/${parseInt(headerMeta.total, 10) || 'N/A'} and wrote to buffer at offset ${offset}.`); logger.info(`Processed part ${meta.header.part}/${meta.header.total || 'N/A'} and wrote to buffer at offset ${offset}.`);
} }
getBuffer() { getBuffer() {

View File

@@ -1,51 +1,55 @@
/** /**
* Parses a single line of yEnc metadata. * Parses a single line of yEnc metadata.
* @param {string} line The metadata line (e.g., '=ybegin...'). * @param {string} line The metadata line.
* @returns {object} A key-value map of the metadata. * @returns {object} A key-value map of the metadata.
*/ */
function parseMetaLine(line) { function parseMetaLine(line) {
if (!line) return {};
const meta = {}; const meta = {};
line.split(' ').forEach(part => { line.split(' ').forEach(part => {
const eqIndex = part.indexOf('='); const eqIndex = part.indexOf('=');
if (eqIndex !== -1) { if (eqIndex !== -1) {
const key = part.slice(0, eqIndex); meta[part.slice(0, eqIndex)] = part.slice(eqIndex + 1);
const value = part.slice(eqIndex + 1);
meta[key] = isNaN(value) ? value : parseInt(value, 10);
} }
}); });
console.log({line, meta})
return meta; return meta;
} }
/** /**
* Extracts structured metadata from a yEnc-encoded buffer. * Finds a line in a buffer that starts with a specific marker.
* @param {Buffer} buffer The buffer to search.
* @param {string} marker The marker to find (e.g., '=ybegin').
* @returns {string|null} The found line, or null.
*/
function findLine(buffer, marker) {
const markerBuffer = Buffer.from(marker);
const index = buffer.indexOf(markerBuffer);
if (index === -1) return null;
const lineEndIndex = buffer.indexOf(Buffer.from('\r\n'), index);
return buffer.subarray(index, (lineEndIndex !== -1) ? lineEndIndex : buffer.length).toString();
}
/**
* Extracts and merges metadata from all yEnc headers and footers.
* @param {Buffer} encodedBuffer The yEnc-encoded buffer. * @param {Buffer} encodedBuffer The yEnc-encoded buffer.
* @returns {{header: object, footer: object}} An object containing parsed header and footer metadata. * @returns {object} A single, merged object of all metadata.
*/ */
export function parseYencMeta(encodedBuffer) { export function parseYencMeta(encodedBuffer) {
const headerBeginMarker = Buffer.from('=ybegin'); const beginLine = findLine(encodedBuffer, '=ybegin');
const headerPartMarker = Buffer.from('=ypart'); const partLine = findLine(encodedBuffer, '=ypart');
const footerMarker = Buffer.from('\r\n=yend'); const endLine = findLine(encodedBuffer, '=yend');
const headerBeginIndex = encodedBuffer.indexOf(headerBeginMarker); if (!endLine || (!beginLine && !partLine)) {
const headerPartIndex = encodedBuffer.indexOf(headerPartMarker); throw new Error('Invalid yEnc data: missing required headers or footers.');
const footerIndex = encodedBuffer.lastIndexOf(footerMarker);
if ((headerBeginIndex === -1 && headerPartIndex === -1) || footerIndex === -1) {
throw new Error('Invalid yEnc data: missing header or footer.');
} }
const isFirstPart = headerBeginIndex !== -1; // Merge metadata, with more specific lines overwriting general ones.
const headerIndex = isFirstPart ? headerBeginIndex : headerPartIndex; const meta = {
header: parseMetaLine(beginLine),
const headerLineEndIndex = encodedBuffer.indexOf('\r\n', headerIndex); part: parseMetaLine(partLine),
const headerLine = encodedBuffer.subarray(headerIndex, headerLineEndIndex).toString(); footer: parseMetaLine(endLine),
const footerLineEndIndex = encodedBuffer.indexOf('\r\n', footerIndex + 2);
const footerLine = encodedBuffer.subarray(footerIndex + 2, footerLineEndIndex).toString();
return {
header: parseMetaLine(headerLine),
footer: parseMetaLine(footerLine),
}; };
return meta;
} }

View File

@@ -2,6 +2,8 @@ import { Queue, Worker } from 'bullmq';
import log4js from '../lib/logger.js'; import log4js from '../lib/logger.js';
import { NntpPool } from '../lib/NntpPool.js'; import { NntpPool } from '../lib/NntpPool.js';
import { parseYencMeta } from '../lib/yenc.util.js'; import { parseYencMeta } from '../lib/yenc.util.js';
import fs from 'fs/promises';
import path from 'path';
const logger = log4js.getLogger('body'); const logger = log4js.getLogger('body');
@@ -24,26 +26,42 @@ export class BodyWorker {
async process(job) { async process(job) {
const { header } = job.data; const { header } = job.data;
const messageId = header['message-id'];
logger.debug(`Processing header with unparsable subject: ${header.subject}`); logger.debug(`Processing header with unparsable subject: ${header.subject}`);
let conn; let conn;
try { try {
conn = await this.pool.acquire(); conn = await this.pool.acquire();
const bodyBuffer = (await conn.body(header['message-id'])).data; const bodyBuffer = (await conn.body(messageId)).data;
const meta = parseYencMeta(bodyBuffer);
try {
const meta = parseYencMeta(bodyBuffer);
if (meta.header.name) {
const { name, part, total } = meta.header;
const newSubject = `"${name}" yEnc (${part}/${total})`;
header.subject = newSubject;
if (meta.header.name) { logger.info(`Found yEnc metadata in body. New subject: ${newSubject}`);
const { name, part, total } = meta.header; await this.headerQueue.add('process-header', header);
const newSubject = `"${name}" yEnc (${part}/${total})`; } else {
header.subject = newSubject; logger.warn(`Could not find yEnc metadata in body for header: ${header.subject}`);
}
logger.info(`Found yEnc metadata in body. New subject: ${newSubject}`); } catch (parseError) {
await this.headerQueue.add('process-header', header); if (parseError.message.includes('Invalid yEnc data')) {
} else { logger.error(`Failed to parse yEnc data for message ID ${messageId}. Dumping buffer for inspection.`);
logger.warn(`Could not find yEnc metadata in body for header: ${header.subject}`); const debugDir = path.join(process.cwd(), 'debug');
await fs.mkdir(debugDir, { recursive: true });
const timestamp = new Date().toISOString().replace(/:/g, '-');
const dumpFile = path.join(debugDir, `body-error-${timestamp}-${messageId.replace(/[<>]/g, '')}.bin`);
await fs.writeFile(dumpFile, bodyBuffer);
logger.error(`Problematic body buffer saved to: ${dumpFile}`);
}
// Re-throw the original parsing error to fail the job
throw parseError;
} }
} catch (error) { } catch (error) {
logger.error('Error in body worker:', error); logger.error(`Error in body worker for message ID ${messageId}:`, error);
throw error; // Ensure the job fails if any other error occurs
} finally { } finally {
if (conn) { if (conn) {
this.pool.release(conn); this.pool.release(conn);