CodeGo.net>如何在Windows服务内实现一个连续的生产者-消费者模式
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了CodeGo.net>如何在Windows服务内实现一个连续的生产者-消费者模式,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3305字,纯文字阅读大概需要5分钟。
内容图文
![CodeGo.net>如何在Windows服务内实现一个连续的生产者-消费者模式](/upload/InfoBanner/zyjiaocheng/947/bf7421016a84431da48666dfa639f28a.jpg)
这是我想做的事情:
>在需要处理的项目的内存中保持队列(即IsProcessed = 0)
>每5秒从db中获取未处理的项目,如果它们尚未在队列中,则添加它们
>从队列中连续提取项目,进行处理,并且每次处理项目时,都在db中对其进行更新(IsProcessed = 1)
>做到“尽可能平行”
我有一个服务的构造函数,例如
public MyService()
{
Ticker.Elapsed += FillQueue;
}
当服务启动时,我启动计时器
protected override void OnStart(string[] args)
{
Ticker.Enabled = true;
Task.Run(() => { ConsumeWork(); });
}
我的FillQueue就像
private static async void FillQueue(object source, ElapsedEventArgs e)
{
var items = GetUnprocessedItemsFromDb();
foreach(var item in items)
{
if(!Work.Contains(item))
{
Work.Enqueue(item);
}
}
}
我的ConsumeWork就像
private static void ConsumeWork()
{
while(true)
{
if(Work.Count > 0)
{
var item = Work.Peek();
Process(item);
Work.Dequeue();
}
else
{
Thread.Sleep(500);
}
}
}
但是,这可能是一个幼稚的实现,并且我想知道.NET是否具有正是这种情况所需的任何类型的类.
解决方法:
尽管@JSteward的答案是一个好的开始,但是您可以使用mixing up TPL-Dataflow和Rx.NET extensions进行改进,因为数据流块很容易成为数据的观察者,而使用Rx Timer则可以为您省去很多麻烦(Rx.Timer explanation) .
我们可以根据您的需要调整MSDN article,如下所示:
private const int EventIntervalInSeconds = 5;
private const int DueIntervalInSeconds = 60;
var source =
// sequence of Int64 numbers, starting from 0
// https://msdn.microsoft.com/en-us/library/hh229435.aspx
Observable.Timer(
// fire first event after 1 minute waiting
TimeSpan.FromSeconds(DueIntervalInSeconds),
// fire all next events each 5 seconds
TimeSpan.FromSeconds(EventIntervalInSeconds))
// each number will have a timestamp
.Timestamp()
// each time we select some items to process
.SelectMany(GetItemsFromDB)
// filter already added
.Where(i => !_processedItemIds.Contains(i.Id));
var action = new ActionBlock<Item>(ProcessItem, new ExecutionDataflowBlockOptions
{
// we can start as many item processing as processor count
MaxDegreeOfParallelism = Environment.ProcessorCount,
});
IDisposable subscription = source.Subscribe(action.AsObserver());
另外,您对已处理项目的检查也不是很准确,因为有可能在完成处理时从db中选择未处理项目,但未在数据库中对其进行更新.在这种情况下,项目将从Queue< T>中删除,然后由生产者再次添加到队列中,这就是为什么我将ConcurrentBag<T>添加到此解决方案中(HashSet< T>不是线程安全的):
private static async Task ProcessItem(Item item)
{
if (_processedItemIds.Contains(item.Id))
{
return;
}
_processedItemIds.Add(item.Id);
// actual work here
// save item as processed in database
// we need to wait to ensure item not to appear in queue again
await Task.Delay(TimeSpan.FromSeconds(EventIntervalInSeconds * 2));
// clear the processed cache to reduce memory usage
_processedItemIds.Remove(item.Id);
}
public class Item
{
public Guid Id { get; set; }
}
// temporary cache for items in process
private static ConcurrentBag<Guid> _processedItemIds = new ConcurrentBag<Guid>();
private static IEnumerable<Item> GetItemsFromDB(Timestamped<long> time)
{
// log event timing
Console.WriteLine($"Event # {time.Value} at {time.Timestamp}");
// return items from DB
return new[] { new Item { Id = Guid.NewGuid() } };
}
您可以通过其他方式实现缓存清除,例如,启动“ GC”计时器,该计时器将定期从缓存中删除已处理的项目.
要停止事件和处理项目,您应该处置订阅,并可能完成ActionBlock:
subscription.Dispose();
action.Complete();
您可以在他们的guidelines on github中找到有关Rx.Net的更多信息.
内容总结
以上是互联网集市为您收集整理的CodeGo.net>如何在Windows服务内实现一个连续的生产者-消费者模式全部内容,希望文章能够帮你解决CodeGo.net>如何在Windows服务内实现一个连续的生产者-消费者模式所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。