Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

I'm trying to understand Node streams and their life cycle. I want to split the content of a stream into n parts. The code below is just to explain my intent and to show that I have already tried something myself. I omitted some details.

I have a stream which just generates some data (a sequence of numbers):

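const { Readable } = require('stream')
const assert = require('assert')
const _ = require('lodash') // third-party; used for `_.isNull` below

class Stream extends Readable {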
  constructor() {
    super({objectMode: true, highWaterMark: 1})
    this.counter = 0
  }

  _read(size) {
    if(this.counter === 30) {
      this.push(null)
    } else {
      this.push(this.counter)
    }
    this.counter += 1
  }
}

const stream = new Stream()
stream.pause();

a function which tries to take the next n chunks:

function take(stream, count) {
  const result = []
  return new Promise(function(resolve) {
    stream.once('readable', function() {
      var chunk;
      do {
        chunk = stream.read()
        if (_.isNull(chunk) || result.length > count) {
          stream.pause()
          break
        }
        result.push(chunk)
      } while(true)
      resolve(result)
    })
  })
}

and I want to use it like this:

take(stream, 3)
  .then(res => {
    // the stream above starts counting at 0
    assert.deepEqual(res, [0, 1, 2])
    return take(stream, 3)
  })
  .then(res => {
    assert.deepEqual(res, [3, 4, 5])
  })

What is the idiomatic way to do that?


1 Answer

Using a ReadableStream (the WHATWG Streams API class, not Node's stream.Readable), you could use a single function to check whether the elements of the current chunk of data are equal to the expected result.

Create two variables, CHUNK and N: CHUNK is the number of elements to slice from the original array on each call, and N is the offset into that array, incremented by CHUNK at each .enqueue() call inside pull().

const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];

let N = 0;

const stream = new ReadableStream({
  pull(controller) {
    if (N < data.length)
      // slice `N, N += CHUNK` elements from `data`
      controller.enqueue(data.slice(N, N += CHUNK))
    else
      // once `N` has reached `data.length`, call `.close()` on the stream
      controller.close()
  }
});

const reader = stream.getReader();

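const processData = ({value, done}) => {
  // if stream is closed return `result`; `reader.closed` returns a `Promise`
  if (done) return reader.closed.then(() => result);
  // compare the chunk just read with the slice of `data` that `pull()` enqueued
  if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    // `chunk` avoids shadowing the outer `data` array
    return reader.read().then(chunk => processData(chunk))
  }
  // the chunk did not match: reject so the `.catch()` at the end of the chain reports it
  throw new Error(`unexpected chunk: [${value}]`)
}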

const readComplete = res => console.log(`result: [${res}]`);

reader.read()
.then(processData)
.then(readComplete)
.catch(err => console.log(err));
