掌握并发请求控制的核心实现原理,学会限制并发数量、管理请求队列、实现失败重试,构建健壮的异步请求调度系统。
一、背景与问题
在前端开发中,我们经常需要批量发起多个异步请求:
- 批量上传文件:同时上传 100 个文件
- 批量获取数据:并发请求多个接口
- 图片预加载:同时加载大量图片资源
直接使用 Promise.all() 全部并发存在风险:
- 服务器压力过大:瞬间发起数百个请求,可能触发服务端限流或熔断
- 浏览器限制:浏览器对同一域名并发数有限制(通常 6 个),超出会排队
- 内存峰值:大量并发请求同时占用内存,可能导致页面卡顿
- 失败难以控制:一个请求失败可能影响整体流程
解决方案:实现并发请求控制器,限制同时进行的请求数量,队列管理待执行任务。
二、核心概念与定义
2.1 并发控制(Concurrency Control)
控制同时执行的异步任务数量,超出限制的任务进入队列等待。
1
2
3
4
5
6
7
8
9
| ┌─────────────────────────────────────────────────────────┐
│ 并发控制器 │
├─────────────────────────────────────────────────────────┤
│ 执行中(最大 N 个) │ 等待队列 │
│ ┌───┐ ┌───┐ ┌───┐ │ ┌───┐ ┌───┐ ┌───┐ ┌───┐ │
│ │ 1 │ │ 2 │ │ 3 │ │ │ 4 │ │ 5 │ │ 6 │ │ 7 │ │
│ └───┘ └───┘ └───┘ │ └───┘ └───┘ └───┘ └───┘ │
│ ↓ 完成 │ ↑ 取出执行 │
└─────────────────────────────────────────────────────────┘
|
2.2 请求队列(Request Queue)
存储待执行任务的先进先出(FIFO)队列,支持:
- 入队(enqueue):添加新任务
- 出队(dequeue):取出下一个任务
- 清空(clear):清空所有待执行任务
2.3 失败重试(Retry Mechanism)
请求失败时自动重试的策略:
- 重试次数:最大重试次数限制
- 重试间隔:指数退避或固定间隔
- 重试条件:根据错误类型决定是否重试
三、最小示例
3.1 基硎版并发控制器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| class ConcurrencyController {
constructor(maxConcurrent) {
this.maxConcurrent = maxConcurrent; // 最大并发数
this.running = 0; // 当前执行数
this.queue = []; // 等待队列
}
// 执行任务
async run(task) {
// 如果达到并发上限,进入队列等待
if (this.running >= this.maxConcurrent) {
await new Promise(resolve => this.queue.push(resolve));
}
this.running++;
try {
return await task();
} finally {
this.running--;
// 完成一个,取出下一个执行
if (this.queue.length > 0) {
const next = this.queue.shift();
next(); // 唤醒等待的任务
}
}
}
}
// 使用示例
const controller = new ConcurrencyController(3);
const tasks = Array.from({ length: 10 }, (_, i) =>
() => fetch(`/api/data/${i}`).then(r => r.json())
);
// 所有任务通过控制器执行
Promise.all(tasks.map(task => controller.run(task)));
|
四、核心知识点拆解
4.1 并发限制的实现原理
核心思想:使用计数器 + 队列实现限流
1
2
3
4
5
6
| // 关键代码解析
if (this.running >= this.maxConcurrent) {
// 创建一个 Promise,将 resolve 存入队列
// 任务会在这里等待,直到被唤醒
await new Promise(resolve => this.queue.push(resolve));
}
|
流程图:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| 任务进入
│
▼
running < max ? ──── 否 ──→ 入队等待
│ │
是 │
│ │
▼ │
执行任务 │
│ │
▼ │
任务完成 │
│ │
▼ │
running-- │
│ │
▼ │
队列有任务? ──── 是 ──→ 取出并唤醒 ──┘
│
否
│
▼
结束
|
4.2 请求队列管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| class RequestQueue {
constructor() {
this.queue = [];
}
// 入队
enqueue(task, priority = 0) {
// 支持优先级:高优先级插队
const item = { task, priority };
const index = this.queue.findIndex(item => item.priority < priority);
if (index === -1) {
this.queue.push(item);
} else {
this.queue.splice(index, 0, item);
}
}
// 出队
dequeue() {
const item = this.queue.shift();
return item?.task;
}
// 获取队列长度
get size() {
return this.queue.length;
}
// 清空队列
clear() {
this.queue = [];
}
}
|
4.3 失败重试机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
| async function retryRequest(task, options = {}) {
const {
maxRetries = 3, // 最大重试次数
delay = 1000, // 基硎延迟
backoff = 'exponential', // 退避策略:'fixed' | 'exponential'
shouldRetry = () => true // 重试条件判断
} = options;
let lastError;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await task();
} catch (error) {
lastError = error;
// 最后一次尝试不再重试
if (attempt === maxRetries) break;
// 判断是否应该重试
if (!shouldRetry(error)) break;
// 计算延迟时间
const waitTime = backoff === 'exponential'
? delay * Math.pow(2, attempt) // 指数退避:1s, 2s, 4s...
: delay; // 固定延迟
await new Promise(resolve => setTimeout(resolve, waitTime));
}
}
throw lastError;
}
// 使用示例
retryRequest(
() => fetch('/api/data').then(r => r.json()),
{
maxRetries: 3,
delay: 1000,
backoff: 'exponential',
shouldRetry: (err) => err.status >= 500 // 5xx 错误才重试
}
);
|
4.4 完整版并发调度器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
| class ConcurrencyScheduler {
constructor(options = {}) {
this.maxConcurrent = options.maxConcurrent || 6;
this.running = 0;
this.queue = [];
this.results = [];
this.errors = [];
}
// 添加任务
add(task, options = {}) {
return new Promise((resolve, reject) => {
this.queue.push({
task,
resolve,
reject,
retries: options.maxRetries || 0,
attempt: 0
});
this.next();
});
}
// 执行下一个任务
next() {
// 达到并发上限或队列为空
if (this.running >= this.maxConcurrent || this.queue.length === 0) {
return;
}
this.running++;
const item = this.queue.shift();
this.execute(item)
.then(result => {
this.results.push(result);
item.resolve(result);
})
.catch(error => {
// 重试逻辑
if (item.attempt < item.retries) {
item.attempt++;
this.queue.unshift(item); // 重新放回队首
} else {
this.errors.push(error);
item.reject(error);
}
})
.finally(() => {
this.running--;
this.next(); // 尝试执行下一个
});
}
// 执行单个任务
async execute(item) {
return await item.task();
}
// 批量添加任务
async addAll(tasks, options = {}) {
return Promise.all(
tasks.map(task => this.add(task, options))
);
}
// 获取状态
get status() {
return {
running: this.running,
pending: this.queue.length,
completed: this.results.length,
failed: this.errors.length
};
}
}
// 使用示例
const scheduler = new ConcurrencyScheduler({ maxConcurrent: 3 });
const tasks = urls.map(url => () => fetch(url).then(r => r.json()));
const results = await scheduler.addAll(tasks, { maxRetries: 2 });
console.log(scheduler.status);
// { running: 0, pending: 0, completed: 10, failed: 0 }
|
五、实战案例
5.1 批量上传文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| async function batchUploadFiles(files, options = {}) {
const { maxConcurrent = 3, onProgress, onSuccess, onError } = options;
const scheduler = new ConcurrencyScheduler({ maxConcurrent });
let completed = 0;
const total = files.length;
const tasks = files.map((file, index) => async () => {
const result = await uploadFile(file);
completed++;
onProgress?.({
completed,
total,
percent: Math.round((completed / total) * 100),
current: file.name
});
onSuccess?.(result, index);
return result;
});
try {
return await scheduler.addAll(tasks);
} catch (error) {
onError?.(error);
throw error;
}
}
// 使用
batchUploadFiles(fileList, {
maxConcurrent: 3,
onProgress: (info) => {
console.log(`进度: ${info.percent}% - ${info.current}`);
},
onSuccess: (result, index) => {
console.log(`文件 ${index} 上传成功`);
}
});
|
5.2 图片预加载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| async function preloadImages(urls, maxConcurrent = 6) {
const scheduler = new ConcurrencyScheduler({ maxConcurrent });
const tasks = urls.map(url => () => new Promise((resolve, reject) => {
const img = new Image();
img.onload = () => resolve(img);
img.onerror = reject;
img.src = url;
}));
return scheduler.addAll(tasks);
}
// 使用
const imageUrls = ['/img/1.jpg', '/img/2.jpg', '/img/3.jpg', ...];
await preloadImages(imageUrls, 6);
|
5.3 接口批量请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| class ApiBatchRequest {
constructor(baseURL, maxConcurrent = 6) {
this.baseURL = baseURL;
this.scheduler = new ConcurrencyScheduler({ maxConcurrent });
}
async get(paths, options = {}) {
const tasks = paths.map(path => () =>
fetch(`${this.baseURL}${path}`, options)
.then(r => {
if (!r.ok) throw new Error(`HTTP ${r.status}`);
return r.json();
})
);
return this.scheduler.addAll(tasks, { maxRetries: 2 });
}
async post(endpoints, data) {
const tasks = endpoints.map(({ path, body }) => () =>
fetch(`${this.baseURL}${path}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body ?? data)
}).then(r => r.json())
);
return this.scheduler.addAll(tasks);
}
}
// 使用
const api = new ApiBatchRequest('https://api.example.com', 4);
const results = await api.get(['/users', '/posts', '/comments']);
|
六、底层原理
6.1 为什么需要并发控制?
浏览器限制:
| 浏览器 | HTTP/1.1 同域名并发数 | HTTP/2 并发数 |
|---|
| Chrome | 6 | 100+ |
| Firefox | 6 | 100+ |
| Safari | 6 | 100+ |
服务端限制:
- Nginx 默认
worker_connections = 1024 - Node.js 默认
maxSockets = Infinity - 云服务 API 限流(如 AWS API Gateway)
6.2 Promise 队列等待原理
1
2
3
4
5
6
7
8
9
10
11
| // 核心:利用 Promise 的 resolve 实现等待/唤醒机制
// 等待
const waitPromise = new Promise(resolve => {
// 将 resolve 存起来,稍后调用
pendingResolves.push(resolve);
});
// 唤醒
const resolve = pendingResolves.shift();
resolve(); // 调用 resolve,waitPromise 变为 fulfilled
|
6.3 微任务与宏任务的配合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| // 执行任务使用微任务(Promise.then)
// 延迟重试使用宏任务(setTimeout)
async next() {
if (this.running >= this.maxConcurrent) return;
this.running++;
const item = this.queue.shift();
try {
const result = await item.task(); // 微任务
item.resolve(result);
} catch (error) {
// 延迟重试使用宏任务
setTimeout(() => {
this.queue.unshift(item);
this.next();
}, this.retryDelay);
} finally {
this.running--;
this.next(); // 微任务调度下一个
}
}
|
七、高频面试题解析
Q1:实现一个并发限制为 3 的请求调度器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| function limitRequest(urls, maxLimit = 3) {
return new Promise((resolve) => {
const results = [];
let index = 0;
let completed = 0;
async function run() {
if (index >= urls.length) return;
const i = index++;
try {
results[i] = await fetch(urls[i]).then(r => r.json());
} catch (error) {
results[i] = { error };
}
completed++;
if (completed === urls.length) {
resolve(results);
} else {
run(); // 继续下一个
}
}
// 启动 maxLimit 个并发
for (let i = 0; i < Math.min(maxLimit, urls.length); i++) {
run();
}
});
}
|
Q2:Promise.all 如何实现并发控制?
Promise.all 本身不控制并发,需要配合调度器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| async function promiseAllWithLimit(tasks, limit) {
const results = [];
const executing = new Set();
for (const [index, task] of tasks.entries()) {
const p = Promise.resolve().then(() => task());
results[index] = p;
executing.add(p);
const cleanup = () => executing.delete(p);
p.then(cleanup, cleanup);
if (executing.size >= limit) {
await Promise.race(executing);
}
}
return Promise.all(results);
}
|
Q3:如何实现请求超时 + 重试?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| async function fetchWithRetry(url, options = {}) {
const { timeout = 5000, maxRetries = 3, retryDelay = 1000 } = options;
for (let i = 0; i <= maxRetries; i++) {
try {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeout);
const response = await fetch(url, {
...options,
signal: controller.signal
});
clearTimeout(timeoutId);
return response;
} catch (error) {
if (i === maxRetries) throw error;
await new Promise(r => setTimeout(r, retryDelay * Math.pow(2, i)));
}
}
}
|
Q4:并发控制与串行执行的区别?
| 特性 | 并发控制 | 串行执行 |
|---|
| 执行方式 | 多任务同时执行 | 逐个执行 |
| 总耗时 | ≈ 单任务耗时 × (任务数 / 并发数) | 单任务耗时 × 任务数 |
| 资源占用 | 较高 | 较低 |
| 适用场景 | IO 密集型任务 | 顺序依赖任务 |
八、总结与扩展
核心要点
- 并发控制:通过计数器 + 队列限制同时执行的任务数
- 队列管理:先进先出,支持优先级插队
- 失败重试:指数退避策略,避免雪崩效应
- 状态监控:实时获取执行、等待、完成、失败数量
扩展阅读
- RxJS:
mergeAll(maxConcurrent) 操作符 - p-limit:npm 库,轻量并发控制
- async-pool:ES7 async/await 实现
- Bottleneck:功能完整的速率限制器
最佳实践
- 根据服务端承载能力设置并发数(通常 3-6)
- 实现请求超时机制,避免无限等待
- 区分可重试错误和不可重试错误
- 添加进度回调和错误回调
- 大批量任务考虑分批执行
并发请求控制是前端性能优化的重要手段,合理使用可以显著提升批量操作效率,同时保护服务端稳定性。