免費注冊 查看新帖 |

Chinaunix

  平臺 論壇 博客 文庫
最近訪問板塊 發新帖
查看: 35809 | 回復: 2
打印 上一主題 下一主題

關于perl6并發的問題 [復制鏈接]

論壇徽章:
0
跳轉到指定樓層
1 [收藏(0)] [報告]
發表于 2020-05-29 13:24 |只看該作者 |倒序瀏覽
本帖最后由 aef25u 于 2020-05-30 13:53 編輯

簡介:
1、開啟各15個線程的1個生產者與2個消費者A、B(AB同為耗時任務)
2、生產者按條件通過2個Channel向A與B分別發送數據(數據不需按原始順序發送;按業務邏輯會發送不同數據,暫時按發送同一數據模擬)
3、數組@share為共享數據,消費者A、B會向其讀取數據(不需寫入或改變元素)
4、消費者A處理接收的數據,不符合業務邏輯的數據會繼續傳入B進行處理  if $v==999 {$supplierB.emit($v);}
問題:
1、這樣組織代碼有沒有不合理的地方?
2、共享的@share數組安不安全?
3、假如在消費者A、B中使用以下cached 函數,在各自不同線程下能起作用不?

     use experimental :cached;
     sub fun-name( $val1, $val2) is cached {...}


  1. my $TIME = now;
  2. my $supplierA = Supplier.new;
  3. my $channelA = $supplierA.Supply.Channel;
  4. my $supplierB = Supplier.new;
  5. my $channelB = $supplierB.Supply.Channel;
  6. my @share=("C","D");
  7. my $threads=15;
  8. my (@pA,@pB);
  9. for 1 .. $threads {

  10.   @pA.push: start {
  11.       react  {
  12.             whenever $channelA -> $v {
  13.               if $v==999 {$supplierB.emit(1000);}#向消費者B發送數據,真實情況是發送$v
  14.               say "channelA shareArr {@share[0]}:Thread {$*THREAD.id} got $v";

  15.           }
  16.      }
  17.   }
  18.   @pB.push: start {
  19.       react  {
  20.             whenever $channelB -> $v {
  21.               sleep 0.5;#真實代碼不需這一行,模擬單個B任務比A任務耗時
  22.              say "channelB shareArr {@share[1]}:Thread {$*THREAD.id} got $v";
  23.           }
  24.      }
  25.   }
  26. }

  27. my @promises;
  28. for ^1000 -> $r {
  29.     push @promises, start {
  30.           #sleep $r*0.001;
  31.           sleep rand;
  32.           $supplierA.emit($r);
  33.           $supplierB.emit($r);
  34.     };
  35.     if @promises == 15 {
  36.          await Promise.anyof(@promises);
  37.          @promises .= grep({ !$_ });
  38.     }
  39. }

  40. await @promises;

  41. $supplierA.done;
  42. await @pA;
  43. $supplierB.done;
  44. await @pB;
  45. $TIME = now - $TIME;
  46. say $TIME;
復制代碼

自已發現了個問題,要保證A處理完不符合業務邏輯的數據能傳到B不能這樣寫:
  1. $supplierA.done;
  2. $supplierB.done;
  3. await @pA,@pB;
復制代碼






論壇徽章:
0
2 [報告]
發表于 2020-05-30 13:50 |只看該作者
自已發現個問題,要保證A處理完不符業務邏輯的數據能傳到B,不能這書寫
  1. $supplierA.done;
  2. $supplierB.done;
  3. await @pA,@pB;
復制代碼

而要這樣寫
  1. $supplierA.done;
  2. await @pA;
  3. $supplierB.done;
  4. await @pB;
復制代碼
第一種寫法可能出現A還沒處理完$channelB就被關閉了。


論壇徽章:
0
3 [報告]
發表于 2020-06-02 13:33 |只看該作者
本帖最后由 aef25u 于 2020-06-02 16:49 編輯

通過自已上網搜索與親自測試試驗結果,初步可以回答之前提的問題,如果有不正確的地方,希望大神們指正。
1、這樣組織代碼有沒有不合理的地方?
     這樣組織可以滿足我的任務,但之前認為@pA與@pB內不需要sleep是錯誤的。
     供應者需 sleep 0;消費者@pA與@pB分別為 sleep 0.2 sleep 0.1,這樣有利于各個線程跑的任務相對均衡(因我使用的是Windows操作系統,CPU競爭策略屬于搶占式)。當然 @pA設成與@pB一樣也沒問題,但 @pAsleep時間不能太大,否則可能出現B先結束,而@pA仍向關閉的通道B發送數據,但發送不了,任務永遠不會結束。


2、共享的@share數組安不安全?
    共享的資源@share的讀寫均是安全的。

3、假如在消費者A、B中使用以下cached 函數,在各自不同線程下能起作用不?
     use experimental :cached;
     sub fun-name( $val1, $val2) is cached {...}
    這個對我的任務作用不大,而我的任務要求并發數大(在我的任務中我使用了100個并發線程),而分配到的各個線程的任務數小,再加上任務是隨機分到各線程的,需使用cached的值的機會很小

最后,給出更完善的代碼:

  1. my $TIME = now;
  2. my $supplierA = Supplier.new;
  3. my $channelA = $supplierA.Supply.Channel;
  4. my $supplierB = Supplier.new;
  5. my $channelB = $supplierB.Supply.Channel;
  6. my @share=("C","D");
  7. my $threads=15;
  8. my (@pA,@pB);
  9. for 1 .. $threads {

  10.   @pA.push: start {
  11.     my @aFir;
  12.     react  {
  13.         whenever $channelA -> $v {
  14.           sleep 0.2;
  15.           if $v % 2 {$supplierB.emit(1000);}
  16.           say "channelA shareArr {@share[0]}:Thread {$*THREAD.id} got $v";
  17.           #my $h={"Val"=>$v};
  18.           @aFir.push: $v*2;
  19.         }
  20.     }
  21.     @aFir;
  22.   }
  23.   @pB.push: start {

  24.     my @aSec;
  25.     react  {
  26.         whenever $channelB -> $v {
  27.           sleep 0.1;
  28.           say "channelB shareArr {@share[1]}:Thread {$*THREAD.id} got $v";
  29.           #my $h={"Val"=>$v};
  30.           @aSec.push:  $v*2;
  31.         }
  32.     }
  33.     @aSec;
  34.   }
  35. }
  36. sleep 1;
  37. my @promises;
  38. for ^1000 -> $r {
  39.   push @promises, start {
  40.         sleep 0;

  41.         $supplierA.emit($r);
  42.         $supplierB.emit($r);
  43.     };
  44.     if @promises == $threads {
  45.        await Promise.anyof(@promises);
  46.        @promises .= grep({ !$_ });
  47.     }
  48. }

  49. await @promises;

  50. $supplierA.done;
  51. my @retAdrgFIR=await @pA;
  52. $supplierB.done;
  53. my @retAdrgSEC=await @pB;

  54. #@retAdrgFIR .=grep({ $_});
  55. #@retAdrgSEC .=grep({ $_});
  56. dd @retAdrgFIR;
  57. dd @retAdrgSEC;

  58. #查看各線程最終分配執行的任務數
  59. my @nFIR=@retAdrgFIR.map( -> $n {$n.List.elems});
  60. my @nSEC=@retAdrgSEC.map( -> $n {$n.List.elems});
  61. dd @nFIR;
  62. dd @nSEC;

  63. $TIME = now - $TIME;
  64. say $TIME;
復制代碼




您需要登錄后才可以回帖 登錄 | 注冊

本版積分規則 發表回復

  

北京盛拓優訊信息技術有限公司. 版權所有 京ICP備16024965號-6 北京市公安局海淀分局網監中心備案編號:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年舉報專區
中國互聯網協會會員  聯系我們:huangweiwei@itpub.net
感謝所有關心和支持過ChinaUnix的朋友們 轉載本站內容請注明原作者名及出處

清除 Cookies - ChinaUnix - Archiver - WAP - TOP
   日韩综合区视频第一页导航,无码JK粉嫩小泬在线观看,午夜精品A片一区二区三区,日日躁夜夜躁狠狠躁麻豆,大胆国模,免费观看无遮挡www的网站