downloader can handle concurrent connections

This commit is contained in:
Daan Meijer 2026-05-31 21:23:18 +02:00
parent b5647521fd
commit efbc1237d6
2 changed files with 91 additions and 59 deletions

View File

@ -1,13 +1,36 @@
import 'dotenv/config';
import { getDb } from './database.js';
import { acquire, release, shutdown } from './nntp.pool.js';
import { decodeYenc } from './yenc.util.js';
import { acquire, release, shutdown, setPoolSize } from './nntp.pool.js';
import { decodeYenc, parseYencMeta } from './yenc.util.js';
import fs from 'fs/promises';
import log4js from './logger.js';
const logger = log4js.getLogger('download');
async function downloadFile(fileId) {
async function downloadPart(partNumber, segment, targetBuffer) {
let conn;
try {
conn = await acquire();
await conn.group('alt.binaries.test');
logger.debug(`Downloading part ${partNumber} with message ID: ${segment.id}`);
const bodyBuffer = (await conn.body(`<${segment.id}>`)).data;
decodeYenc(bodyBuffer, targetBuffer);
} catch (error) {
if (error.code === 430) {
logger.error(`Article not found for part ${partNumber} (Message ID: ${segment.id})`);
} else {
throw error;
}
} finally {
if (conn) {
release(conn);
}
}
}
async function downloadFile(fileId, numConnections) {
setPoolSize(numConnections);
const db = await getDb();
const file = await db.get('SELECT * FROM files WHERE id = ?', fileId);
@ -16,56 +39,54 @@ async function downloadFile(fileId) {
return;
}
logger.info(`Downloading file: ${file.filename}`);
logger.info(`Downloading file: ${file.filename} with ${numConnections} connections.`);
const messageIds = JSON.parse(file.message_ids);
const sortedParts = Object.entries(messageIds).sort(([a], [b]) => parseInt(a, 10) - parseInt(b, 10));
const parts = [];
let conn;
// Download the first part to get the total file size
const [firstPartNumber, firstSegment] = sortedParts[0];
let firstBodyBuffer;
let conn = await acquire();
try {
conn = await acquire();
await conn.group('alt.binaries.test');
for (const [partNumber, segment] of sortedParts) {
if (!segment || !segment.id) {
logger.error(`Message ID for part ${partNumber} not found.`);
continue;
}
try {
logger.debug(`Downloading part ${partNumber}/${file.parts} with message ID: ${segment.id}`);
const bodyBuffer = (await conn.body(`<${segment.id}>`)).data;
const decodedPart = decodeYenc(bodyBuffer);
parts.push(decodedPart);
} catch (error) {
if (error.code === 430) {
logger.error(`Article not found for part ${partNumber} (Message ID: ${segment.id})`);
} else {
throw error;
}
}
}
} catch (error) {
logger.error('Error downloading file parts:', error);
firstBodyBuffer = (await conn.body(`<${firstSegment.id}>`)).data;
} finally {
if (conn) {
release(conn);
}
release(conn);
}
if (parts.length === file.parts) {
const completeFile = Buffer.concat(parts);
await fs.writeFile(file.filename, completeFile);
logger.info(`File "${file.filename}" downloaded successfully.`);
} else {
logger.error('Could not download all parts of the file.');
const meta = parseYencMeta(firstBodyBuffer);
const totalSize = meta.total;
if (!totalSize) {
throw new Error('Could not determine total file size from yEnc metadata.');
}
const targetBuffer = Buffer.alloc(totalSize);
logger.info(`Allocated buffer of size ${totalSize} for file "${file.filename}".`);
// Decode the first part and write it to the target buffer
decodeYenc(firstBodyBuffer, targetBuffer);
const downloadPromises = sortedParts.slice(1).map(([partNumber, segment]) =>
downloadPart(partNumber, segment, targetBuffer)
);
await Promise.all(downloadPromises);
await fs.writeFile(file.filename, targetBuffer);
logger.info(`File "${file.filename}" downloaded successfully.`);
}
const fileId = parseInt(process.argv[2], 10);
if (isNaN(fileId)) {
const args = process.argv.slice(2);
const fileIdArg = args.find(arg => !arg.startsWith('--'));
const connectionsArg = args.find(arg => arg.startsWith('--connections='));
const fileId = fileIdArg ? parseInt(fileIdArg, 10) : null;
const numConnections = connectionsArg ? parseInt(connectionsArg.split('=')[1], 10) : 10;
if (!fileId || isNaN(fileId)) {
logger.error('Please provide a valid file ID as a command-line argument.');
process.exit(1);
}
downloadFile(fileId).finally(() => shutdown());
downloadFile(fileId, numConnections).finally(() => shutdown());

View File

@ -3,9 +3,15 @@ import log4js from './logger.js';
const logger = log4js.getLogger('pool');
const POOL_SIZE = 5;
const connections = [];
const queue = [];
let POOL_SIZE = 10;
const allConnections = new Set(); // Tracks all connections for proper shutdown
const idleConnections = []; // Tracks available, ready-to-use connections
const waiters = []; // Queue of promises for tasks waiting for a connection
let createdCount = 0; // A counter for total connections created
export const setPoolSize = (size) => {
POOL_SIZE = size;
};
const createConnection = async () => {
const config = {
@ -19,41 +25,46 @@ const createConnection = async () => {
const conn = new NNTP(config.host, 119);
await conn.connect();
await conn.login(config.user, config.password);
allConnections.add(conn); // Add to the set for shutdown tracking
return conn;
};
export const acquire = async () => {
if (connections.length > 0) {
if (idleConnections.length > 0) {
logger.debug('Reusing existing connection from pool.');
return connections.pop();
return idleConnections.pop();
}
if (connections.length + queue.length < POOL_SIZE) {
logger.info('Creating new connection.');
if (createdCount < POOL_SIZE) {
createdCount++;
logger.info(`Creating new connection (${createdCount}/${POOL_SIZE}).`);
return createConnection();
}
logger.info('Waiting for a connection to become available.');
return new Promise(resolve => queue.push(resolve));
logger.info(`Pool maxed out at ${POOL_SIZE}. Waiting for a connection to become available.`);
return new Promise(resolve => waiters.push(resolve));
};
export const release = conn => {
if (queue.length > 0) {
logger.info('Releasing connection to a waiting consumer.');
const resolve = queue.shift();
if (waiters.length > 0) {
logger.debug('Releasing connection directly to a waiting task.');
const resolve = waiters.shift();
resolve(conn);
} else {
logger.debug('Returning connection to the pool.');
connections.push(conn);
logger.debug('Returning connection to the idle pool.');
idleConnections.push(conn);
}
};
export const shutdown = async () => {
logger.info('Shutting down all connections in the pool.');
const allConns = [...connections];
connections.length = 0; // Clear the pool
for (const conn of allConns) {
await conn.quit();
const shutdownPromises = [];
for (const conn of allConnections) {
shutdownPromises.push(conn.quit());
}
await Promise.all(shutdownPromises);
allConnections.clear();
idleConnections.length = 0;
waiters.length = 0;
createdCount = 0;
};