System.Reactive - C# Rx - ignoring errors
I have a stream of independent events that are handled asynchronously with Reactive Extensions. A handler may fail for whatever reason, but the stream should continue on.
However, in Rx, right after an error occurs, the subscription is automatically disposed. Is this somehow configurable?
Example:
    // LINQPad snippet: Dump() is LINQPad's output helper.
    async Task<Unit> ActionAsync(int i)
    {
        if (i > 1)
            throw new Exception();
        i.Dump();
        return Unit.Default;
    }

    void Main()
    {
        var sb = new Subject<int>();
        sb.SelectMany(ActionAsync).Subscribe(
            _ => { },
            ex => { ex.Dump(); }
        );
        sb.OnNext(1);
        sb.OnNext(2);
        sb.OnNext(3);
    }
I'd like to have the following output:
- 1
- exception
- 3
Can I achieve this without try/catching in ActionAsync?
There is a contract of behaviour in Rx where a stream can only be OnNext*(OnError|OnCompleted). In other words, zero or more OnNext calls, followed by exactly one of either OnError or OnCompleted at the end.
So, no, you can't configure Rx this way. It would no longer be Rx if you did.
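To make the contract concrete, here is a minimal sketch using a plain Subject<int>. The class name ContractDemo and the Run helper are made up for illustration, and Console.WriteLine stands in for LINQPad's Dump(). Once OnError has been sent, the subject ignores anything pushed afterwards.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class ContractDemo
{
    // Hypothetical helper: records every notification the observer receives.
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        subject.Subscribe(
            x => log.Add($"OnNext({x})"),
            ex => log.Add("OnError"),
            () => log.Add("OnCompleted"));

        subject.OnNext(1);
        subject.OnError(new Exception("boom"));
        subject.OnNext(2);      // ignored: the sequence has already terminated
        subject.OnCompleted();  // ignored as well

        return log;
    }

    public static void Main()
    {
        // Prints: OnNext(1), OnError
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```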
What you can do, however, is write a query that retries the source.
If you write your code like this:
    async Task<int> ActionAsync(int i)
    {
        if (i == 2)
            throw new Exception();
        return i;
    }

    void Main()
    {
        var sb = new Subject<int>();
        sb
            .SelectMany(ActionAsync)
            .Do(_ => { }, ex => ex.Dump())  // log the error before it is swallowed
            .Retry()                        // resubscribe to the source after an error
            .Subscribe(_ => _.Dump());
        sb.OnNext(1);
        sb.OnNext(2);
        sb.OnNext(3);
    }
Then you get:
    1
    Exception of type 'System.Exception' was thrown.
    3
As per your comment asking about performance issues: there aren't any performance issues in using .Retry(), but there is a behavioural issue.
If the source were cold, like var sb = new[] { 1, 2, 3 }.ToObservable();, then .Retry() would start the entire observable sequence again on every error and result in an infinite sequence of:
    1
    Exception of type 'System.Exception' was thrown.
    1
    Exception of type 'System.Exception' was thrown.
    1
    Exception of type 'System.Exception' was thrown.
    ...
In your code's case the observable is a hot observable (a Subject), so this doesn't happen.
If you wish to do this on a cold observable, you need to make it hot via .Publish(...). Like this:
    var sb = new[] { 1, 2, 3 }.ToObservable();
    sb
        .Publish(sbp =>                     // share a single subscription to the cold source
            sbp
                .SelectMany(ActionAsync)
                .Do(_ => { }, ex => ex.Dump())
                .Retry())                   // resubscribes to the shared stream, not to sb
        .Subscribe(_ => _.Dump());
Then the expected behaviour returns.
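If resubscribing is not what you want, one alternative (not part of the answer above, so treat it as a sketch under assumptions) is to contain the failure inside each inner sequence. The handler is wrapped with Observable.FromAsync and the error is replaced with an empty sequence via Catch, so the outer stream never sees an OnError and ActionAsync itself needs no try/catch. The names PerItemCatchDemo and Run are hypothetical, and Console.WriteLine stands in for Dump().

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class PerItemCatchDemo
{
    // Same idea as the handler above: fails for 2, succeeds otherwise.
    static async Task<int> ActionAsync(int i)
    {
        await Task.Yield();
        if (i == 2)
            throw new Exception($"handler failed for {i}");
        return i;
    }

    // Hypothetical helper: returns the values whose handlers succeeded.
    public static IList<int> Run()
    {
        var source = new[] { 1, 2, 3 }.ToObservable();

        return source
            .SelectMany(i => Observable
                .FromAsync(() => ActionAsync(i))
                .Do(_ => { }, ex => Console.WriteLine(ex.Message)) // log the failure
                .Catch(Observable.Empty<int>()))                   // end only this inner sequence
            .ToList()  // collect everything that survived
            .Wait();   // block until the outer sequence completes
    }

    public static void Main()
    {
        foreach (var value in Run())
            Console.WriteLine(value);
    }
}
```

Because nothing is resubscribed, this behaves the same for hot and cold sources. Note that SelectMany does not guarantee the order in which the surviving results arrive when the handlers run concurrently.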