C#中的生产者/混合消费者使用4.0框架类和Blocking Collection
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了C#中的生产者/混合消费者使用4.0框架类和Blocking Collection,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3511字,纯文字阅读大概需要6分钟。
内容图文
![C#中的生产者/混合消费者使用4.0框架类和Blocking Collection](/upload/InfoBanner/zyjiaocheng/779/d4850fa1ac91409988232db9fa6112a7.jpg)
我有一种生产者/消费者情景的情况.生产者永远不会停止,这意味着即使有时间BC中没有物品,也可以在以后添加更多物品.
从.NET Framework 3.5迁移到4.0,我决定使用BlockingCollection作为使用者和生产者之间的并发队列.我甚至添加了一些并行扩展,因此我可以使用带有Parallel.ForEach的BC.
问题是,在消费者线程中,我需要一种混合模型:
>我总是检查BC处理任何到达的项目
Parallel.ForEach(bc.GetConsumingEnumerable(),item =>等
>在这个foreach中,我执行彼此之间不相互依赖的所有任务.
>问题来了.在对前面的任务进行并行化之后,我需要按照它们在BC中的相同FIFO顺序来管理它们的结果.这些结果的处理应该在同步线程中进行.
伪代码中的一个小例子如下:
制片人:
//This event is triggered each time a page is scanned. Any batch of new pages can be added at any time at the scanner
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{
//The object to add has a property with the sequence number
_concurrentCollection.TryAdd(scannedPage);
}
消费者:
private void Init()
{
_cancelTasks = false;
_checkTask = Task.Factory.StartNew(() =>
{
while (!_cancelTasks)
{
//BlockingCollections with Paralell ForEach
var bc = _concurrentCollection;
Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
{
ScannedPage currentPage = item;
// process a batch of images from the bc and check if an image has a valid barcode. T
});
//Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.
}
});
}
显然,这不能正常工作,因为.GetConsumingEnumerable()会阻塞,直到BC中有另一个项目.我认为我可以完成任务,只需在同一批次中激活4或5个任务,但是:
>我怎么能用任务执行此操作,并且在阻止任务开始之前仍然有一个等待点,直到BC中有一个项目被消耗(如果没有任何内容,我不想开始处理.一旦有BC中的东西我只是开始批量的4个任务,并在每个任务中使用一个TryTake,所以如果没有什么可以采取它们不会阻止,因为我不知道我是否总能达到来自的项目数量BC作为一批任务,例如BC中只留下一个项目和一批4个任务)?
>我怎么能这样做并利用Parallel.For提供的效率?
>我如何以从BC中提取项目的相同FIFO顺序保存任务结果?
>是否还有其他并发类更适合消费者对这类项目的混合处理?
>此外,这是我在StackOverflow中做的第一个问题,所以如果您需要更多数据,或者您认为我的问题不正确,请告诉我.
解决方法:
我想我遵循你的要求,为什么不创建一个ConcurrentBag并在处理时添加到它:
while (!_cancelTasks)
{
//BlockingCollections with Paralell ForEach
var bc = _concurrentCollection;
var q = new ConcurrentBag<ScannedPage>();
Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
{
ScannedPage currentPage = item;
q.Add(item);
// process a batch of images from the bc and check if an image has a valid barcode. T
});
//Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.
//process items in your list here by sorting using some sequence key
var items = q.OrderBy( o=> o.SeqNbr).ToList();
foreach( var item in items){
...
}
}
这显然不会按照它们添加到BC的确切顺序排列它们,但是您可以像Alex建议的那样向ScannedPage对象添加一些序列nbr,然后对结果进行排序.
这是我如何处理序列:
将其添加到ScannedPage类:
public static int _counter; //public because this is just an example but it would work.
获取序列nbr并在此处指定:
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{
lock( this){ //to single thread this process.. not necessary if it's already single threaded of course.
System.Threading.Interlocked.Increment( ref ScannedPage._counter);
scannedPage.SeqNbr = ScannedPage._counter;
...
}
}
内容总结
以上是互联网集市为您收集整理的C#中的生产者/混合消费者使用4.0框架类和Blocking Collection全部内容,希望文章能够帮你解决C#中的生产者/混合消费者使用4.0框架类和Blocking Collection所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。