前言
在做 SSR Stream Render 的时候遇到了 Node.js 的 Stream,但是对其总是一知半解。正好最近 ChatGPT 很火,找他学一学吧,没想到真的把我教会了。PS:文末有跟 ChatGPT 的精彩对话(请忽略我稀烂的英语)。
为什么需要 Stream
首先我们通过一个简单的例子来说明一下,使用流的好处。如下所示,我们将一个大文件读取到另一个文件中:
1 | const fs = require('fs') |
通过活动监视器,我们发现该进程内存占用为 300 MB 左右:
如果,我们换成流,情况就不一样了:
1 | const fs = require('fs') |
看来 Stream 在处理大数据的时候是非常好的工具,接下来就让我们通过打比方的方式来进行理解吧。
通过比喻来理解 Stream
Readable Stream
首先,对于 Readable Stream
,我们可以把他比喻成一个水龙头:
水龙头的水来自于哪,需要具体的 Readable Stream
来实现。比如 fs.createReadStream
创建的 Readable Stream
其水源自于文件,process.stdin
水源自于标准输入。
两个状态 flowing 和 paused
水龙头有两个状态 flowing
和 paused
,即龙头打开或关闭。初始化一个 Readable Stream
时,默认是关闭的:
1 | const readStream = fs.createReadStream('./file') |
当我们监听 data
事件时,会自动打开开关:
1 | const readStream = fs.createReadStream('./file') |
且会通知水源往龙头中灌水,这样,水就流到了 data
事件的回调函数中:
我们也可以通过 resume
方法来手动开启水龙头,不过要小心,有可能导致水丢失:
1 | const readStream = fs.createReadStream('./file') |
这就好比先把水龙头打开了,然后再放桶子,肯定会漏掉一些水。
当然,我们也可以调用 pause
关闭水龙头,比如下面这个例子在接收到第一批水后就关闭了水龙头:
1 | const readStream = fs.createReadStream('./big.file') |
buffer
上面代码调用 pause
后水源的水不会停止,会流到水龙头的一个 buffer
中,直到达到 highWaterMark
(最高水位线)则停止:
我们可以通过代码验证一下:
1 | const readStream = fs.createReadStream('./big.file') |
而且,我们可以重新再次打开水龙头,此时会先消耗掉 buffer
中的水,然后再从源头读取,比如下面这个例子(文末 ChatGPT 给的例子也可以):
1 | const readStream = fs.createReadStream('./big.file') |
使用 read 来手动取水
有没有发现,上面这些例子都是水龙头来多少水(即代码中的 chunk
)我们就接多少水,有没有可能我们自己控制接水的多少呢?答案是肯定的,我们可以调用 read
这个方法,比如下面这个例子:
1 | const readStream = fs.createReadStream('./big.file') |
不过,上面的这个代码是读不到数据的。原因在于,read
方法是从 buffer
中读取数据,而此时 buffer
里面还是空的。我们需要这样:
1 | const readStream = fs.createReadStream('./big.file') |
调用 on('readable'...
会触发水源往 buffer
中灌水,当 buffer
中灌满水后,会调用 readable
的回调函数,此时可以通过 read
方法来消费 buffer
中的水。这里有个问题,当我们 read
的数据超过了 buffer
中的怎么办?我们来实验一下:
1 | const readStream = fs.createReadStream('./big.file') |
运行后,控制台打印如下:
1 | Stream is readable (new data received in buffer) |
分析这个日志,我们发现第一次 readable
事件并没有进入 while
循环,且第一次之后 highWaterMark
的值增加了。经过一番源码调试后,我得到了结论,图示如下:
第一次触发 readable
事件,此时 buffer
中的数据为 65536
,而我们需要读取 65537
的数据,数据不够 read
返回 null
。并且发现 read
读取的数据大于 highWaterMark
,所以更新该参数为原来的两倍,即 131072
,然后以该值从水源中再读入一段数据到一个新的节点中 (buffer
是一个链表)。
然后,触发第二次 readable
事件,此时 buffer
数据总长度为 65536 + 131072 = 196608
,我们可以读入两次 65537
的数据。此时 buffer
数据总长度变为 196608 - 2 * 65537 = 65534
,数据又不够了,read
返回 null
,且由于 read
读取的数据小于 highWaterMark
,不需要更新,仍然以原来的值从水源中再读入一段数据到一个新的节点中。
然后,触发第三次 readable
…
除了使用这些已有的 Readable Stream
,Node.js 还支持我们自定义。
自定义 Readable Stream
自定义 Readable Stream
有以下两种方式:
1 | // 1 |
其中,调用 push
就是上面说的“水源往水龙头中注水”的动作,该方法中传入 null
表示水源中的水已注完。
下面,我们写一个可以不断产生大写字母的 Readable Stream
:
1 | const {Readable} = require('stream') |
注意,我们初始化 readStream
的时候传入了 highWaterMark
,这样每次调用 _read
的时候参数就是 1 了。
到此,Readalbe Stream
的核心基本上就介绍完了,接下来介绍 Writable Stream
。
Writable Stream
我们把 Writable Stream
比作一个有入口和出口的池子:
池子的水最终流向哪,需要具体的 Writable Stream
来实现。比如 fs.createWriteStream
创建的 Writable Stream
其水流向文件,process.stdout
水流向标准输出。
两种工作模式
水池也有两种工作模式,一种是入口来的水直接流向出口(此时,相当于在入口和出口间接了一根水管),一种是入口的水先流到池子中(源码中是存在 buffered
这个属性中),出口慢慢进行消费:
我们初始化一个 Writable Stream
时,然后写一些数据试试:
1 | const writeStream = fs.createWriteStream('./file') |
此时,采用的是第二种模式。如何切换成第一种模式呢?可以这样:
1 | const writeStream = fs.createWriteStream('./file') |
通过对比,我想你应该恍然大悟了。第一段代码 writeStream
初始化后,可能出口那边还没有准备好,此时往池子中灌水显然只能先放到池子里。第二段代码是在 writeStream
的 open
事件触发后再往水池中灌水,此时出口已就绪,可以直接流出了。
cork 和 uncork
Writable Stream
还有一个比较有趣的方法是 cork
,即把出口塞住,此时水池的工作模式变为第二种(很显然,出口塞住了,只能先把水灌到池子里)。比如,下面这个例子:
1 | const writeStream = fs.createWriteStream('./file') |
我们可以通过调用 uncork
重新打开出口,比如下面这个例子:
1 | const writeStream = fs.createWriteStream('./file') |
write 的返回值
write
函数是有返回值的,当返回 false
时,表示池子中的水位超过了 highWaterMark
(16 KB),此时正确的做法应该停止继续往池子中注水,等待池子中的水排干了(即触发 drain
事件)再继续注水:
1 | const writeStream = fs.createWriteStream('./file3') |
但是事实上你也可以什么都不管,一直注水:
1 | const writeStream = fs.createWriteStream('./file3') |
这样的后果是机器内存会全部占满,注意这里不会受 Node.js 的运行内存限制,因为 write
的数据最后都会转为 Buffer
。
而且 Node.js 还会自动去刷 buffered
中的数据(这一块没有仔细研究,结论是通过实验得出的),比如上面的代码改成如下这样内存就不会一直增长了:
1 | const writeStream = fs.createWriteStream('./file3') |
这里跟上面不同的是,write
是放在定时器中调用的,这就给了 Node.js 去刷数据的机会。
自定义 Writable Stream
同样的,自定义 Writable Stream
也有两种方式:
1 | // 1 |
比如,我们可以写一个简单的写入文件的 Stream:
1 | class FileWriter extends Writable { |
以上就是 Writable Stream
的核心了,其他方法和事件比较简单,就不过多介绍了。
介绍完这两个东西,接下来我们把他们合起来再讨论讨论。
Readable Stream + Writable Stream
同时讨论这两个东西,最经典的莫过于 pipe
了,比如下面这个代码:
1 | const fs = require('fs') |
起作用就相当于把水龙头和水池用一个管子连起来了:
这样,水就源源不断地从水源处流向目标了。
其原理也是监听了 Readable Stream
的 data
事件,获取到 chunk
写入 Writable Stream
:
1 | src.on('data', ondata) |
不过,从代码中可以看到,pipe
还帮我们处理了当水源放水速度大于水池出水速度的场景。
这种场景下,某一个时刻,水池中的水会超过水池的最高水位线,此时 write
返回 false
,水龙头会 pause
:
当水池中的水流完,会触发 drain
,水龙头会 resume
:
Object Mode
Stream 默认只支持 String 和 Buffer,不过我们也可以改为使用 Object Mode。下面这个例子展示了如何开启 Object Mode:
1 | const {Writable} = require('stream') |
如果改成 false
,则调用 write
的时候会报错:
1 | TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object |
到这里,Stream 的基础知识就介绍得差不多了,不过还有两个比较高级的 Stream 也不能漏了。
Duplex
Duplex
的意思是双重的,即一个 Stream 同时可用作 Readable Stream
和 Writable Stream
。
下面这个例子实现了一个简单的 Duplex
:
1 | const {Duplex} = require('stream') |
上面的代码可以用下面这个图来表示:
其功能就是将输入转换成了大写,然后交给下一个 Writable Stream
。其实对于这种对数据进行转换的功能,Node.js 还提供了一种特别的 Duplex
,叫做 Tranform
。还是上面的例子,用 Tranform
来实现更加简洁:
1 | const {Transform} = require('stream') |
以上就是 Stream 的所有核心内容了,写文章太累了,头都要秃了!如果觉得有帮助的话,求点赞、收藏、转发。
补充内容
和 ChatGPT 讨论技术
参考
https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93/