文章

并发请求控制实现深度解析

并发请求控制实现深度解析

掌握并发请求控制的核心实现原理,学会限制并发数量、管理请求队列、实现失败重试,构建健壮的异步请求调度系统。

一、背景与问题

在前端开发中,我们经常需要批量发起多个异步请求:

  • 批量上传文件:同时上传 100 个文件
  • 批量获取数据:并发请求多个接口
  • 图片预加载:同时加载大量图片资源

直接使用 Promise.all() 全部并发存在风险:

  1. 服务器压力过大:瞬间发起数百个请求,可能触发服务端限流或熔断
  2. 浏览器限制:浏览器对同一域名并发数有限制(通常 6 个),超出会排队
  3. 内存峰值:大量并发请求同时占用内存,可能导致页面卡顿
  4. 失败难以控制:一个请求失败可能影响整体流程

解决方案:实现并发请求控制器,限制同时进行的请求数量,队列管理待执行任务。


二、核心概念与定义

2.1 并发控制(Concurrency Control)

控制同时执行的异步任务数量,超出限制的任务进入队列等待。

1
2
3
4
5
6
7
8
9
┌─────────────────────────────────────────────────────────┐
│                    并发控制器                            │
├─────────────────────────────────────────────────────────┤
│  执行中(最大 N 个)    │    等待队列                      │
│  ┌───┐ ┌───┐ ┌───┐   │   ┌───┐ ┌───┐ ┌───┐ ┌───┐    │
│  │ 1 │ │ 2 │ │ 3 │   │   │ 4 │ │ 5 │ │ 6 │ │ 7 │    │
│  └───┘ └───┘ └───┘   │   └───┘ └───┘ └───┘ └───┘    │
│       ↓ 完成          │       ↑ 取出执行                │
└─────────────────────────────────────────────────────────┘

2.2 请求队列(Request Queue)

存储待执行任务的先进先出(FIFO)队列,支持:

  • 入队(enqueue):添加新任务
  • 出队(dequeue):取出下一个任务
  • 清空(clear):清空所有待执行任务

2.3 失败重试(Retry Mechanism)

请求失败时自动重试的策略:

  • 重试次数:最大重试次数限制
  • 重试间隔:指数退避或固定间隔
  • 重试条件:根据错误类型决定是否重试

三、最小示例

3.1 基硎版并发控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class ConcurrencyController {
  constructor(maxConcurrent) {
    this.maxConcurrent = maxConcurrent;  // 最大并发数
    this.running = 0;                     // 当前执行数
    this.queue = [];                      // 等待队列
  }

  // 执行任务
  async run(task) {
    // 如果达到并发上限,进入队列等待
    if (this.running >= this.maxConcurrent) {
      await new Promise(resolve => this.queue.push(resolve));
    }
    
    this.running++;
    try {
      return await task();
    } finally {
      this.running--;
      // 完成一个,取出下一个执行
      if (this.queue.length > 0) {
        const next = this.queue.shift();
        next();  // 唤醒等待的任务
      }
    }
  }
}

// 使用示例
const controller = new ConcurrencyController(3);

const tasks = Array.from({ length: 10 }, (_, i) => 
  () => fetch(`/api/data/${i}`).then(r => r.json())
);

// 所有任务通过控制器执行
Promise.all(tasks.map(task => controller.run(task)));

四、核心知识点拆解

4.1 并发限制的实现原理

核心思想:使用计数器 + 队列实现限流

1
2
3
4
5
6
// 关键代码解析
if (this.running >= this.maxConcurrent) {
  // 创建一个 Promise,将 resolve 存入队列
  // 任务会在这里等待,直到被唤醒
  await new Promise(resolve => this.queue.push(resolve));
}

流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
任务进入
    │
    ▼
running < max ? ──── 否 ──→ 入队等待
    │                        │
    是                       │
    │                        │
    ▼                        │
执行任务                      │
    │                        │
    ▼                        │
任务完成                      │
    │                        │
    ▼                        │
running--                    │
    │                        │
    ▼                        │
队列有任务? ──── 是 ──→ 取出并唤醒 ──┘
    │
    否
    │
    ▼
  结束

4.2 请求队列管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class RequestQueue {
  constructor() {
    this.queue = [];
  }

  // 入队
  enqueue(task, priority = 0) {
    // 支持优先级:高优先级插队
    const item = { task, priority };
    const index = this.queue.findIndex(item => item.priority < priority);
    if (index === -1) {
      this.queue.push(item);
    } else {
      this.queue.splice(index, 0, item);
    }
  }

  // 出队
  dequeue() {
    const item = this.queue.shift();
    return item?.task;
  }

  // 获取队列长度
  get size() {
    return this.queue.length;
  }

  // 清空队列
  clear() {
    this.queue = [];
  }
}

4.3 失败重试机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
async function retryRequest(task, options = {}) {
  const {
    maxRetries = 3,           // 最大重试次数
    delay = 1000,             // 基硎延迟
    backoff = 'exponential',  // 退避策略:'fixed' | 'exponential'
    shouldRetry = () => true  // 重试条件判断
  } = options;

  let lastError;
  
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      
      // 最后一次尝试不再重试
      if (attempt === maxRetries) break;
      
      // 判断是否应该重试
      if (!shouldRetry(error)) break;
      
      // 计算延迟时间
      const waitTime = backoff === 'exponential' 
        ? delay * Math.pow(2, attempt)  // 指数退避:1s, 2s, 4s...
        : delay;                         // 固定延迟
      
      await new Promise(resolve => setTimeout(resolve, waitTime));
    }
  }
  
  throw lastError;
}

// 使用示例
retryRequest(
  () => fetch('/api/data').then(r => r.json()),
  {
    maxRetries: 3,
    delay: 1000,
    backoff: 'exponential',
    shouldRetry: (err) => err.status >= 500  // 5xx 错误才重试
  }
);

4.4 完整版并发调度器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class ConcurrencyScheduler {
  constructor(options = {}) {
    this.maxConcurrent = options.maxConcurrent || 6;
    this.running = 0;
    this.queue = [];
    this.results = [];
    this.errors = [];
  }

  // 添加任务
  add(task, options = {}) {
    return new Promise((resolve, reject) => {
      this.queue.push({
        task,
        resolve,
        reject,
        retries: options.maxRetries || 0,
        attempt: 0
      });
      this.next();
    });
  }

  // 执行下一个任务
  next() {
    // 达到并发上限或队列为空
    if (this.running >= this.maxConcurrent || this.queue.length === 0) {
      return;
    }

    this.running++;
    const item = this.queue.shift();
    
    this.execute(item)
      .then(result => {
        this.results.push(result);
        item.resolve(result);
      })
      .catch(error => {
        // 重试逻辑
        if (item.attempt < item.retries) {
          item.attempt++;
          this.queue.unshift(item);  // 重新放回队首
        } else {
          this.errors.push(error);
          item.reject(error);
        }
      })
      .finally(() => {
        this.running--;
        this.next();  // 尝试执行下一个
      });
  }

  // 执行单个任务
  async execute(item) {
    return await item.task();
  }

  // 批量添加任务
  async addAll(tasks, options = {}) {
    return Promise.all(
      tasks.map(task => this.add(task, options))
    );
  }

  // 获取状态
  get status() {
    return {
      running: this.running,
      pending: this.queue.length,
      completed: this.results.length,
      failed: this.errors.length
    };
  }
}

// 使用示例
const scheduler = new ConcurrencyScheduler({ maxConcurrent: 3 });

const tasks = urls.map(url => () => fetch(url).then(r => r.json()));

const results = await scheduler.addAll(tasks, { maxRetries: 2 });
console.log(scheduler.status);
// { running: 0, pending: 0, completed: 10, failed: 0 }

五、实战案例

5.1 批量上传文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
async function batchUploadFiles(files, options = {}) {
  const { maxConcurrent = 3, onProgress, onSuccess, onError } = options;
  
  const scheduler = new ConcurrencyScheduler({ maxConcurrent });
  
  let completed = 0;
  const total = files.length;
  
  const tasks = files.map((file, index) => async () => {
    const result = await uploadFile(file);
    completed++;
    
    onProgress?.({
      completed,
      total,
      percent: Math.round((completed / total) * 100),
      current: file.name
    });
    
    onSuccess?.(result, index);
    return result;
  });
  
  try {
    return await scheduler.addAll(tasks);
  } catch (error) {
    onError?.(error);
    throw error;
  }
}

// 使用
batchUploadFiles(fileList, {
  maxConcurrent: 3,
  onProgress: (info) => {
    console.log(`进度: ${info.percent}% - ${info.current}`);
  },
  onSuccess: (result, index) => {
    console.log(`文件 ${index} 上传成功`);
  }
});

5.2 图片预加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async function preloadImages(urls, maxConcurrent = 6) {
  const scheduler = new ConcurrencyScheduler({ maxConcurrent });
  
  const tasks = urls.map(url => () => new Promise((resolve, reject) => {
    const img = new Image();
    img.onload = () => resolve(img);
    img.onerror = reject;
    img.src = url;
  }));
  
  return scheduler.addAll(tasks);
}

// 使用
const imageUrls = ['/img/1.jpg', '/img/2.jpg', '/img/3.jpg', ...];
await preloadImages(imageUrls, 6);

5.3 接口批量请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class ApiBatchRequest {
  constructor(baseURL, maxConcurrent = 6) {
    this.baseURL = baseURL;
    this.scheduler = new ConcurrencyScheduler({ maxConcurrent });
  }

  async get(paths, options = {}) {
    const tasks = paths.map(path => () => 
      fetch(`${this.baseURL}${path}`, options)
        .then(r => {
          if (!r.ok) throw new Error(`HTTP ${r.status}`);
          return r.json();
        })
    );
    
    return this.scheduler.addAll(tasks, { maxRetries: 2 });
  }

  async post(endpoints, data) {
    const tasks = endpoints.map(({ path, body }) => () =>
      fetch(`${this.baseURL}${path}`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(body ?? data)
      }).then(r => r.json())
    );
    
    return this.scheduler.addAll(tasks);
  }
}

// 使用
const api = new ApiBatchRequest('https://api.example.com', 4);
const results = await api.get(['/users', '/posts', '/comments']);

六、底层原理

6.1 为什么需要并发控制?

浏览器限制

浏览器HTTP/1.1 同域名并发数HTTP/2 并发数
Chrome6100+
Firefox6100+
Safari6100+

服务端限制

  • Nginx 默认 worker_connections = 1024
  • Node.js 默认 maxSockets = Infinity
  • 云服务 API 限流(如 AWS API Gateway)

6.2 Promise 队列等待原理

1
2
3
4
5
6
7
8
9
10
11
// 核心:利用 Promise 的 resolve 实现等待/唤醒机制

// 等待
const waitPromise = new Promise(resolve => {
  // 将 resolve 存起来,稍后调用
  pendingResolves.push(resolve);
});

// 唤醒
const resolve = pendingResolves.shift();
resolve();  // 调用 resolve,waitPromise 变为 fulfilled

6.3 微任务与宏任务的配合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 执行任务使用微任务(Promise.then)
// 延迟重试使用宏任务(setTimeout)

async next() {
  if (this.running >= this.maxConcurrent) return;
  
  this.running++;
  const item = this.queue.shift();
  
  try {
    const result = await item.task();  // 微任务
    item.resolve(result);
  } catch (error) {
    // 延迟重试使用宏任务
    setTimeout(() => {
      this.queue.unshift(item);
      this.next();
    }, this.retryDelay);
  } finally {
    this.running--;
    this.next();  // 微任务调度下一个
  }
}

七、高频面试题解析

Q1:实现一个并发限制为 3 的请求调度器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
function limitRequest(urls, maxLimit = 3) {
  return new Promise((resolve) => {
    const results = [];
    let index = 0;
    let completed = 0;
    
    async function run() {
      if (index >= urls.length) return;
      
      const i = index++;
      try {
        results[i] = await fetch(urls[i]).then(r => r.json());
      } catch (error) {
        results[i] = { error };
      }
      
      completed++;
      if (completed === urls.length) {
        resolve(results);
      } else {
        run();  // 继续下一个
      }
    }
    
    // 启动 maxLimit 个并发
    for (let i = 0; i < Math.min(maxLimit, urls.length); i++) {
      run();
    }
  });
}

Q2:Promise.all 如何实现并发控制?

Promise.all 本身不控制并发,需要配合调度器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async function promiseAllWithLimit(tasks, limit) {
  const results = [];
  const executing = new Set();
  
  for (const [index, task] of tasks.entries()) {
    const p = Promise.resolve().then(() => task());
    results[index] = p;
    executing.add(p);
    
    const cleanup = () => executing.delete(p);
    p.then(cleanup, cleanup);
    
    if (executing.size >= limit) {
      await Promise.race(executing);
    }
  }
  
  return Promise.all(results);
}

Q3:如何实现请求超时 + 重试?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async function fetchWithRetry(url, options = {}) {
  const { timeout = 5000, maxRetries = 3, retryDelay = 1000 } = options;
  
  for (let i = 0; i <= maxRetries; i++) {
    try {
      const controller = new AbortController();
      const timeoutId = setTimeout(() => controller.abort(), timeout);
      
      const response = await fetch(url, {
        ...options,
        signal: controller.signal
      });
      
      clearTimeout(timeoutId);
      return response;
    } catch (error) {
      if (i === maxRetries) throw error;
      await new Promise(r => setTimeout(r, retryDelay * Math.pow(2, i)));
    }
  }
}

Q4:并发控制与串行执行的区别?

特性并发控制串行执行
执行方式多任务同时执行逐个执行
总耗时≈ 单任务耗时 × (任务数 / 并发数)单任务耗时 × 任务数
资源占用较高较低
适用场景IO 密集型任务顺序依赖任务

八、总结与扩展

核心要点

  1. 并发控制:通过计数器 + 队列限制同时执行的任务数
  2. 队列管理:先进先出,支持优先级插队
  3. 失败重试:指数退避策略,避免雪崩效应
  4. 状态监控:实时获取执行、等待、完成、失败数量

扩展阅读

  • RxJSmergeAll(maxConcurrent) 操作符
  • p-limit:npm 库,轻量并发控制
  • async-pool:ES7 async/await 实现
  • Bottleneck:功能完整的速率限制器

最佳实践

  1. 根据服务端承载能力设置并发数(通常 3-6)
  2. 实现请求超时机制,避免无限等待
  3. 区分可重试错误和不可重试错误
  4. 添加进度回调和错误回调
  5. 大批量任务考虑分批执行

并发请求控制是前端性能优化的重要手段,合理使用可以显著提升批量操作效率,同时保护服务端稳定性。

本文由作者按照 CC BY 4.0 进行授权