博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
NODE Stream流总结(2)
阅读量:6655 次
发布时间:2019-06-25

本文共 11146 字,大约阅读时间需要 37 分钟。

在中我们详细介绍了Node中的可读流(ReadStream),今天我们继续介绍Node中其他的流。

可写流(Writable Stream)

可写流是对数据流向设备的抽象,用来消费上游流过来的数据,通过可写流程序可以把数据写入设备,常见的是本地磁盘文件或者 TCP、HTTP 等网络响应,所有 Writable 流都实现了 stream.Writable 类定义的接口。例如:

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr

可写流的原理其实与可读流类似,当数据过来的时候会写入缓存池,当写入的速度很慢或者写入暂停时候,数据流便会进入到队列池缓存起来,当然即使缓存池满了,剩余的数据也是存在内存中,不会丢失,当缓存池中满了并且最后缓存中的数据全部写入之后,可写流会发送一个drain消息。

使用可以参考一下这段代码

let fs = require('fs');let ws = fs.createWriteStream('./1.txt', {    flags: 'w',    mode: 0o666,    autoClose: true,    highWaterMark: 3, // 默认写是16k    encoding: 'utf8',    start: 0});// 写入的数据必须是字符串或者buffer// flag代表是否能继续写,但是返回false也不会丢失,就是会把内容放到内存中,当文件被清空的时候才会改成truelet flag = ws.write(1 + '', 'utf8', () => {}); // 异步的方法console.log(flag);flag = ws.write(1 + '', 'utf8', () => {}); // 异步的方法console.log(flag);ws.end('ok'); // 当写完后 就不能再继续写了ws.write('123'); // write after end// 抽干方法 当都写入完后会触发drain事件// 必须缓存区满了 满了后被清空了才会出发drainws.on('drain', function() {    console.log('drain')});复制代码

可写流实现

现在就让我们来实现一个简单的可写流,可写流很多部分可以复用可读流的逻辑,都需要有一个构造函数来定义一些基本选项属性,然后调用一个open放法打开文件,并且有一个destroy方法来处理关闭逻辑

let EventEmitter = require('events');let fs = require('fs');class WriteStream extends EventEmitter {    constructor(path,options) {        super();        this.path = path;        this.highWaterMark = options.highWaterMark || 16 * 1024;        this.autoClose = options.autoClose || true;        this.mode = options.mode;        this.start = options.start || 0;        this.flags = options.flags || 'w';        this.encoding = options.encoding || 'utf8';        // 可写流 要有一个缓存区,当正在写入文件是,内容要写入到缓存区中        // 在源码中是一个链表 => []        this.buffers = [];        // 标识 是否正在写入        this.writing = false;        // 是否满足触发drain事件        this.needDrain = false;        // 记录写入的位置        this.pos = 0;        // 记录缓存区的大小        this.length = 0;        this.open();    }        destroy() {        if (typeof this.fd !== 'number') {            return this.emit('close');        }        fs.close(this.fd, () => {            this.emit('close')        });    }        open() {        fs.open(this.path, this.flags, this.mode, (err,fd) => {            if (err) {                this.emit('error', err);                if (this.autoClose) {                    this.destroy();                }                return;            }            this.fd = fd;            this.emit('open');        })    }}module.exports = WriteStream;复制代码

接着我们就需要实现一个write函数来让可写流对象调用,在write方法中我们首先将数据转化为buffer,接着判断传入的数据是否大于缓存区大小,如果大于的话则代表我们已经达到了drain事件的第一个条件,接着就要判断现在是否正在将数据写入文件中,如果并没写入进行状态的话那么我们就定义一个_write方法去做写入的动作,否则则代表文件正在写入,那我们就将流传来的数据先放在缓存区中,这样做就是为了确保不会出现同时间往文件写数据的情况,保证了写入数据的有序性

write(chunk, encoding=this.encoding, callback=()=>{}) {    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);    // write 返回一个boolean类型     this.length += chunk.length;     let ret = this.length < this.highWaterMark; // 比较是否达到了缓存区的大小    this.needDrain = !ret; // 是否需要触发needDrain    // 判断是否正在写入 如果是正在写入 就写入到缓存区中    if (this.writing) {        this.buffers.push({            encoding,            chunk,            callback        });    } else {        // 专门用来将内容 写入到文件内        this.writing = true;        this._write(chunk, encoding, () => {            callback();            this.clearBuffer();        });    }    return ret;}复制代码

接着我们就来实现_write方法,这个方法类似于读流中的read方法,我们需要先判断是否获取到了文件描述符fd,确保获取之后再调用fs模块的写入方法,而在写入之后的回调中我们会调用传入_write方法中的一个回调函数clearBuffer,这个方法会去buffers中继续递归地把数据取出,然后调用_write方法去写入,直到全部buffer中的数据取出后,首先我们需要将正在写入状态改成否,这样之后再有write调用就会直接往文件写入,接着我们就需要根据前面drain事件的第一个条件触发与否来决定是否要触发drain事件,如果前面条件满足,即缓存区被填满过,那么此时当我们清空完缓存区之后就需要触发drain事件

clearBuffer() {    let buffer = this.buffers.shift();    if (buffer) {        this._write(buffer.chunk, buffer.encoding, () => {            buffer.callback();            this.clearBuffer()        });    } else {        this.writing = false;        if (this.needDrain) { // 是否需要触发drain 需要就发射drain事件            this.needDrain = false;            this.emit('drain');        }    }}_write(chunk, encoding, callback) {    if (typeof this.fd !== 'number') {        return this.once('open', () => this._write(chunk, encoding, callback));    }    fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err,byteWritten) => {        this.length -= byteWritten;        this.pos += byteWritten;                callback(); // 清空缓存区的内容    });}复制代码

最后附上完整的代码

let EventEmitter = require('events');let fs = require('fs');class WriteStream extends EventEmitter {    constructor(path, options) {        super();        this.path = path;        this.highWaterMark = options.highWaterMark || 16*1024;        this.autoClose = options.autoClose || true;        this.mode = options.mode;        this.start = options.start || 0;        this.flags = options.flags || 'w';        this.encoding = options.encoding || 'utf8';        // 可写流 要有一个缓存区,当正在写入文件是,内容要写入到缓存区中        // 在源码中是一个链表 => []        this.buffers = [];        // 标识 是否正在写入        this.writing = false;        // 是否满足触发drain事件        this.needDrain = false;        // 记录写入的位置        this.pos = 0;        // 记录缓存区的大小        this.length = 0;        this.open();    }        destroy() {        if (typeof this.fd !== 'number') {            return this.emit('close');        }        fs.close(this.fd, () => {            this.emit('close');        })    }        open() {        fs.open(this.path, this.flags, this.mode, (err,fd) => {            if (err) {                this.emit('error', err);                if (this.autoClose) {                    this.destroy();                }                return;            }            this.fd = fd;            this.emit('open');        })    }        write(chunk, encoding=this.encoding, callback=()=>{}) {        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);        // write 返回一个boolean类型         this.length += chunk.length;         let ret = this.length < this.highWaterMark; // 比较是否达到了缓存区的大小        this.needDrain = !ret; // 是否需要触发needDrain        // 判断是否正在写入 如果是正在写入 就写入到缓存区中        if (this.writing) {            this.buffers.push({                encoding,                chunk,                callback            });        } else {            // 专门用来将内容 写入到文件内            this.writing = true;            this._write(chunk, encoding, () => {                callback();                this.clearBuffer();            });        }        return ret;    }        clearBuffer() {        let buffer = this.buffers.shift();        if (buffer) {            this._write(buffer.chunk, buffer.encoding, () => {                buffer.callback();                this.clearBuffer()            });        } else {            this.writing = false;            if(this.needDrain){ // 是否需要触发drain 需要就发射drain事件                this.needDrain = false;                this.emit('drain');            }        }    }        _write(chunk, encoding, callback){        if (typeof this.fd !== 'number') {            return this.once('open', () => this._write(chunk, encoding, callback));        }        fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err,byteWritten) => {            this.length -= byteWritten;            this.pos += byteWritten;                        callback(); // 清空缓存区的内容        });    }}module.exports = WriteStream;复制代码

Pipe

在了解了可读流与可写流之后,那我们在实际使用流的流程又是怎么样的呢,其实就是在读流不断读取文件内容的同时,我们的写流会将读流读到的数据写入到另一个文件中,这样做的好处就是避免了直接用fs的读写文件时会占用大量内存来存放中间转化的数据。另外在Node的socket和http中也会用到这种概念,所以在流中就会有一个pipe方法帮助我们来实现这种读一点写一点的情况。

使用可以参考一下这段代码

let fs = require('fs');let path = require('path');let ReadStream = require('./ReadStream');let WriteStream = require('./WriteStream');let rs = new ReadStream(path.join(__dirname, './1.txt'), {    highWaterMark: 4});let ws = new WriteStream(path.join(__dirname, './2.txt'), {    highWaterMark: 1});// 4 1rs.pipe(ws); 复制代码

接下来就让我们在我们自己实现的读流模块中实现一下pipe这个方法,背后就是我们会监听读流的data事件来持续获取文件中的数据,然后我们就会去调用写流的write方法,如前面所说,每次调用write方法都会返回一个标记告诉我们写流缓存区是否已满,那么当我们得到缓存区已满的信号后就会调用读流的pause方法来暂停读取,然后等到写流的缓存区已经全部写入并且触发drain事件时,我们就会调用resume重新开启读取的流程

pipe(ws) {    this.on('data', (chunk) => {        let flag = ws.write(chunk);        if (!flag) {            this.pause();        }    });    ws.on('drain', () => {        this.resume();    })}复制代码

自定义流

Node自带的读流和写流模块其实都是继承于stream模块中的接口,读流继承于Readable接口,写流则继承于Writable接口,所以我们其实是可以自定义一个流模块,只要继承stream模块对应的接口即可。

  1. 自定义读流

    如果我们要自定义读流的话,那我们就需要继承Readable,Readable里面有一个read()方法,默认调用_read(),所以我们只要复写了_read()方法就可实现读取的逻辑,同时Readable中也提供了一个push方法,调用push方法就会触发data事件,push中的参数就是data事件回调函数的参数,当push传入的参数为null的时候就代表读流停止

    自定义的方法可以参考如下代码

    let { Readable } = require('stream');// 想实现什么流 就继承这个流// Readable里面有一个read()方法,默认掉_read()// Readable中提供了一个push方法你调用push方法就会触发data事件let index = 9;class MyRead extends Readable {    _read() {        // 可读流什么时候停止呢? 当push null的时候停止        if (index-- > 0) return this.push('123');        this.push(null);    }}let mr = new MyRead();mr.on('data', function(data) {    console.log(data);});复制代码
  2. 自定义写流

    与自定义读流类似,自定义写流需要继承Writable接口,并且实现一个_write()方法,这里注意的是_write中可以传入3个参数,chunk, encoding, callback,chunk就是代表写入的数据,通常是一个buffer,encoding是编码类型,通常不会用到,最后的callback要注意,它并不是我们用这个自定义写流调用write时的回调,而是我们上面讲到写流实现时的clearBuffer函数。

    自定义的方法可以参考如下代码

    let { Writable } = require('stream');// 可写流实现_write方法// 源码中默认调用的是Writable中的write方法class MyWrite extends Writable {    _write(chunk, encoding, callback) {        console.log(chunk.toString());        callback(); // clearBuffer    }}let mw = new MyWrite();mw.write('珠峰', 'utf8', () => {    console.log(1);})mw.write('珠峰', 'utf8', () => {    console.log(1);});复制代码

Duplex 双工流

双工流其实就是结合了上面我们说的自定义读流和自定义写流,它既能读也能写,同时可以做到读写之间互不干扰

使用方法可以参考如下代码

let { Duplex } =  require('stream');// 双工流 又能读 又能写,而且读取可以没关系(互不干扰)let d = Duplex({    read() {        this.push('hello');        this.push(null);    },    write(chunk, encoding, callback) {        console.log(chunk);        callback();    }});d.on('data', function(data) {    console.log(data);});d.write('hello');复制代码

Transform 转换流

转换流的本质就是双工流,唯一不同的是它并不需要像上面提到的双工流一样实现read和write,它只需要实现一个transform方法用于写数据给可读流,但是对于读取来说可读流的数据会经过处理后自动放入可写流中,这点上不需要我们去实现

使用方法可以参考如下代码

let { Transform } =  require('stream');// 它的参数和可写流一样let tranform1 = Transform({    transform(chunk, encoding, callback) {        this.push(chunk.toString().toUpperCase()); // 将输入的内容放入到可读流中        callback();    }});let tranform2 = Transform({    transform(chunk, encoding, callback){        console.log(chunk.toString());        callback();    }});// 等待你的输入// rs.pipe(ws);// 希望将输入的内容转化成大写在输出出来process.stdin.pipe(tranform1).pipe(tranform2);// 对象流 可读流里只能放buffer或者字符串 对象流里可以放对象复制代码

对象流

默认情况下,流处理的数据是Buffer/String类型的值。对象流的特点就是它有一个objectMode标志,我们可以设置它让流可以接受任何JavaScript对象。

使用方法可以参考如下代码

const { Transform } = require('stream');let fs = require('fs');let rs = fs.createReadStream('./users.json');rs.setEncoding('utf8');let toJson = Transform({    readableObjectMode: true,    transform(chunk, encoding, callback) {        this.push(JSON.parse(chunk));        callback();    }});let jsonOut = Transform({    writableObjectMode: true,    transform(chunk, encoding, callback) {        console.log(chunk);        callback();    }});rs.pipe(toJson).pipe(jsonOut);复制代码

转载地址:http://djxto.baihongyu.com/

你可能感兴趣的文章
我的友情链接
查看>>
innodb_buffer_pool_size 大小建议
查看>>
Delphi XE2 之 FireMonkey 入门(25) - 数据绑定: TBindingsList: 表达式的灵活性及表达式函数...
查看>>
事件自调用 - 回复 maxcool 的问题
查看>>
Horizon View 6.0 基于RDS的应用发布
查看>>
ubuntu下系统重启dns就被清空的解决方案
查看>>
Ant Examples
查看>>
建立C语言动态链接库
查看>>
红帽企业存储管理之iscsi简单应用
查看>>
Rsync安装使用
查看>>
更改Linux系统日志的时间格式
查看>>
FC-SAN vs. IP-SAN详细技术比较
查看>>
sequioadb源码分析
查看>>
基于嵌入式Linux系统设备驱动程序的开发
查看>>
【smart-transform】取自 Atom 的 babeljs/coffeescript/typescript 智能转 es5 库
查看>>
基于S3C6410的Touch驱动详解
查看>>
使用Jekyll托管github pages的好处
查看>>
java、javaw和javaws的区别
查看>>
Git 远程分支
查看>>
华为S9300、s5700交换机端口镜像配置
查看>>