# Concurrent Queue and Semaphore Pattern

## The Image Upload That Crashed Chrome
A developer built a photo gallery upload feature. Users can drop 200 photos at once:
```javascript
async function uploadAll(files) {
  await Promise.all(files.map(file => uploadFile(file)));
}
```
The browser tries to open 200 HTTP connections simultaneously. Memory spikes to 2GB because every in-flight upload holds its file data. Chrome runs out of memory and kills the tab. Even if it survived, the server would likely throttle or reject most of the requests — servers commonly cap concurrent requests per client, and browsers themselves limit concurrent connections per domain.
The fix: limit concurrency. Upload 5 files at a time, not 200. This is the fundamental problem concurrent queues and semaphores solve.
## Why You Need Concurrency Control
Think of concurrency control like a theme park ride. The ride fits 5 people. A hundred people are in line. You don't let all 100 rush the ride at once — you let 5 in, wait for them to finish, let the next 5 in. A semaphore is the ride operator counting people in and out.
You need concurrency limits when:
- Rate limits: APIs cap requests per second (100/sec is common)
- Memory: Each in-flight request holds data in memory (image uploads, large payloads)
- Browser limits: Browsers cap concurrent HTTP connections per domain (~6 for HTTP/1.1)
- Server health: Too many concurrent requests from one client can degrade service for everyone
- User experience: Processing too many things at once starves the main thread
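The simplest cap on concurrency is chunking: split the work into fixed-size groups and `Promise.all` each group. A minimal sketch (`mapInChunks` and its `fn` callback are illustrative names, standing in for whatever per-item work you're doing):

```javascript
// Chunked processing: at most `chunkSize` tasks run at once. Simple, but the
// whole chunk waits for its slowest item before the next chunk starts, so
// slots can sit idle while stragglers finish.
async function mapInChunks(items, fn, chunkSize = 5) {
  const results = [];
  for (let i = 0; i < items.length; i += chunkSize) {
    const chunk = items.slice(i, i + chunkSize);
    // Wait for the entire chunk before moving on to the next one.
    results.push(...await Promise.all(chunk.map(item => fn(item))));
  }
  return results;
}
```

Chunking is fine for uniform workloads; the queue and semaphore patterns below keep every slot busy even when task durations vary.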
## The Semaphore Pattern
A semaphore is a counter that controls access to a shared resource. It has two operations: acquire (take a permit, waiting if none are available) and release (return a permit, waking the next waiter if one is queued).
```javascript
class Semaphore {
  #permits;
  #queue = [];

  constructor(permits) {
    this.#permits = permits;
  }

  acquire() {
    // A permit is free: take it and proceed immediately.
    if (this.#permits > 0) {
      this.#permits--;
      return Promise.resolve();
    }
    // No permits left: park the caller until release() wakes it.
    const { promise, resolve } = Promise.withResolvers();
    this.#queue.push(resolve);
    return promise;
  }

  release() {
    // Hand the permit directly to the next waiter, or bank it.
    const next = this.#queue.shift();
    if (next) {
      next();
    } else {
      this.#permits++;
    }
  }
}
```
Usage:

```javascript
const semaphore = new Semaphore(3);

async function limitedFetch(url) {
  await semaphore.acquire();
  try {
    return await fetch(url);
  } finally {
    // Release in finally so errors can't leak the permit.
    semaphore.release();
  }
}

await Promise.all(urls.map(url => limitedFetch(url)));
```
With 100 URLs and a semaphore of 3, only 3 fetches are active at any time. When one finishes, the next URL starts. Clean, simple, correct.
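The acquire/try/finally/release dance is easy to forget at call sites. One convenience — a sketch, not part of any standard API — is a helper that owns the permit lifecycle and works with anything exposing `acquire()`/`release()`, such as the Semaphore class above:

```javascript
// Run fn while holding a permit; the finally block guarantees the permit is
// returned even when fn throws or its promise rejects.
async function withPermit(semaphore, fn) {
  await semaphore.acquire();
  try {
    return await fn();
  } finally {
    semaphore.release();
  }
}
```

Call sites then shrink to `withPermit(semaphore, () => fetch(url))`, with no way to leak a permit.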
## Concurrent Queue With Max Parallelism
The semaphore pattern works, but a purpose-built concurrent queue is often more ergonomic:
```javascript
class ConcurrentQueue {
  #concurrency;
  #running = 0;
  #queue = [];

  constructor(concurrency) {
    this.#concurrency = concurrency;
  }

  add(task) {
    return new Promise((resolve, reject) => {
      this.#queue.push({ task, resolve, reject });
      this.#run();
    });
  }

  #run() {
    // Start tasks until we hit the concurrency limit or run out of work.
    while (this.#running < this.#concurrency && this.#queue.length > 0) {
      const { task, resolve, reject } = this.#queue.shift();
      this.#running++;
      task()
        .then(resolve)
        .catch(reject)
        .finally(() => {
          this.#running--;
          this.#run(); // a slot opened up: pull the next queued task
        });
    }
  }

  get pending() {
    return this.#queue.length;
  }

  get active() {
    return this.#running;
  }
}
```
Usage:

```javascript
const queue = new ConcurrentQueue(5);

async function uploadPhotos(files) {
  const results = await Promise.all(
    files.map(file =>
      queue.add(() => uploadFile(file))
    )
  );
  return results;
}
```
The queue ensures at most 5 uploads run simultaneously. Each caller gets back a promise that resolves when their specific task completes. The queue handles scheduling internally.
## Adding Progress Tracking
For file uploads and batch operations, users need progress feedback:
```javascript
class TrackedQueue extends ConcurrentQueue {
  #total = 0;
  #completed = 0;
  #failed = 0;
  #onProgress;

  constructor(concurrency, onProgress) {
    super(concurrency);
    this.#onProgress = onProgress;
  }

  add(task) {
    this.#total++;
    this.#notifyProgress();
    return super.add(task)
      .then(result => {
        this.#completed++;
        this.#notifyProgress();
        return result;
      })
      .catch(err => {
        this.#failed++;
        this.#notifyProgress();
        throw err; // re-throw so callers still see the failure
      });
  }

  #notifyProgress() {
    this.#onProgress?.({
      total: this.#total,
      completed: this.#completed,
      failed: this.#failed,
      active: this.active,
      pending: this.pending,
    });
  }
}
```
```javascript
const queue = new TrackedQueue(3, (progress) => {
  progressBar.value = (progress.completed + progress.failed) / progress.total;
  statusText.textContent =
    `${progress.completed}/${progress.total} complete, ${progress.failed} failed`;
});
```
## Priority Queues for Async Tasks
Not all tasks are equally urgent. A priority queue processes high-priority items first:
```javascript
class PriorityQueue {
  #concurrency;
  #running = 0;
  #queues = { high: [], normal: [], low: [] };

  constructor(concurrency) {
    this.#concurrency = concurrency;
  }

  add(task, priority = 'normal') {
    return new Promise((resolve, reject) => {
      this.#queues[priority].push({ task, resolve, reject });
      this.#run();
    });
  }

  #nextTask() {
    // Always drain higher-priority queues first.
    for (const priority of ['high', 'normal', 'low']) {
      if (this.#queues[priority].length > 0) {
        return this.#queues[priority].shift();
      }
    }
    return null;
  }

  #run() {
    while (this.#running < this.#concurrency) {
      const item = this.#nextTask();
      if (!item) break;
      const { task, resolve, reject } = item;
      this.#running++;
      task()
        .then(resolve)
        .catch(reject)
        .finally(() => {
          this.#running--;
          this.#run();
        });
    }
  }
}
```
Usage:

```javascript
const queue = new PriorityQueue(3);

queue.add(() => fetch('/api/analytics'), 'low');
queue.add(() => fetch('/api/user-data'), 'high');
queue.add(() => fetch('/api/recommendations'), 'normal');
queue.add(() => fetch('/api/notifications'), 'high');
```
User data and notifications jump the line ahead of analytics and recommendations. The user sees critical data first.
## Real-World: Image Upload Queue
Here's a complete upload queue with retry, progress, and cancellation:
```javascript
function createUploadQueue(options = {}) {
  const {
    concurrency = 3,
    maxRetries = 2,
    onProgress,
    signal,
  } = options;

  const queue = new ConcurrentQueue(concurrency);
  let completed = 0;
  let failed = 0;
  let total = 0;

  function uploadFile(file) {
    total++;
    return queue.add(async () => {
      signal?.throwIfAborted();
      let lastError;
      for (let attempt = 0; attempt <= maxRetries; attempt++) {
        try {
          const formData = new FormData();
          formData.append('file', file);
          const response = await fetch('/api/upload', {
            method: 'POST',
            body: formData,
            signal,
          });
          if (!response.ok) throw new Error(`Upload failed: ${response.status}`);
          const result = await response.json();
          completed++;
          onProgress?.({ completed, failed, total, file: file.name });
          return result;
        } catch (err) {
          lastError = err;
          // Cancellation is not a retryable failure: bail out immediately.
          if (err.name === 'AbortError') throw err;
        }
      }
      failed++;
      onProgress?.({ completed, failed, total, file: file.name });
      throw lastError;
    });
  }

  return { uploadFile };
}
```
```javascript
const controller = new AbortController();

const uploader = createUploadQueue({
  concurrency: 5,
  onProgress: ({ completed, total }) => {
    console.log(`${completed}/${total} uploaded`);
  },
  signal: controller.signal,
});

const results = await Promise.allSettled(
  files.map(file => uploader.uploadFile(file))
);
```
## Real-World: API Batch Processing
When you need to process thousands of items through an API with rate limits:
```javascript
async function batchProcess(items, processFn, options = {}) {
  const {
    concurrency = 10,
    batchSize = 50,
    delayBetweenBatches = 1000,
  } = options;

  const queue = new ConcurrentQueue(concurrency);
  const results = [];

  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    const batchResults = await Promise.allSettled(
      batch.map(item => queue.add(() => processFn(item)))
    );
    results.push(...batchResults);

    // Pause between batches to stay under global rate limits.
    const hasMore = i + batchSize < items.length;
    if (hasMore && delayBetweenBatches > 0) {
      await new Promise(r => setTimeout(r, delayBetweenBatches));
    }
  }

  return results;
}
```
```javascript
const results = await batchProcess(
  userIds,
  async (userId) => {
    const response = await fetch(`/api/users/${userId}/sync`, { method: 'POST' });
    return response.json();
  },
  { concurrency: 5, batchSize: 100, delayBetweenBatches: 2000 }
);
```
The batchSize and delayBetweenBatches parameters serve different purposes than concurrency:

- Concurrency limits how many requests are in flight at once, protecting browser and server resources.
- Batch size groups items for progress reporting and checkpointing.
- Delay between batches adds a global rate limit to stay under API quotas.

A well-tuned system uses all three: a concurrency of 10 for throughput, batches of 100 for progress tracking, and 2-second delays between batches to stay under a 3000 requests/minute API limit.
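Under the assumption that each batch finishes quickly relative to the delay, the delay needed for a given quota is just arithmetic. A hedged helper (the name and rounding are illustrative):

```javascript
// Smallest per-batch delay (ms) that keeps `batchSize` requests per interval
// under a requests-per-minute quota. Assumes batch duration << delay.
function delayForQuota(batchSize, requestsPerMinute) {
  // batchSize / (delay / 60000) <= requestsPerMinute  =>  solve for delay
  return Math.ceil((batchSize * 60000) / requestsPerMinute);
}
```

For batches of 100 under a 3000 requests/minute quota, this yields the 2000 ms delay used above.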
## The map-with-concurrency Utility
For simple cases where you just need Promise.all with a concurrency limit, here's the minimal utility:
```javascript
async function mapConcurrent(items, fn, concurrency) {
  const results = new Array(items.length);
  let index = 0;

  // Each worker repeatedly claims the next unclaimed index.
  async function worker() {
    while (index < items.length) {
      const i = index++;
      results[i] = await fn(items[i], i);
    }
  }

  const workers = Array.from({ length: concurrency }, () => worker());
  await Promise.all(workers);
  return results;
}
```
```javascript
const thumbnails = await mapConcurrent(
  images,
  (image) => generateThumbnail(image),
  4
);
```
This spawns N workers that each pull from a shared index. Results are placed at the correct indices, so the output order matches the input order — just like Promise.all.
There's a subtle error handling gap here: if one worker throws, Promise.all rejects with that error, but the other workers keep running and pulling new items from the shared index. They'll continue processing (and potentially failing) even after the caller has moved on. For production use, add an aborted flag that workers check before picking up the next item, so a single failure stops the whole batch cleanly.
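Here's a sketch of that fix, reusing the worker-pool shape above (the name `mapConcurrentFailFast` is chosen for illustration):

```javascript
// mapConcurrent with a shared `aborted` flag: the first failure stops every
// worker from claiming new items, and the error propagates to the caller.
async function mapConcurrentFailFast(items, fn, concurrency) {
  const results = new Array(items.length);
  let index = 0;
  let aborted = false;

  async function worker() {
    while (!aborted && index < items.length) {
      const i = index++;
      try {
        results[i] = await fn(items[i], i);
      } catch (err) {
        aborted = true; // other workers stop before their next item
        throw err;      // Promise.all below rejects with this error
      }
    }
  }

  await Promise.all(Array.from({ length: concurrency }, () => worker()));
  return results;
}
```

Items already in flight still run to completion — the flag only prevents new items from starting, which is usually the right trade-off for HTTP work you can't cancel mid-request.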
| What developers do | Why it goes wrong | What they should do |
|---|---|---|
| `Promise.all(items.map(fn))` with unbounded concurrency | Exhausts memory (each in-flight request holds data), hits browser connection limits, and can overwhelm servers or trigger rate limiting | Use a concurrent queue or semaphore to limit in-flight operations |
| Forgetting to release semaphore permits on error | A permit skipped by an error is permanently consumed; after enough errors the semaphore deadlocks and no new tasks can ever start | Always release in a `finally` block |
| Fixed concurrency for all environments | Mobile devices have less memory and slower connections; a concurrency of 10 that works on desktop may crash mobile browsers | Adapt concurrency to context: lower on mobile, higher on desktop, and match API rate limit headers when available |
| Priority queues without starvation prevention | If high-priority tasks keep arriving, low-priority tasks wait forever | Add aging: promote tasks after a maximum wait so low-priority work eventually runs |
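For the fixed-concurrency mistake, one sketch of context-aware tuning — the thresholds, parameter names, and fallback values here are illustrative assumptions, not from any particular API:

```javascript
// Pick a concurrency limit from context. The no-information fallback of 6
// roughly matches typical per-domain HTTP/1.1 connection limits.
function pickConcurrency({ rateLimitPerSecond, isMobile = false } = {}) {
  if (rateLimitPerSecond) {
    // Never exceed what the API allows, but cap for memory's sake too.
    return Math.max(1, Math.min(rateLimitPerSecond, 10));
  }
  return isMobile ? 2 : 6;
}
```

In a browser you might derive `isMobile` from a `navigator` check and `rateLimitPerSecond` from a rate-limit response header, if your API exposes one.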
Design a task scheduler for a web-based IDE that runs ESLint, TypeScript checking, and Prettier simultaneously across multiple files. The constraints: no more than 4 concurrent operations (to avoid pegging the CPU), TypeScript checking should have higher priority than linting, and the user should see real-time progress. Describe the data structures and scheduling algorithm you'd use.