diff --git a/src/download.js b/src/download.js index 062f454..6e671ea 100644 --- a/src/download.js +++ b/src/download.js @@ -1,13 +1,36 @@ import 'dotenv/config'; import { getDb } from './database.js'; -import { acquire, release, shutdown } from './nntp.pool.js'; -import { decodeYenc } from './yenc.util.js'; +import { acquire, release, shutdown, setPoolSize } from './nntp.pool.js'; +import { decodeYenc, parseYencMeta } from './yenc.util.js'; import fs from 'fs/promises'; import log4js from './logger.js'; const logger = log4js.getLogger('download'); -async function downloadFile(fileId) { +async function downloadPart(partNumber, segment, targetBuffer) { + let conn; + try { + conn = await acquire(); + await conn.group('alt.binaries.test'); + logger.debug(`Downloading part ${partNumber} with message ID: ${segment.id}`); + const bodyBuffer = (await conn.body(`<${segment.id}>`)).data; + decodeYenc(bodyBuffer, targetBuffer); + } catch (error) { + if (error.code === 430) { + logger.error(`Article not found for part ${partNumber} (Message ID: ${segment.id})`); + } else { + throw error; + } + } finally { + if (conn) { + release(conn); + } + } +} + +async function downloadFile(fileId, numConnections) { + setPoolSize(numConnections); + const db = await getDb(); const file = await db.get('SELECT * FROM files WHERE id = ?', fileId); @@ -16,56 +39,54 @@ async function downloadFile(fileId) { return; } - logger.info(`Downloading file: ${file.filename}`); + logger.info(`Downloading file: ${file.filename} with ${numConnections} connections.`); const messageIds = JSON.parse(file.message_ids); const sortedParts = Object.entries(messageIds).sort(([a], [b]) => parseInt(a, 10) - parseInt(b, 10)); - const parts = []; - let conn; - + // Download the first part to get the total file size + const [firstPartNumber, firstSegment] = sortedParts[0]; + let firstBodyBuffer; + let conn = await acquire(); try { - conn = await acquire(); await conn.group('alt.binaries.test'); - for (const [partNumber, segment] of sortedParts) { - if (!segment || !segment.id) { - logger.error(`Message ID for part ${partNumber} not found.`); - continue; - } - try { - logger.debug(`Downloading part ${partNumber}/${file.parts} with message ID: ${segment.id}`); - const bodyBuffer = (await conn.body(`<${segment.id}>`)).data; - const decodedPart = decodeYenc(bodyBuffer); - parts.push(decodedPart); - } catch (error) { - if (error.code === 430) { - logger.error(`Article not found for part ${partNumber} (Message ID: ${segment.id})`); - } else { - throw error; - } - } - } - } catch (error) { - logger.error('Error downloading file parts:', error); + firstBodyBuffer = (await conn.body(`<${firstSegment.id}>`)).data; } finally { - if (conn) { - release(conn); - } + release(conn); } - if (parts.length === file.parts) { - const completeFile = Buffer.concat(parts); - await fs.writeFile(file.filename, completeFile); - logger.info(`File "${file.filename}" downloaded successfully.`); - } else { - logger.error('Could not download all parts of the file.'); + const meta = parseYencMeta(firstBodyBuffer); + const totalSize = meta.total; + if (!totalSize) { + throw new Error('Could not determine total file size from yEnc metadata.'); } + + const targetBuffer = Buffer.alloc(totalSize); + logger.info(`Allocated buffer of size ${totalSize} for file "${file.filename}".`); + + // Decode the first part and write it to the target buffer + decodeYenc(firstBodyBuffer, targetBuffer); + + const downloadPromises = sortedParts.slice(1).map(([partNumber, segment]) => + downloadPart(partNumber, segment, targetBuffer) + ); + + await Promise.all(downloadPromises); + + await fs.writeFile(file.filename, targetBuffer); + logger.info(`File "${file.filename}" downloaded successfully.`); } -const fileId = parseInt(process.argv[2], 10); -if (isNaN(fileId)) { +const args = process.argv.slice(2); +const fileIdArg = args.find(arg => !arg.startsWith('--')); +const connectionsArg = args.find(arg => arg.startsWith('--connections=')); + +const fileId = fileIdArg ? parseInt(fileIdArg, 10) : null; +const numConnections = connectionsArg ? parseInt(connectionsArg.split('=')[1], 10) : 10; + +if (!fileId || isNaN(fileId)) { logger.error('Please provide a valid file ID as a command-line argument.'); process.exit(1); } -downloadFile(fileId).finally(() => shutdown()); +downloadFile(fileId, numConnections).finally(() => shutdown()); diff --git a/src/nntp.pool.js b/src/nntp.pool.js index 1bd9248..f511b23 100644 --- a/src/nntp.pool.js +++ b/src/nntp.pool.js @@ -3,9 +3,15 @@ import log4js from './logger.js'; const logger = log4js.getLogger('pool'); -const POOL_SIZE = 5; -const connections = []; -const queue = []; +let POOL_SIZE = 10; +const allConnections = new Set(); // Tracks all connections for proper shutdown +const idleConnections = []; // Tracks available, ready-to-use connections +const waiters = []; // Queue of promises for tasks waiting for a connection +let createdCount = 0; // A counter for total connections created + +export const setPoolSize = (size) => { + POOL_SIZE = size; +}; const createConnection = async () => { const config = { @@ -19,41 +25,46 @@ const createConnection = async () => { const conn = new NNTP(config.host, 119); await conn.connect(); await conn.login(config.user, config.password); + allConnections.add(conn); // Add to the set for shutdown tracking return conn; }; export const acquire = async () => { - if (connections.length > 0) { + if (idleConnections.length > 0) { logger.debug('Reusing existing connection from pool.'); - return connections.pop(); + return idleConnections.pop(); } - if (connections.length + queue.length < POOL_SIZE) { - logger.info('Creating new connection.'); + if (createdCount < POOL_SIZE) { + createdCount++; + logger.info(`Creating new connection (${createdCount}/${POOL_SIZE}).`); return createConnection(); } - logger.info('Waiting for a connection to become available.'); - return new Promise(resolve => queue.push(resolve)); + logger.info(`Pool maxed out at ${POOL_SIZE}. Waiting for a connection to become available.`); + return new Promise(resolve => waiters.push(resolve)); }; export const release = conn => { - if (queue.length > 0) { - logger.info('Releasing connection to a waiting consumer.'); - const resolve = queue.shift(); + if (waiters.length > 0) { + logger.debug('Releasing connection directly to a waiting task.'); + const resolve = waiters.shift(); resolve(conn); } else { - logger.debug('Returning connection to the pool.'); - connections.push(conn); + logger.debug('Returning connection to the idle pool.'); + idleConnections.push(conn); } }; export const shutdown = async () => { logger.info('Shutting down all connections in the pool.'); - const allConns = [...connections]; - connections.length = 0; // Clear the pool - - for (const conn of allConns) { - await conn.quit(); + const shutdownPromises = []; + for (const conn of allConnections) { + shutdownPromises.push(conn.quit()); } + await Promise.all(shutdownPromises); + allConnections.clear(); + idleConnections.length = 0; + waiters.length = 0; + createdCount = 0; };