android - Buffer events, start new group 1 second after the last event from previous group -


i on android , created observable of touch downs. , buffer them long new events coming within second. when second elapsed last touch down emission, want create list of collected events , emit it, , start new collection.

i have 2 snippets of code work either don't seem reactive (#1) or unnecessarily complex (#2). here are:

  1. i use buffer(func0<? extends observable<? extends tclosing>> bufferclosingselector) overload - when returned selector emits item, means second elapsed last source emission , buffer can emitted.
  2. the obserbable selector function returns publishsubject can decide when push emissions it.
  3. there exists runnable task pushes new emissions closing subject. task scheduled (via android handler) run second after processed source touch down emission happened. when new source emission of new touch down happens afterwards within second, task cancelled, , new 1 scheduled again after second.

here relevant android code:

    final publishsubject<motionevent> touchpublishsubject = publishsubject.create();      final viewgroup viewgroup = (viewgroup) findviewbyid(android.r.id.content);     viewgroup.setontouchlistener(new view.ontouchlistener() {          @override         public boolean ontouch(view v, motionevent event) {             touchpublishsubject.onnext(event);              return true;         }     });      final publishsubject<object> windowclosepublishsubject = publishsubject.create();     final handler handler = new handler(looper.getmainlooper());     final runnable r = new runnable() {          @override         public void run() {             log.d(tag, "will emit closing item");             windowclosepublishsubject.onnext("now!");         }     };      viewobservable             .bindview(viewgroup, touchpublishsubject)             .filter(new func1<motionevent, boolean>() {                  @override                 public boolean call(motionevent motionevent) {                     return motionevent.getaction() == motionevent.action_down;                 }             })             .doonnext(new action1<motionevent>() {                  @override                 public void call(motionevent motionevent) {                     // restart timer                     log.d(tag, "cancelling closing");                     handler.removecallbacks(r);                     log.d(tag, "scheduling closing");                     handler.postdelayed(r, 1000l);                      // show touch                     log.d(tag, motionevent.tostring());                 }             })             .buffer(new func0<observable<?>>() {                  @override                 public observable<?> call() {                     log.d(tag, "creating buffer closing selector");                     return windowclosepublishsubject                             .doonnext(new action1<object>() {                                  @override                                 public void call(object o) {                                     log.d(tag, "emitting closing item '" + o + "'");                                 }                             });                 }             })             .subscribe(new action1<list<motionevent>>() {                  @override                 public void call(list<motionevent> motionevents) {                     // show number of touch downs                     log.d(tag, "got " + motionevents.size() + " touch downs");                 }             }); 

i don't fancy usage of handler , in solution, looked further.

the second snippet (the touchpublishsubject , touch listener same):

  1. i reuse touch down generating touchpublishsubject window closing observable, debouncing first 1 sec timeout
  2. apparently, debouncing happens on scheduler.computation(), moves observing same scheduler , need use observeon(androidschedulers.mainthread()) - find bit strange nested observable's scheduler, closes buffer windows, promotes whole chain happen in scheduler well

the code:

    final publishsubject<motionevent> touchpublishsubject = publishsubject.create();      final viewgroup viewgroup = (viewgroup) findviewbyid(android.r.id.content);     viewgroup.setontouchlistener(new view.ontouchlistener() {          @override         public boolean ontouch(view v, motionevent event) {             touchpublishsubject.onnext(event);              return true;         }     });      viewobservable             .bindview(viewgroup, touchpublishsubject)             .filter(new func1<motionevent, boolean>() {                  @override                 public boolean call(motionevent motionevent) {                     return motionevent.getaction() == motionevent.action_down;                 }             })             .doonnext(new action1<motionevent>() {                  @override                 public void call(motionevent motionevent) {                     // show touch                     log.d(tag, motionevent.tostring());                 }             })             .buffer(new func0<observable<?>>() {                  @override                 public observable<?> call() {                     log.d(tag, "creating buffer closing selector");                     return touchpublishsubject                             .debounce(1l, timeunit.seconds)                             .doonnext(new action1<object>() {                                  @override                                 public void call(object o) {                                     log.d(tag, "emitting closing item '" + o + "'");                                 }                             });                 }             })             .observeon(androidschedulers.mainthread())             .subscribe(new action1<list<motionevent>>() {                  @override                 public void call(list<motionevent> motionevents) {                     // show number of touch downs                     log.d(tag, "got " + motionevents.size() + " touch downs");                 }             }); 

this code works , better first one, feels more should done rx. complex because of nested observable , brain gymnastics necessary it. there buffer overload i'm missing automatically same (i.e. close window after last emission 1 sec old)?

edit: 1 of comments made me aware of presentation ben christensen, , found this: http://nerds.weddingpartyapp.com/tech/2015/01/05/debouncedbuffer-used-in-rxbus-example/, links few implementation of problem. seems pretty common requirement, nice have built-in operator this. anyways, consider solution presented in these other sources , here canon type of problems.


Comments

Popular posts from this blog

javascript - Google App Script ContentService downloadAsFile not working -

javascript - Function overwritting -

php - Find a regex to take part of Email -