RxJS - why does Rx.Observable.fromNodeCallback(...)(...).retry() not retry on error?
I am wondering why the following code (in CoffeeScript) does not retry as expected.
    Rx = require 'rx'

    count = 0

    # Fails on the first call, succeeds on every later call.
    functToTest = (cb) ->
      console.log "count is", count
      count++
      if count is 1
        cb(new Error('some error'))
      else if count is 2
        cb(null, 2)
      else if count is 3
        cb(null, 3)
      else
        cb(null, 4)

    source = Rx.Observable.fromNodeCallback(functToTest)()

    onNext = (value) ->
      console.log value

    onError = (err) ->
      console.log err

    onCompleted = ->
      console.log "done"

    retryableSrc = source.retry(3)

    retryableSrc.subscribe(onNext, onError, onCompleted)
It outputs the following messages and then quits:

    count is 0
    [Error: some error]
I had thought this might be because fromNodeCallback() returns a hot observable, but the test below shows that it does not.
    Rx = require 'rx'

    count = 0

    functToTest = (cb) ->
      console.log "count is", count
      count++
      if count is 1
        cb(new Error('some error'))
      else if count is 2
        cb(null, 2)
      else if count is 3
        cb(null, 3)
      else
        cb(null, 4)

    source = Rx.Observable.fromNodeCallback(functToTest)()

    onNext = (value) ->
      console.log value

    onError = (err) ->
      console.log err

    onCompleted = ->
      console.log "done"

    retryableSrc = source.retry(3)

    # No subscribe here: just keep the process alive for one second.
    setTimeout ( -> ), 1000
If it were a hot observable, the program above should have printed the "count is 0" message. In reality, the program just waits 1 second and quits.
It is hot, or rather it goes hot when you first subscribe to it.
Inside fromNodeCallback is:

    Rx.Observable.create(...).publishLast().refCount()
This means that when you first subscribe, it executes the method, prints the count, and then emits an error. The error is caught downstream by retry, which resubscribes thrice only to receive the cached error, which it finally emits itself.
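To make the caching effect concrete, here is a small dependency-free sketch in plain JavaScript. It is a hand-rolled model, not RxJS's actual implementation: an "observable" is just a function taking `onNext` and `onError`, and `cachedCall` and `retry` are hypothetical helpers that only imitate the behaviour described above. It also assumes the wrapped function calls back synchronously and that `retry(3)` means at most three subscriptions in total.

```javascript
// Hand-rolled model, NOT RxJS itself. An "observable" here is simply a
// function subscribe(onNext, onError).

// Hypothetical helper: runs fn on the first subscription only and replays
// the stored outcome to every later subscriber (assumes fn calls back
// synchronously).
function cachedCall(fn) {
  let outcome = null; // becomes { err, value } once fn has called back
  return function subscribe(onNext, onError) {
    if (outcome === null) {
      fn((err, value) => { outcome = { err, value }; });
    }
    if (outcome.err) onError(outcome.err);
    else onNext(outcome.value);
  };
}

// Hypothetical helper: subscribes up to `attempts` times in total and
// passes the last error downstream if every attempt fails.
function retry(source, attempts) {
  return function subscribe(onNext, onError) {
    let lastErr;
    for (let i = 0; i < attempts; i++) {
      let failed = false;
      source(onNext, (err) => { failed = true; lastErr = err; });
      if (!failed) return;
    }
    onError(lastErr);
  };
}

// Same idea as functToTest in the question: fail once, then succeed.
let count = 0;
function functToTest(cb) {
  count++;
  if (count === 1) cb(new Error("some error"));
  else cb(null, count);
}

const results = [];
retry(cachedCall(functToTest), 3)(
  (value) => results.push("next: " + value),
  (err) => results.push("error: " + err.message)
);

// functToTest ran only once, so every retry saw the same stored error.
console.log(count, results);
```

In this model, retrying cannot help because the work is never repeated: each resubscription only replays the stored failure.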
You can fix this by using flatMap:

    ncb = Rx.Observable.fromNodeCallback(functToTest)
    source = Rx.Observable.just(ncb).flatMap((fn) -> fn())
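The reason the flatMap wrapper helps can be sketched with the same kind of dependency-free JavaScript model. Again, this is an illustration under stated assumptions rather than RxJS's implementation: `deferredCall` and `retry` are hypothetical helpers, the callback is assumed to fire synchronously, and `retry(3)` is taken to mean at most three subscriptions in total. The point is that the function is invoked anew for every subscription, so a retry actually repeats the work.

```javascript
// Hand-rolled model, NOT RxJS itself. An "observable" here is simply a
// function subscribe(onNext, onError).

// Hypothetical helper: invokes fn afresh for every subscription, which is
// the effect the flatMap wrapper aims for.
function deferredCall(fn) {
  return function subscribe(onNext, onError) {
    fn((err, value) => {
      if (err) onError(err);
      else onNext(value);
    });
  };
}

// Hypothetical helper: subscribes up to `attempts` times in total and
// passes the last error downstream if every attempt fails.
function retry(source, attempts) {
  return function subscribe(onNext, onError) {
    let lastErr;
    for (let i = 0; i < attempts; i++) {
      let failed = false;
      source(onNext, (err) => { failed = true; lastErr = err; });
      if (!failed) return;
    }
    onError(lastErr);
  };
}

// Same idea as functToTest in the question: fail once, then succeed.
let count = 0;
function functToTest(cb) {
  count++;
  if (count === 1) cb(new Error("some error"));
  else cb(null, count);
}

const results = [];
retry(deferredCall(functToTest), 3)(
  (value) => results.push("next: " + value),
  (err) => results.push("error: " + err.message)
);

// The first attempt fails, the second attempt calls functToTest again
// and succeeds, so no error reaches the subscriber.
console.log(count, results);
```

Here the second attempt calls `functToTest` again instead of replaying a stored result, which is what lets the retry succeed.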