《手搓》TaskFactory帶(dai)你安全的起飛
一、TaskFactory也能手搓
- 是的TaskFactory也能手搓
- 而且效果是杠杠的
二、現在繼續說程序優化的事情
1. 話說產品經理小馬給工程師小牛提了需求批量獲取產品詳情
- 小牛三下五除二就干上線了
- 代碼那是寫的干凈又漂亮,沒有一行多余的代碼
- 只是性能有一點點瑕疵
- 每個商品獲取要0.1秒,獲取10個就是1秒多
- 小馬說: 我看博客園有個博主說了,不要在循環里面直接await
- 小牛說: 是嗎?轉發我看一下
_output.WriteLine($"begin {DateTime.Now:HH:mm:ss.fff}");
Stopwatch sw = Stopwatch.StartNew();
List<Product> products = new(10);
for (int i = 0; i < 10; i++)
{
var id = i;
var item = await GetProductAsync(id);
products.Add(item);
}
sw.Stop();
_output.WriteLine($"end {DateTime.Now:HH:mm:ss.fff}, Elapsed {sw.ElapsedMilliseconds}");
// begin 09:10:06.086
// end 09:10:07.206, Elapsed 1118
private const int _concurrentLimit = 6;
private readonly ConcurrentControl _control = new();
private readonly ITestOutputHelper _output = output;
internal async Task<Product> GetProductAsync(int id)
{
_control.Increment();
await Task.Delay(100);
var concurrent = _control.Count;
_control.Decrement();
if (concurrent > _concurrentLimit)
{
throw new Exception("Server is busy!!!");
}
return new(id);
}
2. 小牛說so easy,稍等
- 小牛立馬悟了
- 馬上把代碼優化了
- 可是一上線程序就掛了
- 由于上游允許的最高并發是6,10個并發把上游給打掛了
- 小牛說: 小馬你坑慘我了呀
- 小牛立馬私信來罵那個博客園博主
- 對筆者就是那個博主
- 自己挖的坑終歸是要填的,為此只好給小牛安利手搓TaskFactory
_output.WriteLine($"begin {DateTime.Now:HH:mm:ss.fff}");
Stopwatch sw = Stopwatch.StartNew();
List<Task<Product>> tasks = new(10);
for (int i = 0; i < 10; i++)
{
var id = i;
var task = GetProductAsync(id);
tasks.Add(task);
}
var products = await Task.WhenAll(tasks);
sw.Stop();
_output.WriteLine($"end {DateTime.Now:HH:mm:ss.fff}, Elapsed {sw.ElapsedMilliseconds}");
// begin 09:22:05.622
// System.Exception : Server is busy!!!
3. 用手搓TaskFactory來優化
- ConcurrentTaskFactory就是手搓的TaskFactory
- ConcurrencyLevel設置為6
- 使用StartTask發起異步請求
- 其他代碼都不變
- 耗時不到0.3秒,比原來快了3倍多
- http服務是天生的多線程
- 即使你單線程調用數據庫,在大量抓取的情況下,數據庫也會被打掛
- 不知道大家有沒有打掛過自家數據庫的經歷
- 用上《手搓》TaskFactory,可以對上游做到并發可控,程序改動還極小
- 小牛看得那是目瞪口呆,這是怎么實現的
- 小牛說: 快教教我
- 筆者說: 沒問題,我寫篇博文你看了自然就明白了
- 于是這篇文章誕生了
- 話說博客園也偶發數據庫被打掛的情況
- 不知道Dudu會不會看到這篇文章
var options = new ReduceOptions { ConcurrencyLevel = 6 };
var factory = new ConcurrentTaskFactory(options);
_output.WriteLine($"begin {DateTime.Now:HH:mm:ss.fff}");
Stopwatch sw = Stopwatch.StartNew();
List<Task<Product>> tasks = new(10);
for (int i = 0; i < 10; i++)
{
var id = i;
var task = factory.StartTask(() => GetProductAsync(id));
tasks.Add(task);
}
var products = await Task.WhenAll(tasks);
sw.Stop();
_output.WriteLine($"end {DateTime.Now:HH:mm:ss.fff}, Elapsed {sw.ElapsedMilliseconds}");
// begin 09:33:03.070
// end 09:33:03.370, Elapsed 299
三、 揭秘《手搓》TaskFactory
1. 單并發異步操作
- ConcurrencyLevel設置為1
- 異步調用10次
- 雖然執行異步的線程ID不止一個
- 實際執行每0.1秒執行一次
- 也就是等上一次執行完才開始下一次
- 現在誰還敢說異步線程不是乖寶寶!!!
var options = new ReduceOptions { ConcurrencyLevel = 1 };
var factory = new ConcurrentTaskFactory(options);
List<Task<Product>> tasks = new(10);
for (int i = 0; i < 10; i++)
{
var id = i;
var task = factory.StartTask(() => GetProductAsync(id));
tasks.Add(task);
}
var products = await Task.WhenAll(tasks);
Assert.NotNull(products);
Assert.Equal(10, products.Length);
internal async Task<Product> GetProductAsync(int id)
{
_control.Increment();
await Task.Delay(100);
var concurrent = _control.Count;
_control.Decrement();
if (concurrent > _concurrentLimit)
{
throw new Exception("Server is busy!!!");
}
_output.WriteLine($"Thread{Environment.CurrentManagedThreadId} GetProductAsync({id}),{DateTime.Now:HH:mm:ss.fff}");
return new(id);
}
// Thread10 GetProductAsync(0),10:04:37.009
// Thread11 GetProductAsync(1),10:04:37.135
// Thread32 GetProductAsync(2),10:04:37.263
// Thread11 GetProductAsync(3),10:04:37.391
// Thread32 GetProductAsync(4),10:04:37.518
// Thread11 GetProductAsync(5),10:04:37.646
// Thread11 GetProductAsync(6),10:04:37.774
// Thread11 GetProductAsync(7),10:04:37.902
// Thread11 GetProductAsync(8),10:04:38.030
// Thread11 GetProductAsync(9),10:04:38.158
2. 并發測試
- ConcurrencyLevel設置為4
- 異步調用100次
- 總耗時3秒
- 4個并發清晰可見
var options = new ReduceOptions { ConcurrencyLevel = 4 };
var factory = new ConcurrentTaskFactory(options);
_output.WriteLine($"begin {DateTime.Now:HH:mm:ss.fff}");
Stopwatch sw = Stopwatch.StartNew();
List<Task<Product>> tasks = new(100);
for (int i = 0; i < 100; i++)
{
var id = i;
var task = factory.StartTask(() => GetProductAsync(id));
tasks.Add(task);
}
var products = await Task.WhenAll(tasks);
sw.Stop();
_output.WriteLine($"end {DateTime.Now:HH:mm:ss.fff}, Elapsed {sw.ElapsedMilliseconds}");
Assert.NotNull(products);
Assert.Equal(100, products.Length);
// begin 10:20:45.317
// Thread36 GetProductAsync(0),10:20:45.487
// Thread11 GetProductAsync(2),10:20:45.487
// Thread8 GetProductAsync(3),10:20:45.487
// Thread35 GetProductAsync(1),10:20:45.487
// Thread11 GetProductAsync(6),10:20:45.614
// Thread35 GetProductAsync(4),10:20:45.614
// Thread8 GetProductAsync(7),10:20:45.614
// Thread36 GetProductAsync(5),10:20:45.614
// Thread35 GetProductAsync(9),10:20:45.742
// Thread8 GetProductAsync(8),10:20:45.742
// Thread41 GetProductAsync(11),10:20:45.742
// Thread37 GetProductAsync(10),10:20:45.742
// Thread37 GetProductAsync(12),10:20:45.869
// Thread8 GetProductAsync(14),10:20:45.869
// Thread11 GetProductAsync(13),10:20:45.869
// Thread41 GetProductAsync(15),10:20:45.869
// Thread8 GetProductAsync(18),10:20:45.997
// Thread35 GetProductAsync(17),10:20:45.997
// Thread41 GetProductAsync(19),10:20:45.997
// Thread11 GetProductAsync(16),10:20:45.997
// Thread11 GetProductAsync(20),10:20:46.125
// Thread8 GetProductAsync(22),10:20:46.125
// Thread35 GetProductAsync(21),10:20:46.125
// Thread41 GetProductAsync(23),10:20:46.125
// Thread37 GetProductAsync(27),10:20:46.253
// Thread41 GetProductAsync(26),10:20:46.253
// Thread35 GetProductAsync(25),10:20:46.253
// Thread11 GetProductAsync(24),10:20:46.253
// Thread35 GetProductAsync(29),10:20:46.381
// Thread41 GetProductAsync(28),10:20:46.381
// Thread11 GetProductAsync(31),10:20:46.381
// Thread37 GetProductAsync(30),10:20:46.381
// Thread8 GetProductAsync(32),10:20:46.507
// Thread37 GetProductAsync(34),10:20:46.507
// Thread41 GetProductAsync(35),10:20:46.507
// Thread11 GetProductAsync(33),10:20:46.507
// Thread37 GetProductAsync(39),10:20:46.635
// Thread11 GetProductAsync(37),10:20:46.635
// Thread41 GetProductAsync(36),10:20:46.635
// Thread35 GetProductAsync(38),10:20:46.635
// Thread41 GetProductAsync(40),10:20:46.763
// Thread37 GetProductAsync(41),10:20:46.763
// Thread35 GetProductAsync(42),10:20:46.763
// Thread11 GetProductAsync(43),10:20:46.763
// Thread41 GetProductAsync(47),10:20:46.891
// Thread8 GetProductAsync(44),10:20:46.891
// Thread11 GetProductAsync(46),10:20:46.891
// Thread37 GetProductAsync(45),10:20:46.891
// Thread37 GetProductAsync(51),10:20:47.018
// Thread8 GetProductAsync(49),10:20:47.018
// Thread41 GetProductAsync(48),10:20:47.018
// Thread11 GetProductAsync(50),10:20:47.018
// Thread41 GetProductAsync(55),10:20:47.146
// Thread11 GetProductAsync(54),10:20:47.146
// Thread8 GetProductAsync(52),10:20:47.146
// Thread37 GetProductAsync(53),10:20:47.146
// Thread11 GetProductAsync(59),10:20:47.274
// Thread8 GetProductAsync(58),10:20:47.274
// Thread37 GetProductAsync(56),10:20:47.274
// Thread41 GetProductAsync(57),10:20:47.274
// Thread41 GetProductAsync(62),10:20:47.402
// Thread11 GetProductAsync(63),10:20:47.402
// Thread37 GetProductAsync(60),10:20:47.402
// Thread8 GetProductAsync(61),10:20:47.402
// Thread41 GetProductAsync(66),10:20:47.530
// Thread88 GetProductAsync(64),10:20:47.530
// Thread35 GetProductAsync(65),10:20:47.530
// Thread11 GetProductAsync(67),10:20:47.530
// Thread11 GetProductAsync(68),10:20:47.658
// Thread41 GetProductAsync(70),10:20:47.658
// Thread8 GetProductAsync(69),10:20:47.658
// Thread35 GetProductAsync(71),10:20:47.658
// Thread41 GetProductAsync(74),10:20:47.786
// Thread95 GetProductAsync(75),10:20:47.786
// Thread35 GetProductAsync(73),10:20:47.786
// Thread88 GetProductAsync(72),10:20:47.786
// Thread95 GetProductAsync(78),10:20:47.914
// Thread41 GetProductAsync(77),10:20:47.914
// Thread88 GetProductAsync(79),10:20:47.914
// Thread8 GetProductAsync(76),10:20:47.914
// Thread95 GetProductAsync(80),10:20:48.042
// Thread41 GetProductAsync(83),10:20:48.042
// Thread8 GetProductAsync(82),10:20:48.042
// Thread35 GetProductAsync(81),10:20:48.042
// Thread95 GetProductAsync(84),10:20:48.170
// Thread35 GetProductAsync(86),10:20:48.170
// Thread41 GetProductAsync(85),10:20:48.170
// Thread8 GetProductAsync(87),10:20:48.170
// Thread11 GetProductAsync(90),10:20:48.297
// Thread88 GetProductAsync(88),10:20:48.297
// Thread8 GetProductAsync(89),10:20:48.297
// Thread35 GetProductAsync(91),10:20:48.297
// Thread11 GetProductAsync(95),10:20:48.425
// Thread8 GetProductAsync(94),10:20:48.425
// Thread41 GetProductAsync(93),10:20:48.425
// Thread35 GetProductAsync(92),10:20:48.425
// Thread41 GetProductAsync(98),10:20:48.553
// Thread35 GetProductAsync(99),10:20:48.553
// Thread8 GetProductAsync(97),10:20:48.553
// Thread88 GetProductAsync(96),10:20:48.553
// end 10:20:48.553, Elapsed 3235
3. 《手搓》TaskFactory異步排隊原理
- 《手搓》TaskFactory控制住了異步的開始和結束
- 當發起異步請求后,該線程并不在《手搓》TaskFactory的線程池中
- 為了控制并發,也扣除并發配額
- 并在注冊ContinueWith中返還并發配額
- 控制住了異步的開始和結束,就相當于異步線程也在《手搓》TaskFactory的線程池中運行
- 配額不夠,后面的異步線程就不會觸發
四、總結
- 《手搓》TaskFactory里面內置了線程池
- 應該定義為靜態的字段或屬性
- 也可以注冊到容器中為單例對象
- 可以對每個數據庫或其他上游資源分別建一個實例,用來控制并發
- 這樣就可以放心大膽的使用并發
- 程序性能開始起飛
- 同時程序的安全性也得到了堅實的保障
- 涉及IO異步操作線程池可以設大一點,只要不會打掛上游就行
- IO異步線程和普通線程不一樣,不會太影響CPU
- 同步和異步IO的線程最好不要混用,影響配額的準確配置
另外源碼托管地址: ,歡迎大家直接查看源碼。
gitee同步更新:
如果大家喜歡(huan)請動動您發財的小(xiao)手手幫忙點一下Star,謝(xie)謝(xie)!!!