c# – 同时处理单个和批处理请求的体系结构
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了c# – 同时处理单个和批处理请求的体系结构,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3448字,纯文字阅读大概需要5分钟。
内容图文
![c# – 同时处理单个和批处理请求的体系结构](/upload/InfoBanner/zyjiaocheng/791/0782598c906c4863bbb67c8981076233.jpg)
我有一个Windows服务托管的WCF服务.此服务公开了两种方法:
> bool ProcessClaim(字符串选项,ref string xml);将一些数据作为输入,进行一些处理(包括IO绑定操作,如DB查询),并返回结果.
> void RunJob(string ticket);立即返回.根据票据,从存储器(例如DB或文件系统)读取输入数据,对每个数据元素执行相同的处理,并将结果保存回存储器.批次通常包括许多索赔.
用户可以调用ProcessClaim来处理单个请求,并调用RunJob来运行批处理.几个批次可以同时运行.每个处理请求都包装为Task,因此所有请求都是并行执行的.
问题不是允许批处理通过安排大量请求来阻塞处理队列.换句话说,如果用户执行大批量,它将在很长一段时间内阻止小批量和单个处理请求.
所以我提出了以下模式,由Albahari(非常简短地)描述:
public sealed class ProcessingQueue : IDisposable
{
private class WorkItem
{
public readonly TaskCompletionSource<string> TaskSource;
public readonly string Options;
public readonly string Claim;
public readonly CancellationToken? CancelToken;
public WorkItem(
TaskCompletionSource<string> taskSource,
string options,
string claim,
CancellationToken? cancelToken)
{
TaskSource = taskSource;
Options = options;
Claim = claim;
CancelToken = cancelToken;
}
}
public ProcessingQueue()
: this(Environment.ProcessorCount)
{
}
public ProcessingQueue(int workerCount)
{
_taskQ = new BlockingCollection<WorkItem>(workerCount * 2);
for (var i = 0; i < workerCount; i++)
Task.Factory.StartNew(Consume);
}
public void Dispose()
{
_taskQ.CompleteAdding();
}
private readonly BlockingCollection<WorkItem> _taskQ;
public Task<string> EnqueueTask(string options, string claim, CancellationToken? cancelToken = null)
{
var tcs = new TaskCompletionSource<string>();
_taskQ.Add(new WorkItem(tcs, options, claim, cancelToken));
return tcs.Task;
}
public static Task<string> ProcessRequest(string options, string claim, CancellationToken? cancelToken = null)
{
return Task<string>.Factory.StartNew(() => ProcessItem(options, claim));
}
private void Consume()
{
foreach (var workItem in _taskQ.GetConsumingEnumerable())
{
if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested)
workItem.TaskSource.SetCanceled();
else
{
try
{
workItem.TaskSource.SetResult(ProcessItem(workItem.Options, workItem.Claim));
}
catch (Exception ex)
{
workItem.TaskSource.SetException(ex);
}
}
}
}
private static string ProcessItem(string options, string claim)
{
// do some actual work here
Thread.Sleep(2000); // simulate work;
return options + claim; // return final result
}
}
静态方法ProcessRequest可用于处理单个请求,而实例方法EnqueueTask用于批处理.当然,所有批处理必须使用ProcessingQueue的单个共享实例.虽然这种方法效果很好,并且可以控制多个批次同时运行的速度,但是我觉得有些不对劲:
>必须手动维护工作线程池
>难以猜测最佳工作线程数(我默认使用处理器核心数)
>当没有批处理正在运行时,一堆线程仍然被阻塞,浪费系统资源
>处理块工作线程的IO绑定部分降低了CPU使用效率
我想知道,有没有更好的方法来处理这种情况?
更新:
其中一个要求是为批次提供全部功率,这意味着当用户执行一个批次,并且没有其他传入请求时,所有资源必须专用于处理此批次.
解决方法:
我想说,拥有一个服务接口和一个托管容器来处理这两种非常不同类型的需求可能是一个错误.
您应该将服务分解为两个 – 一个按需返回对各个请求的响应,另一个队列批量查询并在单个线程上处理它们.
通过这种方式,您可以为实时消费者提供高可用性渠道,并为您的批量消费者提供离线渠道.这些可以作为单独的关注点进行部署和管理,允许您在每个服务接口上提供不同的服务级别.
只是我对提出的架构的想法.
UPDATE
事实是您的音量处理通道是一个离线通道.这意味着消费者必须排队等待并且不确定他们的请求返回的时间.
那么工作队列怎么样?每个作业在处理时都会获得所有可用资源.处理完作业后,调用者会收到作业已完成的通知.
内容总结
以上是互联网集市为您收集整理的c# – 同时处理单个和批处理请求的体系结构全部内容,希望文章能够帮你解决c# – 同时处理单个和批处理请求的体系结构所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。