Trying out node's Generators and Streams

Disclaimer: I'm quite new to generators, iterators, streams, event sourcing, EventStore, and CQRS.

I'm prototyping a microservice at Textalk, using Event Sourcing and CQRS, trying out the event store EventStore (yes, the name is quite ambiguous).  There is a node library implementing it's tcp API, but it's modelled after the .NET lib and doesn't make use of modern nodeisms.

So, wanting to be able to reduce a stream of events up to given eventnumber and await the result, I thought I'd make a wrapper using Generators.  Given that applyEvent() takes the current state object, applies the event and returns the new state, I could write:

const state = await(Promise.reduce(
  es.readStreamEventsUntil(stream, 0, 100, options), applyEvent, 0
))

That requires bluebird promises, to get Promice.reduce.  The implementation was roughly:

EspPrototype.readStreamEventsUntil = function*(stream, from, to, params) {
  const self = this

  const options = Object.assign({}, self.defaults, params)
  const promises = []

  for (let i = from; i <= to; i++) {
    const prom = {}
    prom.promise = new Promise((res, rej) => {prom.resolve = res, prom.reject = rej})
    promises.push(prom)
  }

  const timer = setTimeout(
    () => promises.shift().reject(new Error('Timeout reached')), options.timeout
  )

  self.readStreamEventsForward(
    stream, from, to - from + 1, options.resolveLinkTos, options.requireMaster,
    (event) => promises.shift().resolve(event)
  ).then((result) => {
    // Old events are now read.  Do we need to wait for more?
    if (promises.length === 0) return clearTimeout(timer)

    let correlationId
    self.subscribeToStream(stream, options.resolveLinkTos, (event) => {
      promises.shift().resolve(event)

      if (promises.length === 0) {
        self.unsubscribeFromStream(correlationId)
        clearTimeout(timer)
      }
    }).then((correlationIdIn) => {correlationId = correlationIdIn})
  })

  for (const prom in promises) yield promises[prom].promise
}

The principle here is yielding promises, and Promise.reduce handling a generator yielding promises.  When not knowing how many promises we should yield, we need to yield asynchronously.  That can be done with await, or using fibers directly:

let nextResolve, nextReject
let nextPromise = new Promise((res, rej) => {nextResolve = res; nextReject = rej})

self.readStreamEventsForward(
  stream, from, options.maxCount, options.resolveLinkTos, options.requireMaster,
    event => {
      // Store the current resolve and make a new next promise.
      const resolve = nextResolve
      nextPromise = new Promise((res, rej) => {nextResolve = res; nextReject = rej})
      promises.push(nextPromise)

      resolve(event)
    }
  ).then(result => nextPromise.push(null))
)

while (true) {  // Keep yielding until nextPromise gives null
  const nextValue = await(nextPromise)
  if (nextValue === null) return
  yield nextValue
}

But there are several problems with this.  The await will cause a Fiber halt, and inside an outer await for the Promise.reduce, it will mess upp the asynchronicity and cause all events to be handled only when every promise is already yielded.  It will give the wrapper dependencies on awaitasync and thereby fibers.  It lets me use the syntax I want, but in a hackish way.

What I really wanted wasn't a generator, but a stream.  The readStreamEventsForward would be short and clean:

EspPrototype.readStreamEventsForward = function(streamId, from, params) {
  const self    = this
  const options = Object.assign({}, self.defaults, params)
  const stream  = new EventStoreStream()

  self.connection.readStreamEventsForward(
    streamId, from, options.maxCount, options.resolveLinkTos, options.requireMaster,
    event => stream.push(event), self.credentials,
    read  => stream.push(null)
  )

  return stream
}

I can't make a Promise.reduce on a stream, but I could use Highland in consuming code to reduce it without adding dependencies to the wrapper.  The usage would be:

_(es.readStreamEventsForward(streamId, 0)).reduce({}, applyEvent).pull(useResult)

But I can't await a Highland stream.  I was trying to find some way for Highland to return it's thunk to no avail.  So, I made a small wrapper:

const await_ = stream => await(new Promise((res, rej) => stream.errors(rej).toArray(res)))

// Reduce all previous events to current state.
const state = await_(
  _(es.readStreamEventsForward(streamId, 0)).reduce({}, applyEvent)
)

Ahh, finally untangled and reduced, the projection could deliver it's state calculated on the fly.  (…or if stored at a certain event, just switch the 0 for the eventNumber and {} for the stored state).

I've put up the wrapper library on GitHub as event-store-promise, because it also gives promises, and ESP is a better acronym than ESS.  It is by no means production ready or stable in its API.

Show Comments