diff --git a/src/bin/download.js b/src/bin/download.js
index 6d0c54b..1758787 100644
--- a/src/bin/download.js
+++ b/src/bin/download.js
@@ -37,6 +37,7 @@ async function downloadFile(fileId, numConnections) {
 
   if (!file) {
     logger.error(`File with ID ${fileId} not found.`);
+    await pool.shutdown();
     return;
   }
 
@@ -45,7 +46,20 @@ async function downloadFile(fileId, numConnections) {
   const messageIds = JSON.parse(file.message_ids);
   const sortedParts = Object.entries(messageIds).sort(([a], [b]) => parseInt(a, 10) - parseInt(b, 10));
 
-  const downloadPromises = sortedParts.map(([partNumber, segment]) =>
+  // Destructuring sortedParts[0] below throws on an empty list, which would also skip pool shutdown.
+  if (sortedParts.length === 0) {
+    logger.error(`File with ID ${fileId} has no parts to download.`);
+    await pool.shutdown();
+    return;
+  }
+
+  // Sequentially process the first part to initialize the YencFile
+  const [firstPartNumber, firstSegment] = sortedParts[0];
+  await downloadAndProcessPart(pool, firstPartNumber, firstSegment, yencFile);
+
+  // Concurrently process the rest of the parts
+  const remainingParts = sortedParts.slice(1);
+  const downloadPromises = remainingParts.map(([partNumber, segment]) =>
     downloadAndProcessPart(pool, partNumber, segment, yencFile)
   );
 
@@ -56,7 +70,7 @@ async function downloadFile(fileId, numConnections) {
     await fs.writeFile(file.filename, completeFile);
     logger.info(`File "${file.filename}" downloaded successfully.`);
   } else {
-    logger.error('Could not download all parts of the file.');
+    logger.error('Could not assemble the final file.');
   }
 
   await pool.shutdown();
diff --git a/src/lib/YencFile.js b/src/lib/YencFile.js
index 77624a1..53f9ff2 100644
--- a/src/lib/YencFile.js
+++ b/src/lib/YencFile.js
@@ -10,14 +10,13 @@ export class YencFile {
     this.totalSize = 0;
   }
 
+  // This function now ONLY returns strings. The caller is responsible for parsing.
   _parseMetaLine(line) {
     const meta = {};
     line.split(' ').forEach(part => {
       const eqIndex = part.indexOf('=');
       if (eqIndex !== -1) {
-        const key = part.slice(0, eqIndex);
-        const value = part.slice(eqIndex + 1);
-        meta[key] = isNaN(value) ? value : parseInt(value, 10);
+        meta[part.slice(0, eqIndex)] = part.slice(eqIndex + 1);
       }
     });
     return meta;
@@ -30,25 +29,31 @@ export class YencFile {
 
     const headerBeginIndex = encodedBuffer.indexOf(headerBeginMarker);
     const headerPartIndex = encodedBuffer.indexOf(headerPartMarker);
-    const footerIndex = encodedBuffer.indexOf(footerMarker, headerPartIndex);
+    const footerIndex = encodedBuffer.lastIndexOf(footerMarker);
 
     if ((headerBeginIndex === -1 && headerPartIndex === -1) || footerIndex === -1) {
       throw new Error('Invalid yEnc part: missing header or footer.');
     }
 
     const isFirstPart = headerBeginIndex !== -1;
-    const headerIndex = isFirstPart ? headerBeginIndex : headerPartIndex;
-
-    const headerLineEndIndex = encodedBuffer.indexOf('\r\n', headerIndex);
-    const headerLine = encodedBuffer.subarray(headerIndex, headerLineEndIndex).toString();
+    const contentHeaderIndex = headerPartIndex !== -1 ? headerPartIndex : headerBeginIndex;
+
+    const headerLineEndIndex = encodedBuffer.indexOf('\r\n', contentHeaderIndex);
+    const headerLine = encodedBuffer.subarray(contentHeaderIndex, headerLineEndIndex).toString();
     const headerMeta = this._parseMetaLine(headerLine);
 
     const footerLineEndIndex = encodedBuffer.indexOf('\r\n', footerIndex + 2);
-    const footerLine = encodedBuffer.subarray(footerIndex + 2, footerLineEndIndex).toString();
+    // Stop at the end of the =yend line when it is CRLF-terminated; otherwise it runs to the end of the buffer.
+    const footerLineEnd = footerLineEndIndex === -1 ? encodedBuffer.length : footerLineEndIndex;
+    const footerLine = encodedBuffer.subarray(footerIndex + 2, footerLineEnd).toString();
     const footerMeta = this._parseMetaLine(footerLine);
 
     if (isFirstPart && !this.targetBuffer) {
-      this.totalSize = headerMeta.total;
+      const beginHeaderLine = encodedBuffer.subarray(headerBeginIndex, encodedBuffer.indexOf('\r\n', headerBeginIndex)).toString();
+      const beginMeta = this._parseMetaLine(beginHeaderLine);
+      // In =ybegin, `size` is the byte length of the whole file; `total` is the number of parts.
+      this.totalSize = parseInt(beginMeta.size, 10);
+
       if (!this.totalSize) {
         throw new Error('Could not determine total file size from =ybegin header.');
       }
@@ -60,27 +65,31 @@ export class YencFile {
       throw new Error('Cannot process yEnc part without a target buffer. Process the first part (with =ybegin) first.');
     }
 
-    const contentStartIndex = encodedBuffer.indexOf('\r\n', headerPartIndex) + 2;
+    const contentStartIndex = headerLineEndIndex + 2;
     const contentEndIndex = footerIndex;
     const dataToDecode = encodedBuffer.subarray(contentStartIndex, contentEndIndex);
 
     const decoded = yencode.decode(dataToDecode);
 
-    if (decoded.length !== footerMeta.size) {
-      throw new Error(`Decoded size (${decoded.length}) does not match expected part size (${footerMeta.size}).`);
+    const expectedSize = parseInt(footerMeta.size, 10);
+    if (decoded.length !== expectedSize) {
+      throw new Error(`Decoded size (${decoded.length}) does not match expected part size (${expectedSize}).`);
     }
     logger.debug('Part size check passed.');
 
-    const calculatedCrc = crc32(decoded).toString(16);
-    const expectedCrc = footerMeta.pcrc32;
+    const calculatedCrc = crc32(decoded);
+    // Multi-part posts put the part checksum in pcrc32; single-part posts only carry crc32.
+    const expectedCrc = parseInt(footerMeta.pcrc32 || footerMeta.crc32, 16);
     if (calculatedCrc !== expectedCrc) {
-      throw new Error(`CRC32 mismatch: expected ${expectedCrc}, but got ${calculatedCrc}.`);
+      throw new Error(`CRC32 mismatch: expected ${expectedCrc.toString(16)}, but got ${calculatedCrc.toString(16)}.`);
     }
     logger.debug('CRC32 check passed.');
 
-    const offset = headerMeta.begin ? headerMeta.begin - 1 : 0;
+    // Without a =ypart line there is no `begin` field, so the data starts at offset 0.
+    const begin = parseInt(headerMeta.begin, 10);
+    const offset = Number.isNaN(begin) ? 0 : begin - 1;
     decoded.copy(this.targetBuffer, offset);
-    logger.info(`Processed part ${headerMeta.part}/${headerMeta.total} and wrote to buffer at offset ${offset}.`);
+    logger.info(`Processed part ${headerMeta.part}/${parseInt(headerMeta.total, 10) || 'N/A'} and wrote to buffer at offset ${offset}.`);
   }
 
   getBuffer() {
diff --git a/src/lib/yenc.util.js b/src/lib/yenc.util.js
new file mode 100644
index 0000000..19247dd
--- /dev/null
+++ b/src/lib/yenc.util.js
@@ -0,0 +1,66 @@
+/**
+ * Parses a single line of yEnc metadata.
+ *
+ * All-digit values are returned as numbers; every other value stays a string.
+ * CRC fields are hexadecimal, so they always stay strings.
+ * @param {string} line The metadata line (e.g., '=ybegin...').
+ * @returns {object} A key-value map of the metadata.
+ */
+function parseMetaLine(line) {
+  const meta = {};
+  // `name` is the last field on the line and its value may contain spaces,
+  // so take the rest of the line instead of splitting it on spaces.
+  const nameMarker = ' name=';
+  const nameIndex = line.indexOf(nameMarker);
+  const fields = nameIndex === -1 ? line : line.slice(0, nameIndex);
+  if (nameIndex !== -1) {
+    meta.name = line.slice(nameIndex + nameMarker.length).trim();
+  }
+  fields.split(' ').forEach(part => {
+    const eqIndex = part.indexOf('=');
+    // Index 0 is the keyword itself ('=ybegin', '=yend'), not a key/value pair.
+    if (eqIndex > 0) {
+      const key = part.slice(0, eqIndex);
+      const value = part.slice(eqIndex + 1);
+      const isDecimal = /^\d+$/.test(value) && !key.endsWith('crc32');
+      meta[key] = isDecimal ? parseInt(value, 10) : value;
+    }
+  });
+  return meta;
+}
+
+/**
+ * Extracts structured metadata from a yEnc-encoded buffer.
+ * @param {Buffer} encodedBuffer The yEnc-encoded buffer.
+ * @returns {{header: object, footer: object}} An object containing parsed header and footer metadata.
+ * @throws {Error} If no =ybegin/=ypart header or no =yend footer is found.
+ */
+export function parseYencMeta(encodedBuffer) {
+  const headerBeginMarker = Buffer.from('=ybegin');
+  const headerPartMarker = Buffer.from('=ypart');
+  const footerMarker = Buffer.from('\r\n=yend');
+
+  const headerBeginIndex = encodedBuffer.indexOf(headerBeginMarker);
+  const headerPartIndex = encodedBuffer.indexOf(headerPartMarker);
+  const footerIndex = encodedBuffer.lastIndexOf(footerMarker);
+
+  if ((headerBeginIndex === -1 && headerPartIndex === -1) || footerIndex === -1) {
+    throw new Error('Invalid yEnc data: missing header or footer.');
+  }
+
+  const isFirstPart = headerBeginIndex !== -1;
+  const headerIndex = isFirstPart ? headerBeginIndex : headerPartIndex;
+
+  const headerLineEndIndex = encodedBuffer.indexOf('\r\n', headerIndex);
+  const headerLine = encodedBuffer.subarray(headerIndex, headerLineEndIndex).toString();
+
+  const footerLineEndIndex = encodedBuffer.indexOf('\r\n', footerIndex + 2);
+  // indexOf returns -1 when =yend is the last line without CRLF; a -1 end would drop the final byte.
+  const footerLineEnd = footerLineEndIndex === -1 ? encodedBuffer.length : footerLineEndIndex;
+  const footerLine = encodedBuffer.subarray(footerIndex + 2, footerLineEnd).toString();
+
+  return {
+    header: parseMetaLine(headerLine),
+    footer: parseMetaLine(footerLine),
+  };
+}
diff --git a/src/workers/BodyWorker.js b/src/workers/BodyWorker.js
index e76ad7a..09444f2 100644
--- a/src/workers/BodyWorker.js
+++ b/src/workers/BodyWorker.js
@@ -1,7 +1,7 @@
 import { Queue, Worker } from 'bullmq';
 import log4js from '../lib/logger.js';
 import { NntpPool } from '../lib/NntpPool.js';
-import { decodeYenc } from '../lib/yenc.util.js';
+import { parseYencMeta } from '../lib/yenc.util.js';
 
 const logger = log4js.getLogger('body');
 
@@ -30,18 +30,12 @@ export class BodyWorker {
     try {
       conn = await this.pool.acquire();
       const bodyBuffer = (await conn.body(header['message-id'])).data;
-      const decodedBuffer = decodeYenc(bodyBuffer);
-      const firstLine = decodedBuffer.toString().split('\\n')[0];
+      const meta = parseYencMeta(bodyBuffer);
 
-      const YENC_REGEX = /=ybegin part=(\d+) total=(\d+) line=\d+ size=\d+ name=(.+)/;
-      const match = firstLine.match(YENC_REGEX);
-
-      if (match) {
-        const part = parseInt(match[1], 10);
-        const total = parseInt(match[2], 10);
-        const filename = match[3].trim();
-
-        const newSubject = `"${filename}" yEnc (${part}/${total})`;
+      const { name, part, total } = meta.header;
+      // Rewrite the subject only when all three fields are present, as the replaced regex required.
+      if (name && part && total) {
+        const newSubject = `"${name}" yEnc (${part}/${total})`;
         header.subject = newSubject;
         logger.info(`Found yEnc metadata in body. New subject: ${newSubject}`);
 
diff --git a/src/workers/CollectionWorker.js b/src/workers/CollectionWorker.js
index bd6c659..0157ea5 100644
--- a/src/workers/CollectionWorker.js
+++ b/src/workers/CollectionWorker.js
@@ -3,7 +3,7 @@ import log4js from '../lib/logger.js';
 import { getDb } from '../lib/database.js';
 import { NntpPool } from '../lib/NntpPool.js';
 import { createExtractorFromData } from 'node-unrar-js';
-import { decodeYenc } from '../lib/yenc.util.js';
+import { YencFile } from '../lib/YencFile.js';
 
 const logger = log4js.getLogger('collection');
 
@@ -52,7 +52,11 @@ export class CollectionWorker {
       conn = await this.pool.acquire();
       await conn.group('alt.binaries.test');
       const bodyBuffer = (await conn.body(`<${firstPart.id}>`)).data;
-      const decodedBuffer = decodeYenc(bodyBuffer);
+
+      const yencFile = new YencFile();
+      yencFile.processPart(bodyBuffer);
+      const decodedBuffer = yencFile.getBuffer();
+
       const extractor = await createExtractorFromData({ data: decodedBuffer });
       const fileList = extractor.getFileList();
       logger.info(`Files in "${file.filename}":`, fileList);