c# - Reactive Extension : 3 Consecutive occurance of a Record -
i have hot observable -
class tablerecord
{ string propertya; datetime dt; dictionary<string,int> checkthreshholdvalues; }
i need check each value present in dictionary, if 3 consecutive values exceeding threshold or not. if yes, need generate new event property. (here each reacord come @ different of 15 minutes ie 4 records in 1 hour.
class generatedrecord { string propertya; datetime lasttimeobserved; string dictionarypropertyname; int lastvalue }
i know have use groupbyuntil buffer, somehow unable think , how split dictionary individual parts still keeping condition appreciated.
example : lets have propertya value in {"towna","townb","townc"} lets properties in dictionary {"pollution","temperature","pressure"}
lets say, recv values per 15 minutes, data -
{"towna" ,"6/14/2015 10:00",{ {"pollution",61},{"temperature",48},{"pressure",40}}} {"towna" ,"6/14/2015 10:15",{ {"pollution",63},{"temperature",63},{"pressure",40}}} {"towna" ,"6/14/2015 10:30",{ {"pollution",49},{"temperature",64},{"pressure",40}}} {"towna" ,"6/14/2015 10:45",{ {"pollution",70},{"temperature",65},{"pressure",40}}}
say threshhold value 60. above example, temperature satisfies condition has value of {63,64,65 } . pollution 2 consecutive values above 60
regards
you should able overlapping windows couldn't work out syntax start new window every time element received , close window when contains 3 items. here conceptual solution based on observable.create , maintaining queue. in case use interval , simple threshold (>3):
observable.create<string>(observer => { var queue = new queue<long>(3); return observable.interval(timespan.fromseconds(1)).subscribe(value => { if (queue.count == 3) { queue.dequeue(); } queue.enqueue(value); if (queue.count == 3) { var values = queue.toarray(); if (values.all(v => v > 3)) { observer.onnext("all above 3. " + values[0] + " " + values[1] + " " + values[2]); } } }); })
Comments
Post a Comment