RxJava - Error handling for zipped observables
My use case is: I have a list of permalinks, and I need to issue two REST requests per permalink to get its data in parts. When both requests are back, I want to merge the info and do something with it (here, just print it out). I want to do this using the zip operator. Here is my current code (together with mocks for the library I'm using):
    // RxJava 1.x
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    import rx.Observable;
    import rx.Subscriber;

    public class Main {
        public static void main(String[] args) {
            ContentManager cm = new ContentManager();
            Observable
                .from(cm.getPermalinks(10))
                .flatMap(permalink -> Observable.zip(
                    Observable.<Content>create(subscriber ->
                        cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                    Observable.<Content>create(subscriber ->
                        cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                    (dataContent, streamUrlContent) -> {
                        if (dataContent == null || streamUrlContent == null) {
                            System.err.println("not zipping " + dataContent + " and " + streamUrlContent);
                            return Observable.empty();
                        }
                        return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                    }))
                .subscribe(System.out::println);
        }
    }

    // Bridges the callback-style REST API to a Subscriber.
    class SubscribingRestCallback implements RestCallback {
        private final Subscriber<? super Content> subscriber;

        public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void onSuccess(Content content) {
            subscriber.onNext(content);
            subscriber.onCompleted();
        }

        @Override
        public void onFailure(int code, String message) {
            System.err.println(message);
            // Emits null on failure so that zip still receives a value.
            subscriber.onNext(null);
            subscriber.onCompleted();
        }
    }

    public class Content {
        public final String permalink;
        public final String logoUrl;
        public final String streamUrl;

        public Content(String permalink, String logoUrl, String streamUrl) {
            this.permalink = permalink;
            this.logoUrl = logoUrl;
            this.streamUrl = streamUrl;
        }

        @Override
        public String toString() {
            return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl);
        }
    }

    public interface RestCallback {
        void onSuccess(Content content);
        void onFailure(int code, String message);
    }

    class ContentManager {
        private final Random random = new Random();

        public List<String> getPermalinks(int n) {
            List<String> permalinks = new ArrayList<>(n);
            for (int i = 1; i <= n; ++i) {
                permalinks.add("perma_" + i);
            }
            return permalinks;
        }

        public void getDataByPermalink(String permalink, RestCallback callback) {
            getByPermalink(permalink, callback, false);
        }

        public void getStreamByPermalink(String permalink, RestCallback callback) {
            getByPermalink(permalink, callback, true);
        }

        private void getByPermalink(String permalink, RestCallback callback, boolean stream) {
            // Simulate network latency and unordered results.
            new Thread(() -> {
                try {
                    Thread.sleep(random.nextInt(1000) + 200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Roughly 95% of the simulated requests succeed.
                if (random.nextInt(100) < 95) {
                    String logoUrl;
                    String streamUrl;
                    if (stream) {
                        logoUrl = null;
                        streamUrl = "http://" + permalink + "/stream";
                    } else {
                        logoUrl = "http://" + permalink + "/logo.png";
                        streamUrl = null;
                    }
                    callback.onSuccess(new Content(permalink, logoUrl, streamUrl));
                } else {
                    callback.onFailure(-1, permalink + " data failure");
                }
            }).start();
        }
    }
In general it works, but I don't like the error handling in this implementation. Basically, the REST requests may fail, in which case the onFailure method calls subscriber.onNext(null) so that the zip method always has something to work with (one request may have failed while the other one may not have, and I don't know which one failed). Then, in the zip function, I need an if that checks that both are not null (my code would crash if either of the partial Content objects were null).

I would like to be able to filter out the null values using the filter operator somewhere, if possible. Or maybe there is a better way than emitting null values for the failure case that still works with the zip function?
First of all, the right way to notify the Subscriber about an error is to call the subscriber.onError method:
    class SubscribingRestCallback implements RestCallback {
        private final Subscriber<? super Content> subscriber;

        public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void onSuccess(Content content) {
            subscriber.onNext(content);
            subscriber.onCompleted();
        }

        @Override
        public void onFailure(int code, String message) {
            // Signal the failure as an error instead of emitting null.
            subscriber.onError(new Exception(message));
        }
    }
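Even if you don't want the whole stream to fail, you still need to call the subscriber.onError() method. There are other ways to swallow errors. One of them is the onErrorResumeNext operator. Applied to the zipped observable inside flatMap, it replaces a failed pair with an empty observable, so only that permalink is dropped and the outer stream keeps going: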
    Observable
        .from(cm.getPermalinks(10))
        .flatMap(permalink -> Observable.zip(
            Observable.<Content>create(subscriber ->
                cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
            Observable.<Content>create(subscriber ->
                cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
            (dataContent, streamUrlContent) -> {
                return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
            })
            // Swallow the error for this permalink only.
            .onErrorResumeNext(Observable.empty()))
        .subscribe(System.out::println);
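For readers who want to try the idea without the RxJava dependency, here is a rough analogy using only the standard library's CompletableFuture. It is not RxJava code and not part of the original answer: `fetch`, `fetchBoth`, `fetchAll`, and the rule that a permalink containing "bad" fails are all hypothetical stand-ins. It only illustrates the same two steps: signal a failure as an error rather than as null, then recover per pair so that one failed request drops just that permalink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Standard-library analogy only; none of these names come from RxJava or the question.
final class PairFetchSketch {

    // Hypothetical stand-in for one REST call: fails when the permalink contains "bad".
    static CompletableFuture<String> fetch(String permalink, String part) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (permalink.contains("bad")) {
            // Plays the role of subscriber.onError(...): report a failure, never emit null.
            future.completeExceptionally(new Exception(permalink + " " + part + " failure"));
        } else {
            future.complete("http://" + permalink + "/" + part);
        }
        return future;
    }

    // Plays the role of zip(...).onErrorResumeNext(empty()): combine both parts,
    // and turn a failure of either part into "no result" for this permalink only.
    static CompletableFuture<Optional<String>> fetchBoth(String permalink) {
        return fetch(permalink, "logo.png")
                .thenCombine(fetch(permalink, "stream"),
                        (logo, stream) -> Optional.of(logo + " | " + stream))
                .exceptionally(error -> Optional.empty());
    }

    // Plays the role of the outer flatMap: collect whatever pairs succeeded.
    static List<String> fetchAll(List<String> permalinks) {
        List<String> results = new ArrayList<>();
        for (String permalink : permalinks) {
            fetchBoth(permalink).join().ifPresent(results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(Arrays.asList("perma_1", "bad_2", "perma_3")));
    }
}
```

The combining lambda never sees a null here, which is the same reason the zip function in the answer no longer needs its null check.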
Edit

I have one last question: if you notice, my zipper function returns Observable.empty() if the two objects cannot be zipped, and otherwise returns a Content. This seems wrong. How should I handle such error conditions in the zipper function?
Yes, returning Observable.empty() is totally wrong. Throwing an exception from the zip function seems to be the best solution:
    Observable
        .from(cm.getPermalinks(10))
        .flatMap(permalink -> Observable.zip(
            Observable.<Content>create(subscriber ->
                cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
            Observable.<Content>create(subscriber ->
                cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
            (dataContent, streamUrlContent) -> {
                if (!isDataValid(dataContent, streamUrlContent)) {
                    throw new RuntimeException("Something went wrong.");
                }
                return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
            })
            // The thrown exception surfaces as an error and is swallowed here.
            .onErrorResumeNext(Observable.empty()))
        .subscribe(System.out::println);
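The snippet above calls isDataValid without defining it. One possible shape for it is sketched below. The specific checks (both parts present, matching permalinks, a logo URL on the data part and a stream URL on the stream part) are assumptions based on the mock ContentManager in the question, not something the answer specifies. The class name ContentValidation is hypothetical, and Content is repeated in minimal form only so the sketch compiles on its own.

```java
// Minimal copy of the question's Content class, included only to keep the sketch self-contained.
final class Content {
    final String permalink;
    final String logoUrl;
    final String streamUrl;

    Content(String permalink, String logoUrl, String streamUrl) {
        this.permalink = permalink;
        this.logoUrl = logoUrl;
        this.streamUrl = streamUrl;
    }
}

// Hypothetical holder for the helper; the answer does not say where isDataValid lives.
final class ContentValidation {

    // Assumed rules: both parts exist, refer to the same permalink,
    // and each carries the field that the zip function reads from it.
    static boolean isDataValid(Content dataContent, Content streamUrlContent) {
        return dataContent != null
                && streamUrlContent != null
                && dataContent.permalink != null
                && dataContent.permalink.equals(streamUrlContent.permalink)
                && dataContent.logoUrl != null
                && streamUrlContent.streamUrl != null;
    }

    public static void main(String[] args) {
        Content data = new Content("perma_1", "http://perma_1/logo.png", null);
        Content stream = new Content("perma_1", null, "http://perma_1/stream");
        System.out.println(isDataValid(data, stream));
        System.out.println(isDataValid(data, null));
    }
}
```

With onError in place the two inputs should no longer be null, so the null checks are defensive; the remaining checks are where application-specific validation would go.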