android - Buffer events, start new group 1 second after the last event from previous group -
i on android , created observable of touch downs. , buffer them long new events coming within second. when second elapsed last touch down emission, want create list of collected events , emit it, , start new collection.
i have 2 snippets of code work either don't seem reactive (#1) or unnecessarily complex (#2). here are:
- i use
buffer(func0<? extends observable<? extends tclosing>> bufferclosingselector)overload - when returned selector emits item, means second elapsed last source emission , buffer can emitted. - the
obserbableselector function returnspublishsubjectcan decide when push emissions it. - there exists
runnabletask pushes new emissions closing subject. task scheduled (via android handler) run second after processed source touch down emission happened. when new source emission of new touch down happens afterwards within second, task cancelled, , new 1 scheduled again after second.
here relevant android code:
final publishsubject<motionevent> touchpublishsubject = publishsubject.create(); final viewgroup viewgroup = (viewgroup) findviewbyid(android.r.id.content); viewgroup.setontouchlistener(new view.ontouchlistener() { @override public boolean ontouch(view v, motionevent event) { touchpublishsubject.onnext(event); return true; } }); final publishsubject<object> windowclosepublishsubject = publishsubject.create(); final handler handler = new handler(looper.getmainlooper()); final runnable r = new runnable() { @override public void run() { log.d(tag, "will emit closing item"); windowclosepublishsubject.onnext("now!"); } }; viewobservable .bindview(viewgroup, touchpublishsubject) .filter(new func1<motionevent, boolean>() { @override public boolean call(motionevent motionevent) { return motionevent.getaction() == motionevent.action_down; } }) .doonnext(new action1<motionevent>() { @override public void call(motionevent motionevent) { // restart timer log.d(tag, "cancelling closing"); handler.removecallbacks(r); log.d(tag, "scheduling closing"); handler.postdelayed(r, 1000l); // show touch log.d(tag, motionevent.tostring()); } }) .buffer(new func0<observable<?>>() { @override public observable<?> call() { log.d(tag, "creating buffer closing selector"); return windowclosepublishsubject .doonnext(new action1<object>() { @override public void call(object o) { log.d(tag, "emitting closing item '" + o + "'"); } }); } }) .subscribe(new action1<list<motionevent>>() { @override public void call(list<motionevent> motionevents) { // show number of touch downs log.d(tag, "got " + motionevents.size() + " touch downs"); } }); i don't fancy usage of handler , in solution, looked further.
the second snippet (the touchpublishsubject , touch listener same):
- i reuse touch down generating
touchpublishsubjectwindow closing observable, debouncing first 1 sec timeout - apparently, debouncing happens on
scheduler.computation(), moves observing same scheduler , need useobserveon(androidschedulers.mainthread())- find bit strange nestedobservable's scheduler, closes buffer windows, promotes whole chain happen in scheduler well
the code:
final publishsubject<motionevent> touchpublishsubject = publishsubject.create(); final viewgroup viewgroup = (viewgroup) findviewbyid(android.r.id.content); viewgroup.setontouchlistener(new view.ontouchlistener() { @override public boolean ontouch(view v, motionevent event) { touchpublishsubject.onnext(event); return true; } }); viewobservable .bindview(viewgroup, touchpublishsubject) .filter(new func1<motionevent, boolean>() { @override public boolean call(motionevent motionevent) { return motionevent.getaction() == motionevent.action_down; } }) .doonnext(new action1<motionevent>() { @override public void call(motionevent motionevent) { // show touch log.d(tag, motionevent.tostring()); } }) .buffer(new func0<observable<?>>() { @override public observable<?> call() { log.d(tag, "creating buffer closing selector"); return touchpublishsubject .debounce(1l, timeunit.seconds) .doonnext(new action1<object>() { @override public void call(object o) { log.d(tag, "emitting closing item '" + o + "'"); } }); } }) .observeon(androidschedulers.mainthread()) .subscribe(new action1<list<motionevent>>() { @override public void call(list<motionevent> motionevents) { // show number of touch downs log.d(tag, "got " + motionevents.size() + " touch downs"); } }); this code works , better first one, feels more should done rx. complex because of nested observable , brain gymnastics necessary it. there buffer overload i'm missing automatically same (i.e. close window after last emission 1 sec old)?
edit: 1 of comments made me aware of presentation ben christensen, , found this: http://nerds.weddingpartyapp.com/tech/2015/01/05/debouncedbuffer-used-in-rxbus-example/, links few implementation of problem. seems pretty common requirement, nice have built-in operator this. anyways, consider solution presented in these other sources , here canon type of problems.
Comments
Post a Comment