C# - Running Task&lt;T&gt; on a custom scheduler
I am creating a generic helper class to prioritise requests made to an API whilst restricting the degree of parallelisation at which they occur.
Consider the key method of the application below:
```csharp
public IQueuedTaskHandle<TResponse> InvokeRequest<TResponse>(
    Func<TClient, Task<TResponse>> invocation,
    QueuedClientPriority priority,
    CancellationToken ct) where TResponse : IServiceResponse
{
    var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    _logger.Debug("Queueing task.");
    var taskToQueue = Task.Factory.StartNew(async () =>
    {
        _logger.Debug("Starting request {0}", Task.CurrentId);
        return await invocation(_client);
    }, cts.Token, TaskCreationOptions.None, _schedulers[priority]).Unwrap();
    taskToQueue.ContinueWith(task => _logger.Debug("Finished task {0}", task.Id), cts.Token);
    return new EcosystemQueuedTaskHandle<TResponse>(cts, priority, taskToQueue);
}
```
Without going into too many details, I want the tasks returned by the `Func<TClient, Task<TResponse>> invocation` to be invoked only when their turn in the queue arises. I am using a collection of queues, constructed using a QueuedTaskScheduler, indexed by a unique enumeration:
```csharp
_queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 3);
_schedulers = new Dictionary<QueuedClientPriority, TaskScheduler>();
// Enumerate the priorities
foreach (var priority in Enum.GetValues(typeof(QueuedClientPriority)))
{
    _schedulers.Add((QueuedClientPriority)priority,
        _queuedTaskScheduler.ActivateNewQueue((int)priority));
}
```
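For reference, the `_schedulers` dictionary above assumes a priority enumeration along these lines (the names and values here are hypothetical; the integer value of each member is what gets passed to `ActivateNewQueue` as the queue priority):

```csharp
// Hypothetical enumeration backing the _schedulers dictionary.
// Each member's integer value becomes the priority of its queue.
public enum QueuedClientPriority
{
    Low = 0,
    Normal = 1,
    High = 2
}
```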
However, I have had little success: I can't get the tasks to execute in a limited, parallelised environment, which leads to 100 API requests being constructed, fired, and completed in one big batch, as I can tell from a Fiddler session:
I have read some interesting articles and SO posts (here, here and here) that I thought would detail how to go about this, but so far I have not been able to figure it out. As I understand it, the async nature of the lambda is not working in the continuation structure as designed: it marks the generated task as complete, essentially "insta-completing" it. This means that whilst the queues are working fine, running the generated Task&lt;T&gt; on a custom scheduler is turning out to be the problem.
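The effect can be reproduced without any of the surrounding classes. In the sketch below (my own repro, using `ConcurrentExclusiveSchedulerPair` as a stand-in for a QueuedTaskScheduler queue with a concurrency limit of 1), the scheduler only serialises the synchronous prefix of each async lambda, i.e. the code before the first `await`; the awaited delays themselves all overlap:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        // A scheduler that allows only ONE task to run at a time.
        var scheduler = new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default, maxConcurrencyLevel: 1).ConcurrentScheduler;

        int running = 0, maxObserved = 0;

        Func<Task> work = async () =>
        {
            // This part runs on the limited scheduler (serialised)...
            var now = Interlocked.Increment(ref running);
            maxObserved = Math.Max(maxObserved, now);
            // ...but once we hit the first await, the "task" the scheduler
            // was tracking completes, and the delay runs outside its control.
            await Task.Delay(100);
            Interlocked.Decrement(ref running);
        };

        var tasks = new Task[10];
        for (int i = 0; i < 10; i++)
        {
            // StartNew(async ...) yields a Task<Task>; Unwrap gets the inner task.
            tasks[i] = Task.Factory.StartNew(work, default,
                TaskCreationOptions.None, scheduler).Unwrap();
        }
        await Task.WhenAll(tasks);

        // Despite maxConcurrencyLevel: 1, the delays all ran concurrently.
        Console.WriteLine(maxObserved > 1);   // prints True
    }
}
```

This is exactly the "insta-completing" behaviour described above: the scheduler sees each queued work item as finished the moment the lambda returns its (incomplete) inner task.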
Stephen Cleary's answer explains why you can't use a TaskScheduler for this purpose and how you can use an ActionBlock to limit the degree of parallelism instead. If you want to add priorities on top of that, I think you'll have to do it manually. Your approach of using a Dictionary of queues is reasonable; a simple implementation (with no support for cancellation or completion) could look like this:
```csharp
class Scheduler
{
    private static readonly Priority[] Priorities =
        (Priority[])Enum.GetValues(typeof(Priority));

    private readonly IReadOnlyDictionary<Priority, ConcurrentQueue<Func<Task>>> queues;
    private readonly ActionBlock<Func<Task>> executor;
    private readonly SemaphoreSlim semaphore;

    public Scheduler(int degreeOfParallelism)
    {
        queues = Priorities.ToDictionary(
            priority => priority, _ => new ConcurrentQueue<Func<Task>>());

        executor = new ActionBlock<Func<Task>>(
            invocation => invocation(),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = degreeOfParallelism,
                BoundedCapacity = degreeOfParallelism
            });

        semaphore = new SemaphoreSlim(0);

        Task.Run(Watch);
    }

    private async Task Watch()
    {
        while (true)
        {
            await semaphore.WaitAsync();

            // Find the item with the highest priority and send it for execution.
            foreach (var priority in Priorities.Reverse())
            {
                Func<Task> invocation;
                if (queues[priority].TryDequeue(out invocation))
                {
                    await executor.SendAsync(invocation);
                }
            }
        }
    }

    public void Invoke(Func<Task> invocation, Priority priority)
    {
        queues[priority].Enqueue(invocation);
        semaphore.Release(1);
    }
}
```
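A usage sketch for the Scheduler class above (the `Priority` enum is assumed, not defined in the answer; higher numeric values count as higher priority, since `Enum.GetValues` returns values in ascending order and `Watch` iterates them via `Priorities.Reverse()`):

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical priority enum used by the Scheduler above.
enum Priority { Low = 0, Normal = 1, High = 2 }

static class Demo
{
    static async Task Main()
    {
        // At most 2 invocations run concurrently inside the ActionBlock.
        var scheduler = new Scheduler(degreeOfParallelism: 2);

        for (int i = 0; i < 10; i++)
        {
            int id = i;                  // capture a stable copy for the lambda
            scheduler.Invoke(async () =>
            {
                Console.WriteLine($"request {id} starting");
                await Task.Delay(100);   // stand-in for the real API call
            }, id % 2 == 0 ? Priority.High : Priority.Low);
        }

        // The sketch has no completion signal, so just wait long enough here.
        await Task.Delay(2000);
    }
}
```

Because the `ActionBlock` awaits the task each `invocation()` returns, the async delegates stay under its `MaxDegreeOfParallelism` for their whole duration, which is precisely what the TaskScheduler approach could not guarantee.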