system.reactive - C# Rx - ignoring error


I have a stream of independent events that are handled asynchronously by Reactive Extensions. The handler may fail for whatever reason, but the stream should continue.

However, in Rx, right after an error occurs, the subscription is automatically disposed. Is this somehow configurable?

Example:

    async Task<Unit> ActionAsync(int i)
    {
        if (i > 1)
            throw new Exception();

        i.Dump();
        return Unit.Default;
    }

    void Main()
    {
        var sb = new Subject<int>();

        sb.SelectMany(ActionAsync).Subscribe(
            _ => { },
            ex =>
            {
                ex.Dump();
            }
        );

        sb.OnNext(1);
        sb.OnNext(2);
        sb.OnNext(3);
    }

I'd like to have the following output:

  • 1
  • exception
  • 3

Can I achieve this without a try/catch in ActionAsync?

There is a contract of behaviour in Rx: a stream can only be OnNext*(OnError|OnCompleted). In other words, zero or more OnNext calls and then exactly one of either OnError or OnCompleted at the end.

So, no, you can't configure Rx this way. It would no longer be Rx if you did.
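To see the contract in action, here is a minimal sketch. It assumes the System.Reactive NuGet package is referenced; the ContractDemo class, the Run method and the "boom" message are made-up names for illustration only. It pushes a value, then an error, then another value into a Subject<int> and records what the subscriber actually receives.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class ContractDemo
{
    // Hypothetical helper: returns the notifications the subscriber observed.
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        subject.Subscribe(
            x => log.Add($"OnNext: {x}"),
            ex => log.Add($"OnError: {ex.Message}"),
            () => log.Add("OnCompleted"));

        subject.OnNext(1);                       // delivered
        subject.OnError(new Exception("boom"));  // terminates the sequence
        subject.OnNext(2);                       // ignored: the subject has already terminated

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
        // prints:
        // OnNext: 1
        // OnError: boom
    }
}
```

The value 2 never reaches the subscriber, because OnError is terminal. That is why the only way to keep receiving values is to subscribe again.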

What you can do, however, is write a query that retries the source.

If you write your code like this:

    async Task<int> ActionAsync(int i)
    {
        if (i == 2)
            throw new Exception();

        return i;
    }

    void Main()
    {
        var sb = new Subject<int>();

        sb
            .SelectMany(ActionAsync)
            .Do(_ => { }, ex => ex.Dump())
            .Retry()
            .Subscribe(_ => _.Dump());

        sb.OnNext(1);
        sb.OnNext(2);
        sb.OnNext(3);
    }

Then you get:

    1
    Exception of type 'System.Exception' was thrown.
    3

As per the comment asking about performance issues: there aren't any performance issues in using .Retry(), but there is a behavioural issue.

If the source were cold (for example var sb = new[] { 1, 2, 3 }.ToObservable();), then .Retry() would resubscribe and start the entire observable sequence again, resulting in an infinite sequence of:

    1
    Exception of type 'System.Exception' was thrown.
    1
    Exception of type 'System.Exception' was thrown.
    1
    Exception of type 'System.Exception' was thrown.
    ...

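In your code's case the observable is hot, so this doesn't happen: resubscribing to a Subject only delivers values pushed after the resubscription.

If you wish to do this on a cold observable, you need to make it hot via .Publish(...). Like this: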

    var sb = new[] { 1, 2, 3 }.ToObservable();

    sb
        .Publish(sbp =>
            sbp
                .SelectMany(ActionAsync)
                .Do(_ => { }, ex => ex.Dump())
                .Retry())
        .Subscribe(_ => _.Dump());

Then the expected behaviour returns.

