首页 / C# / C# 异步并发操作,只保留最后一次操作
C# 异步并发操作,只保留最后一次操作
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了C# 异步并发操作,只保留最后一次操作,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含11973字,纯文字阅读大概需要18分钟。
内容图文
![C# 异步并发操作,只保留最后一次操作](/upload/InfoBanner/zyjiaocheng/1253/338805b7b56c4bd2abe0452f25c81b8d.jpg)
在我们业务操作时,难免会有多次操作,我们期望什么结果呢?
绝大部分情况,应该是只需要最后一次操作的结果,其它操作应该无效。
自定义等待的任务类
1. 可等待的任务类 AwaitableTask:
![技术分享图片](/upload/getfiles/default/2022/10/31/20221031124502551.jpg)
![技术分享图片](/img/jian.gif)
1 /// <summary> 2 /// 可等待的任务 3 /// </summary> 4 public class AwaitableTask 5 { 6 /// <summary> 7 /// 获取任务是否为不可执行状态 8 /// </summary> 9 public bool NotExecutable { get; privateset; } 10 11///<summary> 12/// 设置任务不可执行 13///</summary> 14publicvoid SetNotExecutable() 15 { 16 NotExecutable = true; 17 } 18 19///<summary> 20/// 获取任务是否有效 21/// 注:对无效任务,可以不做处理。减少并发操作导致的干扰 22///</summary> 23publicbool IsInvalid { get; privateset; } = true; 24 25///<summary> 26/// 标记任务无效 27///</summary> 28publicvoid MarkTaskValid() 29 { 30 IsInvalid = false; 31 } 32 33#region Task 34 35privatereadonly Task _task; 36///<summary> 37/// 初始化可等待的任务。 38///</summary> 39///<param name="task"></param> 40public AwaitableTask(Task task) => _task = task; 41 42///<summary> 43/// 获取任务是否已完成 44///</summary> 45publicbool IsCompleted => _task.IsCompleted; 46 47///<summary> 48/// 任务的Id 49///</summary> 50publicint TaskId => _task.Id; 51 52///<summary> 53/// 开始任务 54///</summary> 55publicvoid Start() => _task.Start(); 56 57///<summary> 58/// 同步执行开始任务 59///</summary> 60publicvoid RunSynchronously() => _task.RunSynchronously(); 61 62#endregion 63 64#region TaskAwaiter 65 66///<summary> 67/// 获取任务等待器 68///</summary> 69///<returns></returns> 70public TaskAwaiter GetAwaiter() => new TaskAwaiter(this); 71 72///<summary>Provides an object that waits for the completion of an asynchronous task. </summary> 73 [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)] 74publicstruct TaskAwaiter : INotifyCompletion 75 { 76privatereadonly AwaitableTask _task; 77 78///<summary> 79/// 任务等待器 80///</summary> 81///<param name="awaitableTask"></param> 82public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask; 83 84///<summary> 85/// 任务是否完成. 86///</summary> 87publicbool IsCompleted => _task._task.IsCompleted; 88 89///<inheritdoc /> 90publicvoid OnCompleted(Action continuation) 91 { 92var This = this; 93 _task._task.ContinueWith(t => 94 { 95if (!This._task.NotExecutable) continuation?.Invoke(); 96 }); 97 } 98///<summary> 99/// 获取任务结果 100///</summary>101publicvoid GetResult() => _task._task.Wait(); 102 } 103104#endregion105106 }
无效的操作可以分为以下俩种:
- 已经进行中的操作,后续结果应标记为无效
- 还没开始的操作,后续不执行
自定义任务类型 AwaitableTask中,添加俩个字段NotExecutable、IsInvalid:
1 /// <summary> 2 /// 获取任务是否为不可执行状态 3 /// </summary> 4 public bool NotExecutable { get; privateset; } 5///<summary>6/// 获取任务是否有效 7/// 注:对无效任务,可以不做处理。减少并发操作导致的干扰 8///</summary>9publicbool IsInvalid { get; privateset; } = true;
2. 有返回结果的可等待任务类 AwaitableTask<TResult>:
![技术分享图片](/upload/getfiles/default/2022/10/31/20221031124502551.jpg)
![技术分享图片](/img/jian.gif)
1 /// <summary> 2 /// 可等待的任务 3 /// </summary> 4 /// <typeparam name="TResult"></typeparam> 5 public class AwaitableTask<TResult> : AwaitableTask 6 { 7privatereadonly Task<TResult> _task; 8///<summary> 9/// 初始化可等待的任务 10///</summary>11///<param name="task">需要执行的任务</param>12public AwaitableTask(Task<TResult> task) : base(task) => _task = task; 1314#region TaskAwaiter 1516///<summary>17/// 获取任务等待器 18///</summary>19///<returns></returns>20publicnew TaskAwaiter GetAwaiter() => new TaskAwaiter(this); 2122///<summary>23/// 任务等待器 24///</summary>25 [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)] 26publicnewstruct TaskAwaiter : INotifyCompletion 27 { 28privatereadonly AwaitableTask<TResult> _task; 2930///<summary>31/// 初始化任务等待器 32///</summary>33///<param name="awaitableTask"></param>34public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask; 3536///<summary>37/// 任务是否已完成。 38///</summary>39publicbool IsCompleted => _task._task.IsCompleted; 4041///<inheritdoc />42publicvoid OnCompleted(Action continuation) 43 { 44var This = this; 45 _task._task.ContinueWith(t => 46 { 47if (!This._task.NotExecutable) continuation?.Invoke(); 48 }); 49 } 5051///<summary>52/// 获取任务结果。 53///</summary>54///<returns></returns>55public TResult GetResult() => _task._task.Result; 56 } 5758#endregion59 }
添加任务等待器,同步等待结果返回:
1 /// <summary> 2 /// 获取任务等待器 3 /// </summary> 4 /// <returns></returns> 5 public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this); 6 7///<summary> 8/// 任务等待器 9///</summary>10 [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)] 11publicnewstruct TaskAwaiter : INotifyCompletion 12 { 13privatereadonly AwaitableTask<TResult> _task; 1415///<summary>16/// 初始化任务等待器 17///</summary>18///<param name="awaitableTask"></param>19public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask; 2021///<summary>22/// 任务是否已完成。 23///</summary>24publicbool IsCompleted => _task._task.IsCompleted; 2526///<inheritdoc />27publicvoid OnCompleted(Action continuation) 28 { 29var This = this; 30 _task._task.ContinueWith(t => 31 { 32if (!This._task.NotExecutable) continuation?.Invoke(); 33 }); 34 } 3536///<summary>37/// 获取任务结果。 38///</summary>39///<returns></returns>40public TResult GetResult() => _task._task.Result; 41 }
异步任务队列
添加异步任务队列类,用于任务的管理,如添加、执行、筛选等:
![技术分享图片](/upload/getfiles/default/2022/10/31/20221031124502551.jpg)
![技术分享图片](/img/jian.gif)
1 /// <summary> 2 /// 异步任务队列 3 /// </summary> 4 public class AsyncTaskQueue : IDisposable 5 { 6 /// <summary> 7 /// 异步任务队列 8 /// </summary> 9 public AsyncTaskQueue() 10 { 11 _autoResetEvent = new AutoResetEvent(false); 12 _thread = new Thread(InternalRunning) { IsBackground = true }; 13 _thread.Start(); 14 } 15 16#region 执行 17 18///<summary> 19/// 执行异步操作 20///</summary> 21///<typeparam name="T">返回结果类型</typeparam> 22///<param name="func">异步操作</param> 23///<returns>isInvalid:异步操作是否有效;result:异步操作结果</returns> 24publicasync Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func) 25 { 26var task = GetExecutableTask(func); 27var result = awaitawait task; 28if (!task.IsInvalid) 29 { 30 result = default(T); 31 } 32return (task.IsInvalid, result); 33 } 34 35///<summary> 36/// 执行异步操作 37///</summary> 38///<typeparam name="T"></typeparam> 39///<param name="func"></param> 40///<returns></returns> 41publicasync Task<bool> ExecuteAsync<T>(Func<Task> func) 42 { 43var task = GetExecutableTask(func); 44awaitawait task; 45return task.IsInvalid; 46 } 47 48#endregion 49 50#region 添加任务 51 52///<summary> 53/// 获取待执行任务 54///</summary> 55///<param name="action"></param> 56///<returns></returns> 57private AwaitableTask GetExecutableTask(Action action) 58 { 59var awaitableTask = new AwaitableTask(new Task(action)); 60 AddPenddingTaskToQueue(awaitableTask); 61return awaitableTask; 62 } 63 64///<summary> 65/// 获取待执行任务 66///</summary> 67///<typeparam name="TResult"></typeparam> 68///<param name="function"></param> 69///<returns></returns> 70private AwaitableTask<TResult> GetExecutableTask<TResult>(Func<TResult> function) 71 { 72var awaitableTask = new AwaitableTask<TResult>(new Task<TResult>(function)); 73 AddPenddingTaskToQueue(awaitableTask); 74return awaitableTask; 75 } 76 77///<summary> 78/// 添加待执行任务到队列 79///</summary> 80///<param name="task"></param> 81///<returns></returns> 82privatevoid AddPenddingTaskToQueue(AwaitableTask task) 83 { 84//添加队列,加锁。 85lock (_queue) 86 { 87 _queue.Enqueue(task); 88//开始执行任务 89 _autoResetEvent.Set(); 90 } 91 } 92 93#endregion 94 95#region 内部运行 96 97privatevoid InternalRunning() 98 { 99while (!_isDisposed) 100 { 101if (_queue.Count == 0) 102 { 103//等待后续任务104 _autoResetEvent.WaitOne(); 105 } 106while (TryGetNextTask(outvar task)) 107 { 108//如已从队列中删除109if (task.NotExecutable) continue; 110111if (UseSingleThread) 112 { 113 task.RunSynchronously(); 114 } 115else116 { 117 task.Start(); 118 } 119 } 120 } 121 } 122///<summary>123/// 上一次异步操作 124///</summary>125private AwaitableTask _lastDoingTask; 126privatebool TryGetNextTask(out AwaitableTask task) 127 { 128 task = null; 129while (_queue.Count > 0) 130 { 131//获取并从队列中移除任务132if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) 133 { 134//设置进行中的异步操作无效135 _lastDoingTask?.MarkTaskValid(); 136 _lastDoingTask = task; 137returntrue; 138 } 139//并发操作,设置任务不可执行140 task.SetNotExecutable(); 141 } 142returnfalse; 143 } 144145#endregion146147#region dispose 148149///<inheritdoc />150publicvoid Dispose() 151 { 152 Dispose(true); 153 GC.SuppressFinalize(this); 154 } 155156///<summary>157/// 析构任务队列 158///</summary>159 ~AsyncTaskQueue() => Dispose(false); 160161privatevoid Dispose(bool disposing) 162 { 163if (_isDisposed) return; 164if (disposing) 165 { 166 _autoResetEvent.Dispose(); 167 } 168 _thread = null; 169 _autoResetEvent = null; 170 _isDisposed = true; 171 } 172173#endregion174175#region 属性及字段 176177///<summary>178/// 是否使用单线程完成任务. 179///</summary>180publicbool UseSingleThread { get; set; } = true; 181182///<summary>183/// 自动取消以前的任务。 184///</summary>185publicbool AutoCancelPreviousTask { get; set; } = false; 186187privatebool _isDisposed; 188privatereadonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>(); 189private Thread _thread; 190private AutoResetEvent _autoResetEvent; 191192#endregion193194 }
1. 自动取消之前的任务 AutoCancelPreviousTask
内部使用线程,循环获取当前任务列表,如果当前任务被标记NotExecutable不可执行,则跳过。
NotExecutable是何时标记的?
获取任务时,标记所有获取的任务为NotExecutable。直到任务列表中为空,那么只执行最后获取的一个任务。
2. 标记已经进行的任务无效 MarkTaskValid
当前进行的任务,无法中止,那么标记为无效即可。
1 /// <summary> 2 /// 上一次异步操作 3 /// </summary> 4 private AwaitableTask _lastDoingTask; 5 private bool TryGetNextTask(out AwaitableTask task) 6 { 7 task = null; 8while (_queue.Count > 0) 9 { 10//获取并从队列中移除任务11if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) 12 { 13//设置进行中的异步操作无效14 _lastDoingTask?.MarkTaskValid(); 15 _lastDoingTask = task; 16returntrue; 17 } 18//并发操作,设置任务不可执行19 task.SetNotExecutable(); 20 } 21returnfalse; 22 }
后续执行完后,根据此标记,设置操作结果为空。
1 /// <summary> 2 /// 执行异步操作 3 /// </summary> 4 /// <typeparam name="T"> 返回结果类型 </typeparam> 5 /// <param name="func"> 异步操作 </param> 6 /// <returns> isInvalid:异步操作是否有效;result:异步操作结果 </returns> 7 public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func) 8 { 9var task = GetExecutableTask(func); 10var result = awaitawait task; 11if (!task.IsInvalid) 12 { 13 result = default(T); 14 } 15return (task.IsInvalid, result); 16 }
实践测试
启动10个并发任务,测试实际的任务队列并发操作管理:
1 public MainWindow() 2 { 3 InitializeComponent(); 4 _asyncTaskQueue = new AsyncTaskQueue 5 { 6 AutoCancelPreviousTask = true, 7 UseSingleThread = true 8 }; 9 } 10private AsyncTaskQueue _asyncTaskQueue; 11privatevoid ButtonBase_OnClick(object sender, RoutedEventArgs e) 12 { 13// 快速启动10个任务14for (var i = 1; i < 10; i++) 15 { 16 Test(_asyncTaskQueue, i); 17 } 18 } 19publicstaticasyncvoid Test(AsyncTaskQueue taskQueue, int num) 20 { 21var result = await taskQueue.ExecuteAsync(async () => 22 { 23 Debug.WriteLine("输入:" + num); 24// 长时间耗时任务25await Task.Delay(TimeSpan.FromSeconds(5)); 26return num * 100; 27 }); 28 Debug.WriteLine($"{num}输出的:" + result); 29 }
测试结果如下:
只有最后一次操作结果,才是有效的。其它9次操作,一次是无效的,8次操作被取消不执行。
Demo,见Github
原文:https://www.cnblogs.com/kybs0/p/11988554.html
内容总结
以上是互联网集市为您收集整理的C# 异步并发操作,只保留最后一次操作全部内容,希望文章能够帮你解决C# 异步并发操作,只保留最后一次操作所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。