c# - Running Task<T> on a custom scheduler -


i creating generic helper class prioritise requests made api whilst restricting parallelisation @ occur.

consider key method of application below;

    public iqueuedtaskhandle<tresponse> invokerequest<tresponse>(func<tclient, task<tresponse>> invocation, queuedclientpriority priority, cancellationtoken ct) tresponse : iserviceresponse     {         var cts = cancellationtokensource.createlinkedtokensource(ct);         _logger.debug("queueing task.");         var tasktoqueue = task.factory.startnew(async () =>         {             _logger.debug("starting  request {0}", task.currentid);             return await invocation(_client);         }, cts.token, taskcreationoptions.none, _schedulers[priority]).unwrap();         tasktoqueue.continuewith(task => _logger.debug("finished task {0}", task.id), cts.token);         return new ecosystemqueuedtaskhandle<tresponse>(cts, priority, tasktoqueue);     } 

without going many details, want invoke tasks returned task<tresponse>> invocation when turn in queue arises. using collection of queues constructed using queuedtaskscheduler indexed unique enumeration;

        _queuedtaskscheduler = new queuedtaskscheduler(taskscheduler.default, 3);         _schedulers = new dictionary<queuedclientpriority, taskscheduler>();         //enumerate priorities         foreach (var priority in enum.getvalues(typeof(queuedclientpriority)))         {             _schedulers.add((queuedclientpriority)priority, _queuedtaskscheduler.activatenewqueue((int)priority));         } 

however, little success can't tasks execute in limited parallelised environment, leading 100 api requests being constructed, fired, , completed in 1 big batch. can tell using fiddler session;

enter image description here

i have read interesting articles , posts (here, here , here) thought detail how go this, far have not been able figure out. understand, async nature of lambda working in continuation structure designed, marking generated task complete, "insta-completing" it. means whilst queues working fine, runing generated task<t> on custom scheduler turning out problem.

stephen cleary's answer explains why can't use taskscheduler purpose , how can use actionblock limit degree of parallelism. if want add priorities that, think you'll have manually. approach of using dictionary of queues reasonable, simple implementation (with no support cancellation or completion) of this:

class scheduler {     private static readonly priority[] priorities =         (priority[])enum.getvalues(typeof(priority));      private readonly ireadonlydictionary<priority, concurrentqueue<func<task>>> queues;     private readonly actionblock<func<task>> executor;     private readonly semaphoreslim semaphore;      public scheduler(int degreeofparallelism)     {         queues = priorities.todictionary(             priority => priority, _ => new concurrentqueue<func<task>>());          executor = new actionblock<func<task>>(             invocation => invocation(),             new executiondataflowblockoptions             {                 maxdegreeofparallelism = degreeofparallelism,                 boundedcapacity = degreeofparallelism             });          semaphore = new semaphoreslim(0);          task.run(watch);     }      private async task watch()     {         while (true)         {             await semaphore.waitasync();              // find item highest priority , send execution             foreach (var priority in priorities.reverse())             {                 func<task> invocation;                 if (queues[priority].trydequeue(out invocation))                 {                     await executor.sendasync(invocation);                 }             }         }     }      public void invoke(func<task> invocation, priority priority)     {         queues[priority].enqueue(invocation);         semaphore.release(1);     } } 

Comments

Popular posts from this blog

c# - Validate object ID from GET to POST -

node.js - Custom Model Validator SailsJS -

php - Find a regex to take part of Email -