RxJava - Error handling for zipped observables


My use case is: I get a list of permalinks, and I need to issue two REST requests per permalink to get its data in parts. When both requests are back, I want to merge their info and do something with it (here - print it out). I want to do this using the zip operator. Here is my current code (together with mocks for the library I'm using):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    import rx.Observable;
    import rx.Subscriber;

    public class Main {

        public static void main(String[] args) {
            ContentManager cm = new ContentManager();

            Observable
                    .from(cm.getPermalinks(10))
                    .flatMap(permalink -> Observable.zip(
                            Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                            Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                            (dataContent, streamUrlContent) -> {
                                if (dataContent == null || streamUrlContent == null) {
                                    System.err.println("not zipping " + dataContent + " and " + streamUrlContent);
                                    return Observable.empty();
                                }

                                return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                            }))
                    .subscribe(System.out::println);
        }
    }

    class SubscribingRestCallback implements RestCallback {

        private final Subscriber<? super Content> subscriber;

        public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void onSuccess(Content content) {
            subscriber.onNext(content);
            subscriber.onCompleted();
        }

        @Override
        public void onFailure(int code, String message) {
            System.err.println(message);
            // Emits null so that zip still receives a value for the failed request
            subscriber.onNext(null);
            subscriber.onCompleted();
        }
    }

    public class Content {

        public final String permalink;

        public final String logoUrl;

        public final String streamUrl;

        public Content(String permalink, String logoUrl, String streamUrl) {
            this.permalink = permalink;
            this.logoUrl = logoUrl;
            this.streamUrl = streamUrl;
        }

        @Override
        public String toString() {
            return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl);
        }
    }

    public interface RestCallback {

        void onSuccess(Content content);

        void onFailure(int code, String message);
    }

    class ContentManager {

        private final Random random = new Random();

        public List<String> getPermalinks(int n) {
            List<String> permalinks = new ArrayList<>(n);
            for (int i = 1; i <= n; ++i) {
                permalinks.add("perma_" + i);
            }

            return permalinks;
        }

        public void getDataByPermalink(String permalink, RestCallback callback) {
            getByPermalink(permalink, callback, false);
        }

        public void getStreamByPermalink(String permalink, RestCallback callback) {
            getByPermalink(permalink, callback, true);
        }

        private void getByPermalink(String permalink, RestCallback callback, boolean stream) {
            // Simulate network latency and unordered results
            new Thread(() -> {
                try {
                    Thread.sleep(random.nextInt(1000) + 200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Roughly 95% of the simulated requests succeed
                if (random.nextInt(100) < 95) {
                    String logoUrl;
                    String streamUrl;
                    if (stream) {
                        logoUrl = null;
                        streamUrl = "http://" + permalink + "/stream";
                    } else {
                        logoUrl = "http://" + permalink + "/logo.png";
                        streamUrl = null;
                    }
                    callback.onSuccess(new Content(permalink, logoUrl, streamUrl));
                } else {
                    callback.onFailure(-1, permalink + " data failure");
                }
            }).start();
        }
    }

In general, it works, but I don't like the error handling in this implementation. Basically, the REST requests may fail, in which case the onFailure method calls subscriber.onNext(null) so that the zip method always has something to work with (one request may have failed while the other one may not have, and I don't know which one failed). Then, in the zip function, I need an if that checks that both are not null (my code would crash if either of the partial Contents were null).

I would like to be able to filter out the nulls using the filter operator somewhere, if possible. Or maybe there is a better way than emitting null values for the failure case that still works with the zip function?

First of all, the right way to notify the Subscriber about an error is to call the subscriber.onError method:

    class SubscribingRestCallback implements RestCallback {
        private final Subscriber<? super Content> subscriber;

        public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void onSuccess(Content content) {
            subscriber.onNext(content);
            subscriber.onCompleted();
        }

        @Override
        public void onFailure(int code, String message) {
            // Signal the failure as an error instead of emitting null
            subscriber.onError(new Exception(message));
        }
    }

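Even if you don't want the whole stream to fail, you still need to call the subscriber.onError() method. There are other ways to swallow errors further down the chain. One of them is the onErrorResumeNext operator, applied here to each zipped pair so that a failed permalink is skipped while the others still get through: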

    Observable
            .from(cm.getPermalinks(10))
            .flatMap(permalink -> Observable.zip(
                    Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                    Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                    (dataContent, streamUrlContent) -> {
                        return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                    }).onErrorResumeNext(Observable.empty()))
            .subscribe(System.out::println);

Edit

I have one last question: if you look at my zipper function, I return Observable.empty() if the two objects cannot be zipped, and otherwise I return a Content. This seems wrong. How should I handle such error conditions in the zipper function?

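Yes, returning Observable.empty() there is totally wrong: the zip function is supposed to return the combined value, not an Observable. Throwing an exception from the zip function seems to be the best solution: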

    Observable
            .from(cm.getPermalinks(10))
            .flatMap(permalink -> Observable.zip(
                    Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                    Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                    (dataContent, streamUrlContent) -> {
                        if (!isDataValid(dataContent, streamUrlContent)) {
                            throw new RuntimeException("Something went wrong.");
                        }
                        return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                    }).onErrorResumeNext(Observable.empty()))
            .subscribe(System.out::println);
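
If you want to try the "signal an error, then swallow it per pair" idea without RxJava on the classpath, here is a rough JDK-only analogy. It is not RxJava's API: CompletableFuture.thenCombine stands in for zip, completeExceptionally for subscriber.onError, and exceptionally for onErrorResumeNext(Observable.empty()). The class ZipErrorSketch, its methods, and the lists of failing permalinks are all hypothetical names made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class ZipErrorSketch {

    // Hypothetical stand-in for one REST call; fails for permalinks listed in `failing`.
    static CompletableFuture<String> fetch(String permalink, String part, List<String> failing) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (failing.contains(permalink)) {
            // Counterpart of subscriber.onError(...): report the failure, never emit null.
            future.completeExceptionally(new Exception(permalink + " " + part + " failure"));
        } else {
            future.complete(permalink + "/" + part);
        }
        return future;
    }

    // Combines both parts; if either side failed, the pair collapses to Optional.empty().
    static CompletableFuture<Optional<String>> zipOrSkip(
            String permalink, List<String> failingData, List<String> failingStream) {
        return fetch(permalink, "logo.png", failingData)
                .thenCombine(fetch(permalink, "stream", failingStream),
                        (logo, stream) -> Optional.of(logo + " + " + stream))
                .exceptionally(error -> Optional.empty());
    }

    // Merges every permalink and keeps only the pairs where both parts succeeded.
    static List<String> mergeAll(
            List<String> permalinks, List<String> failingData, List<String> failingStream) {
        List<String> merged = new ArrayList<>();
        for (String permalink : permalinks) {
            zipOrSkip(permalink, failingData, failingStream).join().ifPresent(merged::add);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> permalinks = Arrays.asList("perma_1", "perma_2", "perma_3");
        // perma_2 fails on the data request, so only perma_1 and perma_3 are printed.
        mergeAll(permalinks, Arrays.asList("perma_2"), new ArrayList<>())
                .forEach(System.out::println);
    }
}
```

The point carried over from the answer is that the combining function only ever sees two valid values, so it needs no null checks; the decision to skip a failed pair lives in one place after the combine step.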
