ReadableStream 的实现包括以下模块:
- 构造函数
ReadableStream 的构造函数包括一个参数,即一个ReadableStreamDefaultController实例对象。ReadableStreamDefaultController负责控制数据流的生成和消费。
class ReadableStream {
constructor(controller, options) {
if (typeof controller !== 'object' || controller === null ||
typeof controller.start !== 'function' ||
typeof controller.pull !== 'function' ||
typeof controller.cancel !== 'function' ||
typeof controller.desiredSize !== 'number') {
throw new TypeError('ReadableStream constructor must be provided a controller ' +
'with start, pull, cancel, and desiredSize functions');
}
this._controller = controller;
this._readableStreamController = undefined;
this._state = 'readable';
this._started = false;
this._closedPromise = undefined;
this._closedPromiseResolve = undefined;
this._closedPromiseReject = undefined;
this._storedError = undefined;
this._disturbed = false;
}
}
- 阅读数据流
通过调用ReadableStream 的getReader方法,可获取一个ReadableStreamDefaultReader 对象,用于读取数据流中的数据。
class ReadableStream {
getReader(options) {
return new ReadableStreamDefaultReader(this, options);
}
}
- 控制数据流
通过ReadableStreamDefaultController对象对数据流进行控制。
class ReadableStreamDefaultController {
constructor(stream, underlyingSource, size, highWaterMark) {
if (!(stream instanceof ReadableStream)) {
throw new TypeError('ReadableStreamDefaultController can only be constructed with a ReadableStream instance');
}
this._stream = stream;
this._underlyingSource = underlyingSource;
this._queue = [];
this._pullingPromise = undefined;
this._pullingPromiseResolve = undefined;
this._pullingPromiseReject = undefined;
this._controlledReadableStream = undefined;
this._queueTotalSize = 0;
this._queueSize = 0;
const { size: normalizedSize, highWaterMark: normalizedHighWaterMark } =
normalizeReadableStreamOptions(size, highWaterMark);
this._strategySize = normalizedSize;
this._strategyHWM = normalizedHighWaterMark;
}
get desiredSize() {
if (this._queueSize === 0) {
return this._strategyHWM - this._queueTotalSize;
}
return this._queueSize;
}
close() {
const state = this._stream._state;
if (state !== 'readable') {
return;
}
if (streamHasDefaultReader(this._stream)) {
const reader = this._stream._readableStreamController._controlledReadableStream.getReader();
if (isReadableStreamDefaultReader(reader) && reader._readRequests.length > 0) {
reader._readRequests.shift()('done');
}
}
if (this._queueSize > 0) {
this._queue = [];
this._queueTotalSize = 0;
}
this._stream._state = 'closed';
this._controlledReadableStream = undefined;
const closedPromise = this._stream._closedPromise;
this._stream._closedPromise = undefined;
this._stream._closedPromiseResolve = undefined;
this._stream._closedPromiseReject = undefined;
resolvePromise(closedPromise, undefined);
}
enqueue(chunk) {
if (typeof chunk !== 'object' || chunk === null) {
this.error(new TypeError('Data to be enqueued must be an object or null'));
return;
}
if (this._stream._state !== 'readable') {
return;
}
const size = this._strategySize(chunk);
if (this._queueSize > this._strategyHWM - size) {
this._stream._state = 'waiting';
const resolve = () => {
if (this._stream._state !== 'waiting') {
return;
}
this._stream._state = 'readable';
this._enqueueInReadable(chunk);
};
if (this._pullingPromise !== undefined) {
this._pullingPromise.then(resolve, resolve);
} else {
this._pullingPromise = new Promise((resolve, reject) => {
this._pullingPromiseResolve = resolve;
this._pullingPromiseReject = reject;
});
this._pullingPromise.then(resolve, resolve);
}
} else {
this._enqueueInReadable(chunk);
}
}
error(e) {
if (this._stream._state !== 'readable') {
throw new TypeError('Cannot call error() in a non-readable state');
}
this._stream._state = 'errored';
this._storedError = e;
if (streamHasDefaultReader(this._stream)) {
const reader = this._stream._readableStreamController._controlledReadableStream.getReader();
if (isReadableStreamDefaultReader(reader)) {
reader._readRequests.forEach(streamRecord => streamRecord.promise.reject(e));
reader._readRequests = new SimpleQueue();
}
}
if (this._queueSize === 0) {
const closedPromise = this._stream._closedPromise;
this._stream._closedPromise = undefined;
this._stream._closedPromiseResolve = undefined;
this._stream._closedPromiseReject = undefined;
resolvePromise(closedPromise, undefined);
}
}
- 读取数据流
通过ReadableStreamDefaultReader对象,对数据流进行读取和消费。
class ReadableStreamDefaultReader {
constructor(stream, options = {}) {
if (!(stream instanceof ReadableStream)) {
throw new TypeError('ReadableStreamDefaultReader can only be constructed with a ReadableStream instance');
}
if (stream._state === 'closed') {
throw new TypeError('ReadableStream is closed');
}
if (options.mode === 'byob' && !derivedIsReadableStreamBYOBReader(stream)) {
throw new TypeError('ReadableStreamReader can only have a mode of "default" on byte source streams');
}
this._stream = stream;
this._readRequests = new SimpleQueue();
this._ownerReadableStream = undefined;
this._closedPromise = new Promise((resolve, reject) => {
this._closedPromise_resolve = resolve;
this._closedPromise_reject = reject;
});
this._readIntoRequests = new SimpleQueue();
this._totalQueuedData = 0;
this._isBYOBReader = options.mode === 'byob';
this._storedError = undefined;
}
get closed() {
if (!isReadableStreamDefaultReader(this)) {
return Promise.reject(defaultReaderBrandCheckException('closed'));
}
return this._closedPromise;
}
cancel(reason) {
if (!isReadableStreamDefaultReader(this)) {
return Promise.reject(defaultReaderBrandCheckException('cancel'));
}
if (this._stream._state === 'closed') {
return Promise.resolve();
}
this._readRequests = new SimpleQueue();
this._readIntoRequests = new SimpleQueue();
this._storedError = new TypeError(reason);
const result = promiseRejectedWith(this._storedError);
if (streamHasDefaultReader(this._stream) && this._stream._readableStreamController._controlledReadableStream === this._stream) {
this._callReadableStreamPullIfNeeded();
const reader = this._stream._readableStreamController._controlledReadableStream.getReader();
if (isReadableStreamDefaultReader(reader)) {
reader._readRequests.forEach(streamRecord => streamRecord.promise.reject(this._storedError));
reader._readRequests = new SimpleQueue();
}
}
return result;
}
private _readSteps(chunk) {
const stream = this._ownerReadableStream;
if (this._isBYOBReader) {
assert(typeof chunk === 'object' && chunk.constructor === Uint8Array);
assert(this._readIntoRequests.length > 0);
const request = this._readIntoRequests.shift();
request.bytesFilled = fillArrayFromChunk(chunk, request.data);
if (request.bytesFilled < request.size) {
backlogReduced = false;
this._readIntoRequests.unshift(request);
stream._state = 'waiting';
} else {
request.promiseResolve(undefined);
backlogReduced = true;
}
} else {
assert(this._readRequests.length > 0);
const request = this._readRequests.shift();
request.promiseResolve({ value: chunk, done: false });
backlogReduced = true;
}
assert(backlogReduced);
this._totalQueuedData -= this._strategyHWM;
if (this._totalQueuedData <= 0) {
if (streamHasDefaultReader(stream) && this._ownerReadableStream._state === 'waiting') {
assert(stream._readableStreamController._controlledReadableStream === stream);
stream._readableStreamController._pullingPromiseResolve();
}
}
}
private _readError(error) {
const stream = this._ownerReadableStream;
if (this._isBYOBReader) {
assert(this._readIntoRequests.length > 0);
const request = this._readIntoRequests.shift();
request.promiseReject(error);
} else {
assert(this._readRequests.length > 0);
const request = this._readRequests.shift();
request.promiseReject(error);
}
this._callReadableStreamPullIfNeeded();
}
_callReadableStreamPullIfNeeded() {
const stream = this._ownerReadableStream;
if (!streamHasDefaultReader(stream)) {
return;
}
if (this._closedPromise.state === 'fulfilled') {
return;
}
if (stream._state !== 'readable') {
return;
}
if (isReadableStreamLocked(stream) === true && stream._reader !== this) {
return;
}
if (this._readRequests.length === 0 && (!this._isBYOBReader || this._readIntoRequests.length === 0)) {
return;
}
if (stream._readableStreamController._inChunk && isReadableStreamLocked(stream) === true) {
return;
}
stream._readableStreamController._inChunk = true;
const pullPromise = stream._readableStreamController.pull();
assert(pullPromise instanceof Promise);
pullPromise.then(() => {
stream._readableStreamController._inChunk = false;
this._callReadableStreamPullIfNeeded();
}).catch(e => {
stream._readableStreamController._inChunk = false;
this._ownerReadableStream.error(e);
});
}
read() {
if (!isReadableStreamDefaultReader(this)) {
return Promise.reject(defaultReaderBrandCheckException('read'));
}
if (this._storedError !== undefined) {
return Promise.reject(this._storedError);
}
const stream = this._ownerReadableStream;
stream._disturbed = true;
if (stream._state === 'errored') {
return Promise.reject(stream._storedError);
}
if (stream._state === 'closed') {
return Promise.resolve({ value: undefined, done: true });
}
const { value, done } = this._readValueFromQueue();
const p = this._readRequests.shift();
if (done === true) {
this._closedPromise_resolve(undefined);
this._closedPromise_resolve = undefined;
this._closedPromise_reject = undefined;
this._closedPromise = resolvedPromise;
} else {
this._callReadableStreamPullIfNeeded();
}
return Promise.resolve({ value, done });
}
}