Node.js pipe实现源码解析
发布时间 - 2026-01-11 02:45:38 点击率:次从前面两篇文章,我们了解到。想要把 Readable 的数据写到 Writable,就必须先手动的将数据读入内存,然后写入 Writable。换句话说,每次传递数据时,都需要写如下的模板代码
readable.on('readable', (err) => {
if(err) throw err
writable.write(readable.read())
})
为了方便使用,Node.js 提供了 pipe() 方法,让我们可以优雅的传递数据
readable.pipe(writable)
现在,就让我们来看看它是如何实现的吧
pipe
首先需要先调用 Readable 的 pipe() 方法
// lib/_stream_readable.js
Readable.prototype.pipe = function(dest, pipeOpts) {
var src = this;
var state = this._readableState;
// 记录 Writable
switch (state.pipesCount) {
case 0:
state.pipes = dest;
break;
case 1:
state.pipes = [state.pipes, dest];
break;
default:
state.pipes.push(dest);
break;
}
state.pipesCount += 1;
// ...
src.once('end', endFn);
dest.on('unpipe', onunpipe);
// ...
dest.on('drain', ondrain);
// ...
src.on('data', ondata);
// ...
// 保证 error 事件触发时,onerror 首先被执行
prependListener(dest, 'error', onerror);
// ...
dest.once('close', onclose);
// ...
dest.once('finish', onfinish);
// ...
// 触发 Writable 的 pipe 事件
dest.emit('pipe', src);
// 将 Readable 改为 flow 模式
if (!state.flowing) {
debug('pipe resume');
src.resume();
}
return dest;
};
执行 pipe() 函数时,首先将 Writable 记录到 state.pipes 中,然后绑定相关事件,最后如果 Readable 不是 flow 模式,就调用 resume() 将 Readable 改为 flow 模式
传递数据
Readable 从数据源获取到数据后,触发 data 事件,执行 ondata()
ondata() 相关代码:
// lib/_stream_readable.js
// 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况
// 详情见 https://github.com/nodejs/node/issues/7278
var increasedAwaitDrain = false;
function ondata(chunk) {
debug('ondata');
increasedAwaitDrain = false;
var ret = dest.write(chunk);
if (false === ret && !increasedAwaitDrain) {
// 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况
if (((state.pipesCount === 1 && state.pipes === dest) ||
(state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
) &&
!cleanedUp) {
debug('false write response, pause', src._readableState.awaitDrain);
src._readableState.awaitDrain++;
increasedAwaitDrain = true;
}
// 进入 pause 模式
src.pause();
}
}
在 ondata(chunk) 函数内,通过 dest.write(chunk) 将数据写入 Writable
此时,在 _write() 内部可能会调用 src.push(chunk) 或使其 unpipe,这会导致 awaitDrain 多次增加,不能清零,Readable 卡住
当不能再向 Writable 写入数据时,Readable 会进入 pause 模式,直到所有的 drain 事件触发
触发 drain 事件,执行 ondrain()
// lib/_stream_readable.js
var ondrain = pipeOnDrain(src);
function pipeOnDrain(src) {
return function() {
var state = src._readableState;
debug('pipeOnDrain', state.awaitDrain);
if (state.awaitDrain)
state.awaitDrain--;
// awaitDrain === 0,且有 data 监听器
if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
state.flowing = true;
flow(src);
}
};
}
每个 drain 事件触发时,都会减少 awaitDrain,直到 awaitDrain 为 0。此时,调用 flow(src),使 Readable 进入 flow 模式
到这里,整个数据传递循环已经建立,数据会顺着循环源源不断的流入 Writable,直到所有数据写入完成
unpipe
不管写入过程中是否出现错误,最后都会执行 unpipe()
// lib/_stream_readable.js
// ...
function unpipe() {
debug('unpipe');
src.unpipe(dest);
}
// ...
Readable.prototype.unpipe = function(dest) {
var state = this._readableState;
var unpipeInfo = { hasUnpiped: false };
// 啥也没有
if (state.pipesCount === 0)
return this;
// 只有一个
if (state.pipesCount === 1) {
if (dest && dest !== state.pipes)
return this;
// 没有指定就 unpipe 所有
if (!dest)
dest = state.pipes;
state.pipes = null;
state.pipesCount = 0;
state.flowing = false;
if (dest)
dest.emit('unpipe', this, unpipeInfo);
return this;
}
// 没有指定就 unpipe 所有
if (!dest) {
var dests = state.pipes;
var len = state.pipesCount;
state.pipes = null;
state.pipesCount = 0;
state.flowing = false;
for (var i = 0; i < len; i++)
dests[i].emit('unpipe', this, unpipeInfo);
return this;
}
// 找到指定 Writable,并 unpipe
var index = state.pipes.indexOf(dest);
if (index === -1)
return this;
state.pipes.splice(index, 1);
state.pipesCount -= 1;
if (state.pipesCount === 1)
state.pipes = state.pipes[0];
dest.emit('unpipe', this, unpipeInfo);
return this;
};
Readable.prototype.unpipe() 函数会根据 state.pipes 属性和 dest 参数,选择执行策略。最后会触发 dest 的 unpipe 事件
unpipe 事件触发后,调用 onunpipe(),清理相关数据
// lib/_stream_readable.js
function onunpipe(readable, unpipeInfo) {
debug('onunpipe');
if (readable === src) {
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
unpipeInfo.hasUnpiped = true;
// 清理相关数据
cleanup();
}
}
}
End
在整个 pipe 的过程中,Readable 是主动方 ( 负责整个 pipe 过程:包括数据传递、unpipe 与异常处理 ),Writable 是被动方 ( 只需要触发 drain 事件 )
总结一下 pipe 的过程:
- 首先执行 readbable.pipe(writable),将 readable 与 writable 对接上
- 当 readable 中有数据时,readable.emit('data'),将数据写入 writable
- 如果 writable.write(chunk) 返回 false,则进入 pause 模式,等待 drain 事件触发
- drain 事件全部触发后,再次进入 flow 模式,写入数据
- 不管数据写入完成或发生中断,最后都会调用 unpipe()
- unpipe() 调用 Readable.prototype.unpipe(),触发 dest 的 unpipe 事件,清理相关数据
参考:
https://github.com/nodejs/node/blob/master/lib/_stream_readable.js
https://github.com/nodejs/node/blob/master/lib/_stream_writable.js
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
# Node.js
# pipe实现
# nodejs
# pipe
# 理解nodejs的stream和pipe机制的原理和实现
# JavaScript Reduce使用详解
# 详解JavaScript之Array.reduce源码解读
# JavaScript中reduce()的5个基本用法示例
# JS数组reduce()方法原理及使用技巧解析
# JS数组Reduce方法功能与用法实例详解
# JS数组方法reduce的用法实例分析
# JavaScript reduce和reduceRight详解
# 详解JS中的compose函数和pipe函数用法
# 清零
# 过程中
# 让我们
# 中有
# 它是
# 来看看
# 使其
# 只有一个
# 只需要
# 写到
# 后会
# 这会
# 绑定
# 必须先
# 大家多多
# 如何实现
# 再向
# 就让我们
# 两篇
# 出现错误
相关栏目:
【
网站优化151355 】
【
网络推广146373 】
【
网络技术251813 】
【
AI营销90571 】
相关推荐:
Laravel如何配置和使用队列处理异步任务_Laravel队列驱动与任务分发实例
三星网站视频制作教程下载,三星w23网页如何全屏?
Laravel如何实现RSS订阅源功能_Laravel动态生成网站XML格式订阅内容【教程】
laravel怎么用DB facade执行原生SQL查询_laravel DB facade原生SQL执行方法
网站制作大概要多少钱一个,做一个平台网站大概多少钱?
如何在七牛云存储上搭建网站并设置自定义域名?
Laravel PHP版本要求一览_Laravel各版本环境要求对照
如何快速查询网站的真实建站时间?
Laravel Seeder怎么填充数据_Laravel数据库填充器的使用方法与技巧
Laravel Debugbar怎么安装_Laravel调试工具栏配置指南
千库网官网入口推荐 千库网设计创意平台入口
如何用免费手机建站系统零基础打造专业网站?
Win11怎么修改DNS服务器 Win11设置DNS加速网络【指南】
如何用y主机助手快速搭建网站?
linux写shell需要注意的问题(必看)
如何在局域网内绑定自建网站域名?
清除minerd进程的简单方法
儿童网站界面设计图片,中国少年儿童教育网站-怎么去注册?
利用python获取某年中每个月的第一天和最后一天
Laravel如何使用Eloquent ORM进行数据库操作?(CRUD示例)
Laravel如何使用Guzzle调用外部接口_Laravel发起HTTP请求与JSON数据解析【详解】
油猴 教程,油猴搜脚本为什么会网页无法显示?
Laravel怎么集成Vue.js_Laravel Mix配置Vue开发环境
如何快速使用云服务器搭建个人网站?
长沙企业网站制作哪家好,长沙水业集团官方网站?
googleplay官方入口在哪里_Google Play官方商店快速入口指南
UC浏览器如何设置启动页 UC浏览器启动页设置方法
如何在阿里云通过域名搭建网站?
Android 常见的图片加载框架详细介绍
Laravel如何实现全文搜索_Laravel Scout集成Algolia或Meilisearch教程
Laravel如何自定义分页视图?(Pagination示例)
如何在IIS中新建站点并配置端口与IP地址?
Win11摄像头无法使用怎么办_Win11相机隐私权限开启教程【详解】
Windows10如何删除恢复分区_Win10 Diskpart命令强制删除分区
php 三元运算符实例详细介绍
小米17系列还有一款新机?主打6.9英寸大直屏和旗舰级影像
中山网站制作网页,中山新生登记系统登记流程?
深圳网站制作的公司有哪些,dido官方网站?
如何在景安服务器上快速搭建个人网站?
Laravel如何实现API速率限制?(Rate Limiting教程)
phpredis提高消息队列的实时性方法(推荐)
香港服务器网站测试全流程:性能评估、SEO加载与移动适配优化
实例解析Array和String方法
Laravel如何实现密码重置功能_Laravel密码找回与重置流程
Linux虚拟化技术教程_KVMQEMU虚拟机安装与调优
MySQL查询结果复制到新表的方法(更新、插入)
Android okhttputils现在进度显示实例代码
Gemini怎么用新功能实时问答_Gemini实时问答使用【步骤】
Claude怎样写约束型提示词_Claude约束提示词写法【教程】
Laravel如何实现一对一模型关联?(Eloquent示例)

