diff --git a/src/lib/YencFile.js b/src/lib/YencFile.js index 53f9ff2..3d2f0c6 100644 --- a/src/lib/YencFile.js +++ b/src/lib/YencFile.js @@ -1,7 +1,7 @@ import yencode from 'yencode'; import { crc32 } from 'crc'; import log4js from './logger.js'; -import fs from "fs"; +import { parseYencMeta } from './yenc.util.js'; const logger = log4js.getLogger('yenc'); @@ -11,80 +11,54 @@ export class YencFile { this.totalSize = 0; } - // This function now ONLY returns strings. The caller is responsible for parsing. - _parseMetaLine(line) { - const meta = {}; - line.split(' ').forEach(part => { - const eqIndex = part.indexOf('='); - if (eqIndex !== -1) { - meta[part.slice(0, eqIndex)] = part.slice(eqIndex + 1); - } - }); - return meta; - } - processPart(encodedBuffer) { - const headerBeginMarker = Buffer.from('=ybegin'); - const headerPartMarker = Buffer.from('=ypart'); - const footerMarker = Buffer.from('\r\n=yend'); + const meta = parseYencMeta(encodedBuffer); - const headerBeginIndex = encodedBuffer.indexOf(headerBeginMarker); - const headerPartIndex = encodedBuffer.indexOf(headerPartMarker); - const footerIndex = encodedBuffer.lastIndexOf(footerMarker); - - if ((headerBeginIndex === -1 && headerPartIndex === -1) || footerIndex === -1) { - throw new Error('Invalid yEnc part: missing header or footer.'); - } - - const isFirstPart = headerBeginIndex !== -1; - const contentHeaderIndex = headerPartIndex !== -1 ? headerPartIndex : headerBeginIndex; - - const headerLineEndIndex = encodedBuffer.indexOf('\r\n', contentHeaderIndex); - const headerLine = encodedBuffer.subarray(contentHeaderIndex, headerLineEndIndex).toString(); - const headerMeta = this._parseMetaLine(headerLine); - - const footerLineEndIndex = encodedBuffer.indexOf('\r\n', footerIndex + 2); - const footerLine = encodedBuffer.subarray(footerIndex + 2).toString(); - const footerMeta = this._parseMetaLine(footerLine); - - if (isFirstPart && !this.targetBuffer) { - const beginHeaderLine = encodedBuffer.subarray(headerBeginIndex, encodedBuffer.indexOf('\r\n', headerBeginIndex)).toString(); - const beginMeta = this._parseMetaLine(beginHeaderLine); - this.totalSize = parseInt(beginMeta.total, 10); - + // Initialize buffer on the first part that has total size info + if (!this.targetBuffer && meta.header?.size) { + this.totalSize = parseInt(meta.header.size, 10); if (!this.totalSize) { - throw new Error('Could not determine total file size from =ybegin header.'); + throw new Error('Could not determine total file size from yEnc metadata.'); } this.targetBuffer = Buffer.alloc(this.totalSize); logger.info(`Allocated buffer of size ${this.totalSize} for file.`); } if (!this.targetBuffer) { - throw new Error('Cannot process yEnc part without a target buffer. Process the first part (with =ybegin) first.'); + throw new Error('Cannot process yEnc part: target buffer not initialized. The first part must contain total file size.'); } - const contentStartIndex = headerLineEndIndex + 2; - const contentEndIndex = footerIndex; + const headerPartMarker = Buffer.from('=ypart'); + const headerBeginMarker = Buffer.from('=ybegin'); + const footerMarker = Buffer.from('\r\n=yend'); + + // The content starts after the LAST header line. + const partHeaderIndex = encodedBuffer.indexOf(headerPartMarker); + const beginHeaderIndex = encodedBuffer.indexOf(headerBeginMarker); + const contentHeaderIndex = partHeaderIndex !== -1 ? partHeaderIndex : beginHeaderIndex; + + const contentStartIndex = encodedBuffer.indexOf('\r\n', contentHeaderIndex) + 2; + const contentEndIndex = encodedBuffer.lastIndexOf(footerMarker); const dataToDecode = encodedBuffer.subarray(contentStartIndex, contentEndIndex); const decoded = yencode.decode(dataToDecode); - const expectedSize = parseInt(footerMeta.size, 10); + const expectedSize = parseInt(meta.footer?.size, 10); if (decoded.length !== expectedSize) { throw new Error(`Decoded size (${decoded.length}) does not match expected part size (${expectedSize}).`); } logger.debug('Part size check passed.'); const calculatedCrc = crc32(decoded); - const expectedCrc = parseInt(footerMeta.pcrc32, 16); + const expectedCrc = parseInt(meta.footer?.pcrc32, 16); if (calculatedCrc !== expectedCrc) { throw new Error(`CRC32 mismatch: expected ${expectedCrc.toString(16)}, but got ${calculatedCrc.toString(16)}.`); } logger.debug('CRC32 check passed.'); - const offset = parseInt(headerMeta.begin, 10) - 1; + const offset = parseInt(meta.part.begin, 10) - 1; decoded.copy(this.targetBuffer, offset); - logger.info(`Processed part ${headerMeta.part}/${parseInt(headerMeta.total, 10) || 'N/A'} and wrote to buffer at offset ${offset}.`); + logger.info(`Processed part ${meta.header.part}/${meta.header.total || 'N/A'} and wrote to buffer at offset ${offset}.`); } getBuffer() { diff --git a/src/lib/yenc.util.js b/src/lib/yenc.util.js index 19247dd..1b45e77 100644 --- a/src/lib/yenc.util.js +++ b/src/lib/yenc.util.js @@ -1,51 +1,55 @@ /** * Parses a single line of yEnc metadata. - * @param {string} line The metadata line (e.g., '=ybegin...'). + * @param {string} line The metadata line. * @returns {object} A key-value map of the metadata. */ function parseMetaLine(line) { + if (!line) return {}; const meta = {}; line.split(' ').forEach(part => { const eqIndex = part.indexOf('='); if (eqIndex !== -1) { - const key = part.slice(0, eqIndex); - const value = part.slice(eqIndex + 1); - meta[key] = isNaN(value) ? value : parseInt(value, 10); + meta[part.slice(0, eqIndex)] = part.slice(eqIndex + 1); } }); - console.log({line, meta}) return meta; } /** - * Extracts structured metadata from a yEnc-encoded buffer. + * Finds a line in a buffer that starts with a specific marker. + * @param {Buffer} buffer The buffer to search. + * @param {string} marker The marker to find (e.g., '=ybegin'). + * @returns {string|null} The found line, or null. + */ +function findLine(buffer, marker) { + const markerBuffer = Buffer.from(marker); + const index = buffer.indexOf(markerBuffer); + if (index === -1) return null; + + const lineEndIndex = buffer.indexOf(Buffer.from('\r\n'), index); + return buffer.subarray(index, (lineEndIndex !== -1) ? lineEndIndex : buffer.length).toString(); +} + +/** + * Extracts and merges metadata from all yEnc headers and footers. * @param {Buffer} encodedBuffer The yEnc-encoded buffer. - * @returns {{header: object, footer: object}} An object containing parsed header and footer metadata. + * @returns {object} A single, merged object of all metadata. */ export function parseYencMeta(encodedBuffer) { - const headerBeginMarker = Buffer.from('=ybegin'); - const headerPartMarker = Buffer.from('=ypart'); - const footerMarker = Buffer.from('\r\n=yend'); + const beginLine = findLine(encodedBuffer, '=ybegin'); + const partLine = findLine(encodedBuffer, '=ypart'); + const endLine = findLine(encodedBuffer, '=yend'); - const headerBeginIndex = encodedBuffer.indexOf(headerBeginMarker); - const headerPartIndex = encodedBuffer.indexOf(headerPartMarker); - const footerIndex = encodedBuffer.lastIndexOf(footerMarker); - - if ((headerBeginIndex === -1 && headerPartIndex === -1) || footerIndex === -1) { - throw new Error('Invalid yEnc data: missing header or footer.'); + if (!endLine || (!beginLine && !partLine)) { + throw new Error('Invalid yEnc data: missing required headers or footers.'); } - const isFirstPart = headerBeginIndex !== -1; - const headerIndex = isFirstPart ? headerBeginIndex : headerPartIndex; - - const headerLineEndIndex = encodedBuffer.indexOf('\r\n', headerIndex); - const headerLine = encodedBuffer.subarray(headerIndex, headerLineEndIndex).toString(); - - const footerLineEndIndex = encodedBuffer.indexOf('\r\n', footerIndex + 2); - const footerLine = encodedBuffer.subarray(footerIndex + 2, footerLineEndIndex).toString(); - - return { - header: parseMetaLine(headerLine), - footer: parseMetaLine(footerLine), + // Merge metadata, with more specific lines overwriting general ones. + const meta = { + header: parseMetaLine(beginLine), + part: parseMetaLine(partLine), + footer: parseMetaLine(endLine), }; + + return meta; } diff --git a/src/workers/BodyWorker.js b/src/workers/BodyWorker.js index 09444f2..33ca372 100644 --- a/src/workers/BodyWorker.js +++ b/src/workers/BodyWorker.js @@ -2,6 +2,8 @@ import { Queue, Worker } from 'bullmq'; import log4js from '../lib/logger.js'; import { NntpPool } from '../lib/NntpPool.js'; import { parseYencMeta } from '../lib/yenc.util.js'; +import fs from 'fs/promises'; +import path from 'path'; const logger = log4js.getLogger('body'); @@ -24,26 +26,42 @@ export class BodyWorker { async process(job) { const { header } = job.data; + const messageId = header['message-id']; logger.debug(`Processing header with unparsable subject: ${header.subject}`); let conn; try { conn = await this.pool.acquire(); - const bodyBuffer = (await conn.body(header['message-id'])).data; - const meta = parseYencMeta(bodyBuffer); + const bodyBuffer = (await conn.body(messageId)).data; + + try { + const meta = parseYencMeta(bodyBuffer); + if (meta.header.name) { + const { name, part, total } = meta.header; + const newSubject = `"${name}" yEnc (${part}/${total})`; + header.subject = newSubject; - if (meta.header.name) { - const { name, part, total } = meta.header; - const newSubject = `"${name}" yEnc (${part}/${total})`; - header.subject = newSubject; - - logger.info(`Found yEnc metadata in body. New subject: ${newSubject}`); - await this.headerQueue.add('process-header', header); - } else { - logger.warn(`Could not find yEnc metadata in body for header: ${header.subject}`); + logger.info(`Found yEnc metadata in body. New subject: ${newSubject}`); + await this.headerQueue.add('process-header', header); + } else { + logger.warn(`Could not find yEnc metadata in body for header: ${header.subject}`); + } + } catch (parseError) { + if (parseError.message.includes('Invalid yEnc data')) { + logger.error(`Failed to parse yEnc data for message ID ${messageId}. Dumping buffer for inspection.`); + const debugDir = path.join(process.cwd(), 'debug'); + await fs.mkdir(debugDir, { recursive: true }); + const timestamp = new Date().toISOString().replace(/:/g, '-'); + const dumpFile = path.join(debugDir, `body-error-${timestamp}-${messageId.replace(/[<>]/g, '')}.bin`); + await fs.writeFile(dumpFile, bodyBuffer); + logger.error(`Problematic body buffer saved to: ${dumpFile}`); + } + // Re-throw the original parsing error to fail the job + throw parseError; } } catch (error) { - logger.error('Error in body worker:', error); + logger.error(`Error in body worker for message ID ${messageId}:`, error); + throw error; // Ensure the job fails if any other error occurs } finally { if (conn) { this.pool.release(conn);