隨著多核時代的到來,并行開發(fā)越來越展示出它的強大威力,像我們這樣的碼農(nóng)再也不用過多的關(guān)注底層線程的實現(xiàn)和手工控制,
要了解并行開發(fā),需要先了解下兩個概念:“硬件線程”和“軟件線程”。
1. 硬件線程
相信大家手頭的電腦都是雙核以上的,像我這樣古董的電腦都是雙核的,這樣的雙核叫做物理內(nèi)核。
硬件線程又叫做邏輯內(nèi)核,我們可以在”任務(wù)管理器“中查看”性能“標(biāo)簽頁,如下圖,我們知道有2個硬件線程。
一般情況下,一個物理內(nèi)核對應(yīng)一個邏輯內(nèi)核,比如我這里的2對2。當(dāng)然如果你的cpu采用的是超線程技術(shù),那么可能就會有4個物理內(nèi)核對應(yīng)
8個硬件線程,現(xiàn)在有很多服務(wù)器都有8個硬件線程,上午在公司的服務(wù)器上截了個圖。
我們要知道并行開發(fā)要做的事情就是將任務(wù)分?jǐn)偨o這些硬件線程去并行執(zhí)行來達到負載和加速。
2. 軟件線程
相信這個大家最熟悉了,我們知道傳統(tǒng)的代碼都是串行的,就一個主線程,當(dāng)我們?yōu)榱藢崿F(xiàn)加速而開了很多工作線程,這些工作線程
也就是軟件線程。
好,我們知道了基本概念就ok了,在.net 4.0中,微軟給我們提供了一個新的命名空間:System.Threading.Tasks。這里面有很多好玩
的東西,作為第一篇就介紹下最基礎(chǔ),最簡單的Parallel的使用。
一: Parallel的使用
在Parallel下面有三個常用的方法invoke,for和forEach。
1: Parallel.Invoke
這是最簡單,最簡潔的將串行的代碼并行化。
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 var watch = Stopwatch.StartNew(); 6 7 watch.Start(); 8 9 Run1(); 10 11 Run2(); 12 13 Console.WriteLine("我是串行開發(fā),總共耗時:{0}\n", watch.ElapsedMilliseconds); 14 15 watch.Restart(); 16 17 Parallel.Invoke(Run1, Run2); 18 19 watch.Stop(); 20 21 Console.WriteLine("我是并行開發(fā),總共耗時:{0}", watch.ElapsedMilliseconds); 22 23 Console.Read(); 24 } 25 26 static void Run1() 27 { 28 Console.WriteLine("我是任務(wù)一,我跑了3s"); 29 Thread.Sleep(3000); 30 } 31 32 static void Run2() 33 { 34 Console.WriteLine("我是任務(wù)二,我跑了5s"); 35 Thread.Sleep(5000); 36 } 37 }
在這個例子中可以獲取二點信息:
第一:一個任務(wù)是可以分解成多個任務(wù),采用分而治之的思想。
第二:盡可能的避免子任務(wù)之間的依賴性,因為子任務(wù)是并行執(zhí)行,所以就沒有誰一定在前,誰一定在后的規(guī)定了。
2:Parallel.for
我們知道串行代碼中也有一個for,但是那個for并沒有用到多核,而Paraller.for它會在底層根據(jù)硬件線程的運行狀況來充分的使用所有的可
利用的硬件線程,注意這里的Parallel.for的步行是1。
這里我們來演示一下,向一個線程安全的集合插入數(shù)據(jù),當(dāng)然這個集合采用原子性來實現(xiàn)線程同步,比那些重量級的鎖機制更加的節(jié)省消耗。
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 for (int j = 1; j < 4; j++) 6 { 7 Console.WriteLine("\n第{0}次比較", j); 8 9 ConcurrentBag<int> bag = new ConcurrentBag<int>(); 10 11 var watch = Stopwatch.StartNew(); 12 13 watch.Start(); 14 15 for (int i = 0; i < 20000000; i++) 16 { 17 bag.Add(i); 18 } 19 20 Console.WriteLine("串行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds); 21 22 GC.Collect(); 23 24 bag = new ConcurrentBag<int>(); 25 26 watch = Stopwatch.StartNew(); 27 28 watch.Start(); 29 30 Parallel.For(0, 20000000, i => 31 { 32 bag.Add(i); 33 }); 34 35 Console.WriteLine("并行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds); 36 37 GC.Collect(); 38 39 } 40 } 41 }
可以看的出,加速的效果還是比較明顯的。
3:Parallel.forEach forEach的獨到之處就是可以將數(shù)據(jù)進行分區(qū),每一個小區(qū)內(nèi)實現(xiàn)串行計算,分區(qū)采用Partitioner.Create實現(xiàn)。
class Program { static void Main(string[] args) { for (int j = 1; j < 4; j++) { Console.WriteLine("\n第{0}次比較", j);
ConcurrentBag<int> bag = new ConcurrentBag<int>();
var watch = Stopwatch.StartNew();
watch.Start();
for (int i = 0; i < 3000000; i++) { bag.Add(i); }
Console.WriteLine("串行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds);
GC.Collect();
bag = new ConcurrentBag<int>();
watch = Stopwatch.StartNew();
watch.Start();
Parallel.ForEach(Partitioner.Create(0, 3000000), i => { for (int m = i.Item1; m < i.Item2; m++) { bag.Add(m); } });
Console.WriteLine("并行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds);
GC.Collect();
} } }
這里還是要說一下:Partitioner.Create(0, 3000000)。
第一:我們要分區(qū)的范圍是0-3000000。
第二:我們肯定想知道系統(tǒng)給我們分了幾個區(qū)? 很遺憾,這是系統(tǒng)內(nèi)部協(xié)調(diào)的,無權(quán)告訴我們,當(dāng)然系統(tǒng)也不反對我們自己指定分區(qū)個數(shù),
這里可以使用Partitioner.Create的第六個重載,比如這樣:Partitioner.Create(0, 3000000, Environment.ProcessorCount),
因為 Environment.ProcessorCount能夠獲取到當(dāng)前的硬件線程數(shù),所以這里也就開了2個區(qū)。
下面分享下并行計算中我們可能有的疑惑?
<1> 如何中途退出并行循環(huán)?
是的,在串行代碼中我們break一下就搞定了,但是并行就不是這么簡單了,不過沒關(guān)系,在并行循環(huán)的委托參數(shù)中提供了一個
ParallelLoopState,該實例提供了Break和Stop方法來幫我們實現(xiàn)。
Break: 當(dāng)然這個是通知并行計算盡快的退出循環(huán),比如并行計算正在迭代100,那么break后程序還會迭代所有小于100的。
Stop:這個就不一樣了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。
下面舉個例子,當(dāng)?shù)?000的時候退出循環(huán)
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 var watch = Stopwatch.StartNew(); 6 7 watch.Start(); 8 9 ConcurrentBag<int> bag = new ConcurrentBag<int>(); 10 11 Parallel.For(0, 20000000, (i, state) => 12 { 13 if (bag.Count == 1000) 14 { 15 state.Break(); 16 return; 17 } 18 bag.Add(i); 19 }); 20 21 Console.WriteLine("當(dāng)前集合有{0}個元素。", bag.Count); 22 23 } 24 }
<2> 并行計算中拋出異常怎么處理?
首先任務(wù)是并行計算的,處理過程中可能會產(chǎn)生n多的異常,那么如何來獲取到這些異常呢?普通的Exception并不能獲取到異常,然而為并行誕生的AggregateExcepation就可以獲取到一組異常。
class Program { static void Main(string[] args) { try { Parallel.Invoke(Run1, Run2); } catch (AggregateException ex) { foreach (var single in ex.InnerExceptions) { Console.WriteLine(single.Message); } }
Console.Read(); }
static void Run1() { Thread.Sleep(3000); throw new Exception("我是任務(wù)1拋出的異常"); }
static void Run2() { Thread.Sleep(5000);
throw new Exception("我是任務(wù)2拋出的異常"); } }
<3> 并行計算中我可以留一個硬件線程出來嗎?
默認(rèn)的情況下,底層機制會盡可能多的使用硬件線程,然而我們使用手動指定的好處是我們可以在2,4,8個硬件線程的情況下來進行測量加速比。
class Program { static void Main(string[] args) { var bag = new ConcurrentBag<int>();
ParallelOptions options = new ParallelOptions();
//指定使用的硬件線程數(shù)為1 options.MaxDegreeOfParallelism = 1;
Parallel.For(0, 300000, options, i => { bag.Add(i); });
Console.WriteLine("并行計算:集合有:{0}", bag.Count);
} }
|