Skip to content

Commit

Permalink
fix: bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
nonzzz committed Aug 5, 2024
1 parent e4c111e commit db2de08
Showing 1 changed file with 44 additions and 32 deletions.
76 changes: 44 additions & 32 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Readable, Writable } from 'stream'
import type { ReadableOptions, WritableOptions } from 'stream'
import { F_MODE, TypeFlag, encode } from './head'
import type { DecodingHeadOptions, EncodingHeadOptions, decode } from './head'
import { DecodingHeadOptions, EncodingHeadOptions, decode } from './head'
import { List, createList } from './list'
import { noop } from './shared'

Expand Down Expand Up @@ -100,10 +100,12 @@ class FastBytes {
private queue: List<Uint8Array>
bytesLen: number
private offset: number
private position: number
constructor() {
this.queue = createList<Uint8Array>()
this.bytesLen = 0
this.offset = 0
this.position = 0
}

push(b: Uint8Array) {
Expand All @@ -120,27 +122,32 @@ class FastBytes {
return new Uint8Array(0)
}

const elt = this.queue.peek()!
const diff = elt.length - this.offset
if (size >= diff) {
const bb = new Uint8Array(size)
bb.set(elt.subarray(this.offset))
this.queue.shift()
const next = this.queue.peek()!
bb.set(next.subarray(0, size - diff), diff)
this.queue.update(0, next.subarray(size - diff))
this.bytesLen -= size
console.log('xx', this.queue.at(1))
return bb
}
const elt = this.queue.peek()
if (!elt) {
throw new Error(FAST_BYTES_ERROR_MESSAGES.EXCEED_BYTES_LEN)
}

let bb: Uint8Array
const overflow = size >= elt.length
if (overflow) {
bb = new Uint8Array(size)
const preBinary = elt.subarray(this.offset)
bb.set(preBinary, 0)
this.queue.shift()
const next = this.queue.peek()!
const nextBinary = next.subarray(0, size - preBinary.length)
bb.set(nextBinary, preBinary.length)
this.position++
this.queue.update(this.position, next.subarray(size - preBinary.length))
while (size - preBinary.length - nextBinary.length > 0) {
bb.set(this.shift(size - preBinary.length - nextBinary.length), preBinary.length + nextBinary.length)
}
} else {
this.queue.update(this.position, elt.subarray(size))
bb = elt.subarray(0, size)
}
this.offset += size
this.bytesLen -= size

const bb = elt.subarray(this.offset, this.offset += size)

return bb
}
}
Expand All @@ -150,10 +157,10 @@ export class Extract {
private decodeOptions: DecodingHeadOptions
// Aka, The default highWaterMark is 16 kilobytes.
// So. we create a fast queue to store the data.
matrix: List<Uint8Array>
matrix: FastBytes
constructor(options: DecodingHeadOptions) {
this.decodeOptions = options
this.matrix = createList()
this.matrix = new FastBytes()
this.writer = createWriteableStream({
// write(chunk, encoding, callback) {
// c.matrix.push(chunk)
Expand All @@ -168,12 +175,29 @@ export class Extract {
})
}

private decodeHead() {
try {
const head = decode(this.matrix.shift(512), this.decodeOptions)
} catch (error) {
//
}
}

private transport() {
while (this.matrix.bytesLen > 0) {
//

if (this.matrix.bytesLen < 512) {
break
}
this.decodeHead()
}
//
// const reader = createReadbleStream({
// read() {}
// })
// this.matrix.shift()
console.log('transport', this.matrix)
// console.log('transport', this.matrix)
}

get receiver() {
Expand Down Expand Up @@ -201,15 +225,3 @@ export class Extract {
export function createExtract(options: DecodingHeadOptions = {}) {
return new Extract(options)
}

const bytes = new FastBytes()

bytes.push(new Uint8Array(1024))

bytes.push(new Uint8Array(700))

const r = bytes.shift(1025)

// console.log(r)

// console.log(bytes)

0 comments on commit db2de08

Please sign in to comment.