如何在异步任务队列中动态维持固定并发数并优先重试失败任务
发布时间 - 2025-12-25 00:00:00 点击率:次本文介绍如何使用 `asyncio.as_completed()` 动态管理可变长度任务队列,始终保持 n 个并发任务运行;当任务失败时立即重新入队并获得最高优先级,确保快速重试,避免批量提交导致的调度延迟。
在实际异步任务调度场景中(如网络请求、文件下载或批量 API 调用),我们常需满足三个关键约束:
- 固定并发上限(例如最多同时运行 8 个任务);
- 失败任务需立即重试且享有最高优先级;
- 结果需按完成顺序实时产出(而非等待整批结束)。
此时,若一次性创建全部 1000 个 Task(如 asyncio.create_task() 批量调用),不仅内存开销大,更会导致失败任务被“埋没”——因为 asyncio.as_completed(task_list) 仅作用于已创建的 Task 对象集合,无法动态插入新任务或调整优先级。
✅ 正确解法是:只维护一个大小可控的活跃任务池(pool),通过循环“取—执行—回收—补充”实现流式调度。核心思路如下:
- 使用字典 task_pool: Dict[asyncio.Task, Coroutine] 映射任务与其原始协程,便于失败后精准重试;
- 每次循环优先填充至目标并发数(如 8);
- 调用 asyncio.as_completed(task_pool.keys()) 获取首个完成任务;
- await 其结果,成功则 yield,失败则将对应协程重新加入待调度队列(推荐用 list.append() 实现 LIFO 高优先级);
- 无论成败,均从池中移除该任务,并在下一轮循环中自动补入新任务。
以下是生产就绪的参考实现(含错误处理与资源清理):
import asyncio from typing importList, Coroutine, Any, Iterator async def run_with_priority_retry( task_coros: List[Coroutine], max_concurrent: int = 8, ) -> Iterator[Any]: """ 动态维持 max_concurrent 个并发任务,失败任务立即重试(LIFO 优先)。 Args: task_coros: 初始任务协程列表(可修改) max_concurrent: 最大并发数 Yields: 成功执行的结果 """ # 使用 list 模拟优先队列:append → 高优;pop() → 取最高优 pending = list(task_coros) task_pool = {} # asyncio.Task -> Coroutine while pending or task_pool: # ✅ 补充任务至满额 while pending and len(task_pool) < max_concurrent: coro = pending.pop() # LIFO:最后加入的最先执行(高优) task = asyncio.create_task(coro) task_pool[task] = coro if not task_pool: break # ✅ 等待任意一个完成(as_completed 返回迭代器,next 即首个) done, _ = await asyncio.wait( task_pool.keys(), return_when=asyncio.FIRST_COMPLETED ) completed_task = done.pop() # ✅ 提取原始协程并清理池 coro = task_pool.pop(completed_task) try: result = await completed_task yield result except Exception as e: print(f"Task failed with {type(e).__name__}: {e} — re-queued with high priority") pending.append(coro) # 失败任务插到末尾,下次 pop 优先执行 # ✅ 清理已完成 task 引用(防止内存泄漏) completed_task.cancel() try: await completed_task except (asyncio.CancelledError, Exception): pass
? 关键注意事项:
- ❗切勿用 asyncio.as_completed(list) 直接传入动态变化的列表——它接收的是快照,不支持运行时增删;应始终基于稳定集合(如 dict.keys())调用。
- ⚠️ 使用 pending.pop() 而非 pending.pop(0) 实现 O(1) 高优入队/出队;若需严格 FIFO 重试,可改用 collections.deque 并调用 appendleft() + pop()。
- ? 务必显式 cancel() 并 await 已完成的 Task,避免悬空对象累积(尤其在长期运行服务中)。
- ? 若任务本身含重试逻辑(如 tenacity),需与本层重试策略对齐,避免重复或冲突。
该模式已被广泛应用于异步爬虫、批量数据同步及微服务熔断降级等场景,兼顾性能、可控性与健壮性。
# app
# ai
# 爬虫
# 异步任务
# 循环
# append
# 并发
# 对象
# 异步
# 重试
# 而非
# 首个
# 的是
# 新任务
# 最多
# 已被
# 并在
# 不支持
# 应用于
相关栏目:
【
网站优化151355 】
【
网络推广146373 】
【
网络技术251813 】
【
AI营销90571 】
相关推荐:
ChatGPT 4.0官网入口地址 ChatGPT在线体验官网
,怎么在广州志愿者网站注册?
北京网站制作费用多少,建立一个公司网站的费用.有哪些部分,分别要多少钱?
如何在景安服务器上快速搭建个人网站?
百度浏览器网页无法复制文字怎么办 百度浏览器复制修复
如何在不使用负向后查找的情况下匹配特定条件前的换行符
香港服务器网站生成指南:免费资源整合与高速稳定配置方案
微信公众帐号开发教程之图文消息全攻略
Win11怎么关闭专注助手 Win11关闭免打扰模式设置【操作】
简单实现jsp分页
Laravel观察者模式如何使用_Laravel Model Observer配置
Laravel如何配置和使用缓存?(Redis代码示例)
如何使用 jQuery 正确渲染 Instagram 风格的标签列表
Laravel如何发送系统通知?(Notification渠道示例)
微博html5版本怎么弄发语音微博_语音录制入口及时长限制操作【教程】
如何获取免费开源的自助建站系统源码?
如何快速启动建站代理加盟业务?
Laravel如何发送邮件和通知_Laravel邮件与通知系统发送步骤
详解阿里云nginx服务器多站点的配置
Swift中循环语句中的转移语句 break 和 continue
最好的网站制作公司,网购哪个网站口碑最好,推荐几个?谢谢?
如何在阿里云香港服务器快速搭建网站?
Laravel中的withCount方法怎么高效统计关联模型数量
Python文件流缓冲机制_IO性能解析【教程】
网站广告牌制作方法,街上的广告牌,横幅,用PS还是其他软件做的?
Laravel如何记录日志_Laravel Logging系统配置与自定义日志通道
浅谈Javascript中的Label语句
如何快速查询域名建站关键信息?
矢量图网站制作软件,用千图网的一张矢量图做公司app首页,该网站并未说明版权等问题,这样做算不算侵权?应该如何解决?
潮流网站制作头像软件下载,适合母子的网名有哪些?
JS弹性运动实现方法分析
Laravel如何实现多语言支持_Laravel本地化与国际化(i18n)配置教程
php在windows下怎么调试_phpwindows环境调试操作说明【操作】
小米17系列还有一款新机?主打6.9英寸大直屏和旗舰级影像
LinuxShell函数封装方法_脚本复用设计思路【教程】
Laravel如何配置Horizon来管理队列?(安装和使用)
如何用免费手机建站系统零基础打造专业网站?
Laravel如何设置自定义的日志文件名_Laravel根据日期或用户ID生成动态日志【技巧】
如何在服务器上配置二级域名建站?
HTML透明颜色代码怎么让图片透明_给img元素加透明色的技巧【方法】
使用豆包 AI 辅助进行简单网页 HTML 结构设计
Java垃圾回收器的方法和原理总结
JavaScript如何实现路由_前端路由原理是什么
Laravel如何清理系统缓存命令_Laravel清除路由配置及视图缓存的方法【总结】
rsync同步时出现rsync: failed to set times on “xxxx”: Operation not permitted
高防服务器租用如何选择配置与防御等级?
如何用美橙互联一键搭建多站合一网站?
Laravel如何发送邮件_Laravel Mailables构建与发送邮件的简明教程
Laravel事件监听器怎么写_Laravel Event和Listener使用教程
如何在阿里云服务器自主搭建网站?


List, Coroutine, Any, Iterator
async def run_with_priority_retry(
task_coros: List[Coroutine],
max_concurrent: int = 8,
) -> Iterator[Any]:
"""
动态维持 max_concurrent 个并发任务,失败任务立即重试(LIFO 优先)。
Args:
task_coros: 初始任务协程列表(可修改)
max_concurrent: 最大并发数
Yields:
成功执行的结果
"""
# 使用 list 模拟优先队列:append → 高优;pop() → 取最高优
pending = list(task_coros)
task_pool = {} # asyncio.Task -> Coroutine
while pending or task_pool:
# ✅ 补充任务至满额
while pending and len(task_pool) < max_concurrent:
coro = pending.pop() # LIFO:最后加入的最先执行(高优)
task = asyncio.create_task(coro)
task_pool[task] = coro
if not task_pool:
break
# ✅ 等待任意一个完成(as_completed 返回迭代器,next 即首个)
done, _ = await asyncio.wait(
task_pool.keys(),
return_when=asyncio.FIRST_COMPLETED
)
completed_task = done.pop()
# ✅ 提取原始协程并清理池
coro = task_pool.pop(completed_task)
try:
result = await completed_task
yield result
except Exception as e:
print(f"Task failed with {type(e).__name__}: {e} — re-queued with high priority")
pending.append(coro) # 失败任务插到末尾,下次 pop 优先执行
# ✅ 清理已完成 task 引用(防止内存泄漏)
completed_task.cancel()
try:
await completed_task
except (asyncio.CancelledError, Exception):
pass