diff --git a/src/stream.ts b/src/stream.ts index 19c8e1d..15be296 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,7 +1,7 @@ import { Readable, Writable } from 'stream' import type { ReadableOptions, WritableOptions } from 'stream' import { F_MODE, TypeFlag, encode } from './head' -import type { DecodingHeadOptions, EncodingHeadOptions, decode } from './head' +import { DecodingHeadOptions, EncodingHeadOptions, decode } from './head' import { List, createList } from './list' import { noop } from './shared' @@ -100,10 +100,12 @@ class FastBytes { private queue: List bytesLen: number private offset: number + private position: number constructor() { this.queue = createList() this.bytesLen = 0 this.offset = 0 + this.position = 0 } push(b: Uint8Array) { @@ -120,27 +122,32 @@ class FastBytes { return new Uint8Array(0) } - const elt = this.queue.peek()! - const diff = elt.length - this.offset - if (size >= diff) { - const bb = new Uint8Array(size) - bb.set(elt.subarray(this.offset)) - this.queue.shift() - const next = this.queue.peek()! - bb.set(next.subarray(0, size - diff), diff) - this.queue.update(0, next.subarray(size - diff)) - this.bytesLen -= size - console.log('xx', this.queue.at(1)) - return bb - } + const elt = this.queue.peek() if (!elt) { throw new Error(FAST_BYTES_ERROR_MESSAGES.EXCEED_BYTES_LEN) } - + let bb: Uint8Array + const overflow = size >= elt.length + if (overflow) { + bb = new Uint8Array(size) + const preBinary = elt.subarray(this.offset) + bb.set(preBinary, 0) + this.queue.shift() + const next = this.queue.peek()! + const nextBinary = next.subarray(0, size - preBinary.length) + bb.set(nextBinary, preBinary.length) + this.position++ + this.queue.update(this.position, next.subarray(size - preBinary.length)) + while (size - preBinary.length - nextBinary.length > 0) { + bb.set(this.shift(size - preBinary.length - nextBinary.length), preBinary.length + nextBinary.length) + } + } else { + this.queue.update(this.position, elt.subarray(size)) + bb = elt.subarray(0, size) + } + this.offset += size this.bytesLen -= size - const bb = elt.subarray(this.offset, this.offset += size) - return bb } } @@ -150,10 +157,10 @@ export class Extract { private decodeOptions: DecodingHeadOptions // Aka, The default highWaterMark is 16 kilobytes. // So. we create a fast queue to store the data. - matrix: List + matrix: FastBytes constructor(options: DecodingHeadOptions) { this.decodeOptions = options - this.matrix = createList() + this.matrix = new FastBytes() this.writer = createWriteableStream({ // write(chunk, encoding, callback) { // c.matrix.push(chunk) @@ -168,12 +175,29 @@ export class Extract { }) } + private decodeHead() { + try { + const head = decode(this.matrix.shift(512), this.decodeOptions) + } catch (error) { + // + } + } + private transport() { + while (this.matrix.bytesLen > 0) { + // + + if (this.matrix.bytesLen < 512) { + break + } + this.decodeHead() + } + // // const reader = createReadbleStream({ // read() {} // }) // this.matrix.shift() - console.log('transport', this.matrix) + // console.log('transport', this.matrix) } get receiver() { @@ -201,15 +225,3 @@ export class Extract { export function createExtract(options: DecodingHeadOptions = {}) { return new Extract(options) } - -const bytes = new FastBytes() - -bytes.push(new Uint8Array(1024)) - -bytes.push(new Uint8Array(700)) - -const r = bytes.shift(1025) - -// console.log(r) - -// console.log(bytes)