ReadableStream的代码实现

56 min read

ReadableStream 的实现包括以下模块:

  1. 构造函数

ReadableStream 的构造函数接收两个参数:controller 和 options。controller 必须是一个对象,带有 start、pull、cancel 三个方法以及数值类型的 desiredSize 属性,否则构造函数会抛出 TypeError;它负责控制数据流的生成和消费。options 参数在构造函数中并未被使用。

/**
 * Readable stream whose data flow is delegated to a caller-supplied
 * controller object.
 */
class ReadableStream {
  /**
   * @param {object} controller - Object exposing `start`, `pull` and `cancel`
   *   functions and a numeric `desiredSize` property.
   * @param {object} [options] - Accepted but not read by this constructor.
   * @throws {TypeError} If `controller` is not an object of the shape above.
   */
  constructor(controller, options) {
    const hasCallbacks =
        typeof controller === 'object' && controller !== null &&
        typeof controller.start === 'function' &&
        typeof controller.pull === 'function' &&
        typeof controller.cancel === 'function';

    if (!hasCallbacks || typeof controller.desiredSize !== 'number') {
      // desiredSize is validated as a number, so the message must not
      // describe it as a function.
      throw new TypeError('ReadableStream constructor must be provided a controller ' +
                          'with start, pull, and cancel functions and a numeric desiredSize');
    }

    this._controller = controller;

    // Initialised empty; nothing in this constructor assigns it.
    this._readableStreamController = undefined;

    // A new stream starts out readable, not yet started and undisturbed.
    this._state = 'readable';
    this._started = false;
    this._disturbed = false;

    // Closed-promise bookkeeping and the error slot are filled in later.
    this._closedPromise = undefined;
    this._closedPromiseResolve = undefined;
    this._closedPromiseReject = undefined;
    this._storedError = undefined;
  }
}
  2. 获取读取器(getReader)

通过调用ReadableStream 的getReader方法,可获取一个ReadableStreamDefaultReader 对象,用于读取数据流中的数据。

class ReadableStream {
  /**
   * Creates a reader bound to this stream.
   *
   * @param {object} [options] - Passed through unchanged to the
   *   ReadableStreamDefaultReader constructor.
   * @returns {ReadableStreamDefaultReader} A newly constructed reader.
   */
  getReader(options) {
    const reader = new ReadableStreamDefaultReader(this, options);
    return reader;
  }
}
  3. 控制数据流

通过ReadableStreamDefaultController对象对数据流进行控制。

/**
 * Controller that owns the chunk queue of a ReadableStream and performs its
 * state transitions (close, enqueue, error).
 *
 * NOTE(review): enqueue() calls this._enqueueInReadable(chunk), which is not
 * defined in this class as shown here; it has to be supplied elsewhere.
 */
class ReadableStreamDefaultController {
  /**
   * @param {ReadableStream} stream - Stream this controller drives.
   * @param {object} underlyingSource - Stored as-is; not read by the methods shown here.
   * @param {Function} size - Raw size strategy, normalised by normalizeReadableStreamOptions.
   * @param {number} highWaterMark - Raw high-water mark, normalised the same way.
   * @throws {TypeError} If `stream` is not a ReadableStream instance.
   */
  constructor(stream, underlyingSource, size, highWaterMark) {
    if (!(stream instanceof ReadableStream)) {
      throw new TypeError('ReadableStreamDefaultController can only be constructed with a ReadableStream instance');
    }

    this._stream = stream;
    this._underlyingSource = underlyingSource;
    this._queue = [];
    this._pullingPromise = undefined;
    this._pullingPromiseResolve = undefined;
    this._pullingPromiseReject = undefined;
    this._controlledReadableStream = undefined;
    this._queueTotalSize = 0;
    this._queueSize = 0;

    const { size: normalizedSize, highWaterMark: normalizedHighWaterMark } =
        normalizeReadableStreamOptions(size, highWaterMark);

    this._strategySize = normalizedSize;
    this._strategyHWM = normalizedHighWaterMark;
  }

  /**
   * Remaining capacity of the queue: high-water mark minus the queued total.
   * The value goes negative once the queue is over-full.
   *
   * @returns {number}
   */
  get desiredSize() {
    // Previously a non-empty queue returned `_queueSize` itself, so a fuller
    // queue reported *more* free capacity.
    return this._strategyHWM - this._queueTotalSize;
  }

  /**
   * Closes the stream: completes pending reads, drops queued chunks and
   * settles the stream's closed promise. Does nothing unless the stream is
   * currently 'readable'.
   */
  close() {
    const stream = this._stream;
    if (stream._state !== 'readable') {
      return;
    }

    if (streamHasDefaultReader(stream)) {
      // Use the reader already attached to the stream. Calling getReader()
      // here built a brand-new reader whose request queue is always empty,
      // so pending reads were never completed.
      // NOTE(review): assumes the attached reader lives on `stream._reader`,
      // the field the reader's own pull logic compares against.
      const reader = stream._reader;
      if (isReadableStreamDefaultReader(reader)) {
        // Request shape (promiseResolve/promiseReject) matches what the
        // reader's _readSteps/_readError use.
        while (reader._readRequests.length > 0) {
          reader._readRequests.shift().promiseResolve({ value: undefined, done: true });
        }
      }
    }

    // Reset every queue counter; `_queueSize` used to be left stale.
    this._queue = [];
    this._queueTotalSize = 0;
    this._queueSize = 0;

    stream._state = 'closed';
    this._controlledReadableStream = undefined;

    const closedPromise = stream._closedPromise;
    stream._closedPromise = undefined;
    stream._closedPromiseResolve = undefined;
    stream._closedPromiseReject = undefined;

    resolvePromise(closedPromise, undefined);
  }

  /**
   * Adds a chunk to the stream. Ignored unless the stream is 'readable'.
   * A chunk that is not a non-null object puts the stream into the errored
   * state. When the queue is over the high-water mark the stream is parked
   * in 'waiting' and the chunk is enqueued once the pulling promise settles.
   *
   * @param {object} chunk - Chunk to enqueue.
   */
  enqueue(chunk) {
    const stream = this._stream;

    // State is checked first: error() throws on a non-readable stream, so
    // validating the chunk first made enqueue() throw after close/error.
    if (stream._state !== 'readable') {
      return;
    }

    if (typeof chunk !== 'object' || chunk === null) {
      // The check rejects null, so the message no longer claims null is allowed.
      this.error(new TypeError('Data to be enqueued must be a non-null object'));
      return;
    }

    const size = this._strategySize(chunk);
    if (this._queueSize > this._strategyHWM - size) {
      stream._state = 'waiting';

      // Runs on fulfilment and on rejection of the pulling promise alike.
      const resume = () => {
        if (stream._state !== 'waiting') {
          return;
        }

        stream._state = 'readable';
        this._enqueueInReadable(chunk);
      };

      if (this._pullingPromise === undefined) {
        this._pullingPromise = new Promise((resolve, reject) => {
          this._pullingPromiseResolve = resolve;
          this._pullingPromiseReject = reject;
        });
      }
      this._pullingPromise.then(resume, resume);
      return;
    }

    this._enqueueInReadable(chunk);
  }

  /**
   * Moves the stream into the 'errored' state and rejects pending reads.
   *
   * @param {*} e - The error to record and propagate.
   * @throws {TypeError} If the stream is not currently 'readable'.
   */
  error(e) {
    const stream = this._stream;
    if (stream._state !== 'readable') {
      throw new TypeError('Cannot call error() in a non-readable state');
    }

    stream._state = 'errored';
    // The reader's read() reports `stream._storedError`, so the error has to
    // be recorded on the stream. The controller copy is kept for existing users.
    stream._storedError = e;
    this._storedError = e;

    if (streamHasDefaultReader(stream)) {
      // Same reasoning as in close(): reject the attached reader's requests,
      // not those of a newly constructed reader.
      const reader = stream._reader;

      if (isReadableStreamDefaultReader(reader)) {
        reader._readRequests.forEach((request) => request.promiseReject(e));
        reader._readRequests = new SimpleQueue();
      }
    }

    if (this._queueSize === 0) {
      const closedPromise = stream._closedPromise;
      stream._closedPromise = undefined;
      stream._closedPromiseResolve = undefined;
      stream._closedPromiseReject = undefined;

      // NOTE(review): this settles the closed promise through resolvePromise
      // even though the stream errored; confirm whether rejection is intended.
      resolvePromise(closedPromise, undefined);
    }
  }
}
  4. 读取数据流

通过ReadableStreamDefaultReader对象,对数据流进行读取和消费。

/**
 * Reader that consumes chunks from a ReadableStream, in default or BYOB mode.
 *
 * NOTE(review): read() calls this._readValueFromQueue(), which is not defined
 * in this class as shown here; it has to be supplied elsewhere.
 */
class ReadableStreamDefaultReader {
  /**
   * @param {ReadableStream} stream - Stream to read from.
   * @param {object} [options] - `options.mode === 'byob'` selects BYOB mode.
   * @throws {TypeError} If `stream` is not a ReadableStream, is already
   *   closed, or BYOB mode is requested and derivedIsReadableStreamBYOBReader
   *   rejects the stream.
   */
  constructor(stream, options = {}) {
    if (!(stream instanceof ReadableStream)) {
      throw new TypeError('ReadableStreamDefaultReader can only be constructed with a ReadableStream instance');
    }
    if (stream._state === 'closed') {
      throw new TypeError('ReadableStream is closed');
    }

    if (options.mode === 'byob' && !derivedIsReadableStreamBYOBReader(stream)) {
      throw new TypeError('ReadableStreamReader can only have a mode of "default" on byte source streams');
    }

    this._stream = stream;
    this._readRequests = new SimpleQueue();
    // read(), _readSteps() and the pull logic all dereference this field;
    // it used to be left undefined, so they threw on first use.
    this._ownerReadableStream = stream;
    // Native Promises expose no `.state`, so settlement is tracked explicitly.
    this._closedPromiseSettled = false;
    this._closedPromise = new Promise((resolve, reject) => {
      this._closedPromise_resolve = resolve;
      this._closedPromise_reject = reject;
    });

    this._readIntoRequests = new SimpleQueue();
    this._totalQueuedData = 0;
    this._isBYOBReader = options.mode === 'byob';
    this._storedError = undefined;
  }

  /**
   * Promise that is fulfilled once read() has observed the end of the stream.
   *
   * @returns {Promise<undefined>}
   */
  get closed() {
    if (!isReadableStreamDefaultReader(this)) {
      return Promise.reject(defaultReaderBrandCheckException('closed'));
    }

    return this._closedPromise;
  }

  /**
   * Cancels this reader: rejects every pending request, records the error
   * and makes later read() calls reject with it.
   *
   * NOTE(review): as in the original, the returned promise is *rejected*
   * with a TypeError wrapping `reason`; confirm that this is intended.
   *
   * @param {*} reason - Used as the message of the stored TypeError.
   * @returns {Promise} Resolved if the stream is already closed; otherwise
   *   whatever promiseRejectedWith(error) returns.
   */
  cancel(reason) {
    if (!isReadableStreamDefaultReader(this)) {
      return Promise.reject(defaultReaderBrandCheckException('cancel'));
    }

    if (this._stream._state === 'closed') {
      return Promise.resolve();
    }

    const error = new TypeError(reason);
    this._storedError = error;

    // Settle the requests *before* discarding them. The queues used to be
    // replaced first, and only the empty queue of a freshly built reader
    // was rejected, so callers were left with promises that never settled.
    const rejectRequest = (request) => request.promiseReject(error);
    this._readRequests.forEach(rejectRequest);
    this._readIntoRequests.forEach(rejectRequest);
    this._readRequests = new SimpleQueue();
    this._readIntoRequests = new SimpleQueue();

    // No pull is requested here: with both queues empty,
    // _callReadableStreamPullIfNeeded() returns before pulling anyway.
    return promiseRejectedWith(error);
  }

  /**
   * Delivers one chunk to the oldest pending request.
   *
   * @param {*} chunk - A Uint8Array in BYOB mode, any value otherwise.
   */
  _readSteps(chunk) {
    const stream = this._ownerReadableStream;

    if (this._isBYOBReader) {
      assert(typeof chunk === 'object' && chunk.constructor === Uint8Array);
      assert(this._readIntoRequests.length > 0);

      const request = this._readIntoRequests.shift();
      request.bytesFilled = fillArrayFromChunk(chunk, request.data);

      if (request.bytesFilled < request.size) {
        // Partially filled: put the request back and wait for more data.
        // Nothing was completed, so the bookkeeping below is skipped. This
        // path used to fall through into `assert(backlogReduced)` and throw.
        this._readIntoRequests.unshift(request);
        stream._state = 'waiting';
        return;
      }
      request.promiseResolve(undefined);
    } else {
      assert(this._readRequests.length > 0);

      const request = this._readRequests.shift();
      request.promiseResolve({ value: chunk, done: false });
    }

    // NOTE(review): `_strategyHWM` is never assigned on the reader in this
    // class, so this yields NaN unless it is set externally; confirm the
    // intended decrement.
    this._totalQueuedData -= this._strategyHWM;

    if (this._totalQueuedData <= 0) {
      if (streamHasDefaultReader(stream) && this._ownerReadableStream._state === 'waiting') {
        assert(stream._readableStreamController._controlledReadableStream === stream);
        // Wakes a producer parked on the controller's pulling promise.
        stream._readableStreamController._pullingPromiseResolve();
      }
    }
  }

  /**
   * Rejects the oldest pending request with `error`, then asks for more data
   * if requests remain.
   *
   * @param {*} error - Rejection reason for the request.
   */
  _readError(error) {
    const pendingRequests = this._isBYOBReader ? this._readIntoRequests : this._readRequests;
    assert(pendingRequests.length > 0);

    const request = pendingRequests.shift();
    request.promiseReject(error);

    this._callReadableStreamPullIfNeeded();
  }

  /**
   * Calls the controller's pull() when the stream is readable, this reader
   * owns it, requests are pending and no pull is already running. Repeats
   * after each successful pull.
   */
  _callReadableStreamPullIfNeeded() {
    const stream = this._ownerReadableStream;

    if (!streamHasDefaultReader(stream)) {
      return;
    }

    // Replaces the former `this._closedPromise.state === 'fulfilled'` check,
    // which was always false for a native Promise.
    if (this._closedPromiseSettled) {
      return;
    }

    if (stream._state !== 'readable') {
      return;
    }

    if (isReadableStreamLocked(stream) === true && stream._reader !== this) {
      return;
    }

    if (this._readRequests.length === 0 && (!this._isBYOBReader || this._readIntoRequests.length === 0)) {
      return;
    }

    if (stream._readableStreamController._inChunk && isReadableStreamLocked(stream) === true) {
      return;
    }

    // `_inChunk` marks a pull in flight so overlapping pulls are not issued.
    stream._readableStreamController._inChunk = true;

    const pullPromise = stream._readableStreamController.pull();
    assert(pullPromise instanceof Promise);
    pullPromise.then(() => {
      stream._readableStreamController._inChunk = false;
      this._callReadableStreamPullIfNeeded();
    }).catch(e => {
      stream._readableStreamController._inChunk = false;
      // NOTE(review): assumes the stream exposes an error(e) method; none is
      // visible on ReadableStream in this file.
      this._ownerReadableStream.error(e);
    });
  }

  /**
   * Fulfils the closed promise exactly once; later calls are no-ops.
   */
  _markClosed() {
    if (this._closedPromiseSettled) {
      return;
    }

    this._closedPromiseSettled = true;
    this._closedPromise_resolve(undefined);
    this._closedPromise_resolve = undefined;
    this._closedPromise_reject = undefined;
  }

  /**
   * Reads the next value from the stream.
   *
   * @returns {Promise<{value: *, done: boolean}>} Rejected with the stored
   *   error after cancel() or when the stream is errored.
   */
  read() {
    if (!isReadableStreamDefaultReader(this)) {
      return Promise.reject(defaultReaderBrandCheckException('read'));
    }

    if (this._storedError !== undefined) {
      return Promise.reject(this._storedError);
    }

    const stream = this._ownerReadableStream;
    stream._disturbed = true;

    if (stream._state === 'errored') {
      return Promise.reject(stream._storedError);
    }

    if (stream._state === 'closed') {
      return Promise.resolve({ value: undefined, done: true });
    }

    // Pending requests are left in the queue. The previous
    // `this._readRequests.shift()` discarded one without settling it.
    const { value, done } = this._readValueFromQueue();

    if (done === true) {
      // Guarded, so a second `done` read neither throws nor swaps the promise
      // that callers already hold from `closed`.
      this._markClosed();
    } else {
      this._callReadableStreamPullIfNeeded();
    }

    return Promise.resolve({ value, done });
  }
}