Журнал LinuxFormat - перейти на главную

LXF167:Вникать в Erlang

Материал из Linuxformat
Перейти к: навигация, поиск

Erlang Опи­сы­ва­ет­ся сле­дую­щей фор­му­лой: функ­цио­наль­ный язык + про­цес­сы

Erlang: Прак­ти­ка мно­го­за­дач­но­сти

Ан­д­рей Уша­ков в третий раз приступает к упражнениям с функциями. [[Файл: |100px|left|thumb|Наш эксперт Ан­д­рей Уша­ков ак­тив­но при­бли­жа­ет тот день, ко­гда функ­цио­наль­ные язы­ки ста­нут мейн­ст­ри­мом. ]] В про­шлый раз мы соз­да­ли мно­го­за­дач­ные вер­сии функ­ций map и reduce, реа­ли­зую­щие пор­ци­он­ную об­ра­бот­ку ис­ход­ных дан­ных. Се­го­дня мы усложним наш при­мер и из­ба­вим­ся от глав­но­го недостат­ка пре­ды­ду­щей реа­ли­за­ции.

Припомним, на чем мы тогда оста­но­ви­лись. Мы вы­де­ли­ли об­щую часть из функ­ций parallel_map:portion_pmap/3 и parallel_reduce:portion_reduce/4 в ви­де функ­ции parallel_portion_helper:portion_core/4 и вспо­мо­га­тель­ной функ­ции portion_worker/4, оп­ре­де­лен­ной в мо­ду­ле parallel_portion_helper, но не экс­пор­ти­руе­мой из него. Функ­ция parallel_portion_helper:portion_core/4 вы­пол­ня­ет всю основ­ную ра­бо­ту: раз­би­ва­ет дан­ные на пор­ции, соз­да­ет ра­бо­чие про­цес­сы и раз­да­ет им за­дания, со­би­ра­ет ре­зуль­та­ты трудов ра­бо­чих про­цес­сов и объ­е­ди­ня­ет их в ито­го­вый ре­зуль­тат. Что­бы эту функ­цию мож­но бы­ло применять для по­строения лю­бых функ­ций, спо­соб­ных па­рал­лель­но об­ра­ба­ты­вать спи­ски дан­ных, нужно па­ра­мет­ри­зо­вать ее. Для это­го на­до за­дать две функ­ции: функ­цию WorkerFun для об­ра­бот­ки пор­ций дан­ных в ра­бо­чих про­цес­сах и функ­цию FinalAggrFun для объ­е­динения об­ра­бо­тан­ных дан­ных в ито­го­вый ре­зуль­тат. По­ми­мо па­ра­мет­ров WorkerFun и FinalAggrFun, мы долж­ны так­же пе­ре­дать в функ­цию parallel_portion_helper:portion_core/4 ис­ход­ный спи­сок с дан­ны­ми SourceList и раз­мер пор­ции PortionSize, об­ра­ба­ты­вае­мой одним ра­бо­чим про­цес­сом.

portion_core(WorkerFun, FinalAggrFun, SourceList, PortionSize) ->

process_flag(trap_exit, true),

MasterPid = self(),

PortionCount = parallel_common:calc_portion_count(length(SourceList), PortionSize),

PreparedData = parallel_common:prepare_data(PortionSize, SourceList),

lists:foreach(fun({Index, Portion}) -> spawn_link(fun() -> portion_worker(WorkerFun, Portion, Index, MasterPid) end) end, PreparedData),

EmptyStorage = array:new([{size, PortionCount}, {fixed, true}, {default, none}]),

FullStorage = parallel_common:collect_result(EmptyStorage, PortionCount),

process_flag(trap_exit, false),

FinalAggrFun(array:to_list(FullStorage)).

Функ­ция portion_worker/4, оп­ре­де­лен­ная в мо­ду­ле parallel_portion_helper, яв­ля­ет­ся те­лом ка­ж­до­го ра­бо­че­го про­цес­са, т. е. функ­ци­ей, ко­то­рую ка­ж­дый ра­бо­чий про­цесс вы­пол­ня­ет во вре­мя сво­ей жизни. Так как на дан­ный мо­мент мы ис­поль­зу­ем мо­дель «од­но­ра­зо­вых про­цес­сов», то функ­ция portion_worker/4 ра­бо­та­ет по очень про­стой схе­ме: об­ра­бо­тать пор­цию ис­ход­ных дан­ных и ото­слать ре­зуль­тат об­ра­бот­ки об­рат­но глав­но­му про­цес­су. Ес­те­ст­вен­но, что функ­ция portion_worker/4 долж­на быть па­ра­мет­ри­зо­ва­на функ­ци­ей Fun для об­ра­бот­ки пор­ции ис­ход­ных дан­ных.

portion_worker(Fun, SourcePortion, Index, MasterPid) ->

DestPortion = Fun(SourcePortion),

MasterPid ! {result, Index, DestPortion}.

Па­рал­лель­ные вер­сии функ­ций map и reduce, соз­дан­ные с ис­поль­зо­ванием функ­ции parallel_portion_helper:portion_core/4 (и вспо­мо­га­тель­ной функ­ции portion_worker/4), вы­гля­дят так:

portion_gen_pmap(_Fun, [], _PortionSize) -> [];

portion_gen_pmap(Fun, SourceList, PortionSize)

when length(SourceList) =< PortionSize ->

lists:map(Fun, SourceList);

portion_gen_pmap(Fun, SourceList, PortionSize) ->

WorkerFun = fun(SourcePortion) -> lists:map(Fun, SourcePortion) end,

parallel_portion_helper:portion_core(WorkerFun, fun lists:append/1, SourceList, PortionSize).

portion_gen_reduce(_Fun, [], {InitValue, _PortionInitValue}, _PortionSize) -> InitValue;

portion_gen_reduce(Fun, SourceList, {InitValue, _PortionInitValue}, PortionSize)

when length(SourceList) =< PortionSize ->

lists:foldl(Fun, InitValue, SourceList);

portion_gen_reduce(Fun, SourceList, {InitValue, PortionInitValue}, PortionSize) ->

ReduceFun = fun(List) -> lists:foldl(Fun, InitValue, List) end,

PortionReduceFun = fun(List) -> lists:foldl(Fun, PortionInitValue, List) end,

parallel_portion_helper:portion_core(PortionReduceFun, ReduceFun, SourceList, PortionSize).

Вид­но, что функ­ции parallel_map:portion_gen_pmap/3 и parallel_reduce: portion_gen_reduce/4 по­лу­ча­ют­ся доста­точ­но про­сты­ми. Они мог­ли бы быть еще про­ще, ес­ли бы мы мог­ли вынести об­ра­бот­ку не па­рал­лель­ных ва­ри­ан­тов в функ­цию parallel_portion_helper:portion_core/4; но как мы по­ка­за­ли в про­шлый раз, сде­лать это невоз­мож­но.

Внима­тель­ные чи­та­те­ли здесь мо­гут вспомнить: мы го­во­ри­ли, что соз­дание функ­ции parallel_portion_helper:portion_core/4 по­зво­лит из­ба­вить­ся от дуб­ли­ро­вания ко­да, а так­же по­зво­лит лег­ко реа­ли­зо­вы­вать мно­го­за­дач­ные вер­сии дру­гих функ­ций. И ес­ли факт из­бав­ления от дуб­ли­ро­вания ко­да мы ви­де­ли на при­ме­ре функ­ций parallel_map:portion_gen_pmap/3 и parallel_reduce: portion_gen_reduce/4, то о воз­мож­но­сти лег­ко реа­ли­зо­вать мно­го­за­дач­ную вер­сию ка­кой-ли­бо дру­гой функ­ции мы в про­шлый раз толь­ко го­во­ри­ли (в свя­зи с ог­раничением на раз­мер ста­тьи). Да­вай­те по­ка­жем, что та­кая воз­мож­ность де­йствительно есть; и в ка­че­­ст­ве та­кой де­мон­ст­ра­ции соз­да­дим мно­го­за­дач­ную вер­сию для опе­ра­ции фильт­ра­ции (для функ­ции filter). Опе­ра­ция фильт­ра­ции для на­бо­ра ис­ход­ных эле­мен­тов (в на­шем слу­чае, для спи­ска) воз­вра­ща­ет толь­ко эле­мен­ты, удов­ле­тво­ря­ющие неко­то­ро­му усло­вию, за­дан­но­му функ­ци­ей-пре­ди­ка­том. В мно­го­за­дач­ной вер­сии опе­ра­ции фильт­ра­ции мы долж­ны раз­бить ис­ход­ный спи­сок на пор­ции, к ка­ж­дой пор­ции при­менить опе­ра­цию фильт­ра­ции (имен­но это дей­ст­вие мы мо­жем вы­пол­нять па­рал­лель­но), по­сле че­го по­лу­чен­ные ре­зуль­та­ты объ­е­динить в ито­го­вый.

При этом сле­ду­ет учесть два мо­мен­та. Во-пер­вых, ре­зуль­та­том опе­ра­ции фильт­ра­ции яв­ля­ет­ся спи­сок та­ко­го же или мень­ше­го раз­ме­ра (в ка­че­­ст­ве ре­зуль­та­та мы мо­жем по­лу­чить и пустой спи­сок). Во-вто­рых, опе­ра­ция фильт­ра­ции долж­на сохранять по­ря­док от­фильт­ро­ван­ных эле­мен­тов от­но­си­тель­но друг дру­га, так как мы фильт­ру­ем эле­мен­ты из упорядоченного спи­ска. С уче­том ска­зан­но­го вы­ше, для реа­ли­за­ции опе­ра­ции фильт­ра­ции с по­мо­щью функ­ции parallel_portion_helper:portion_core/4 па­ра­метр WorkerFun дол­жен быть функ­ци­ей, соз­дан­ной на осно­ве функ­ции lists:filter/2 и функ­ции-пре­ди­ка­та, а па­ра­метр FinalAggrFun дол­жен быть функ­ци­ей lists:append/1. Ко­му-то мо­жет по­ка­зать­ся неоче­вид­ным вы­бор функ­ции lists:append/1 в ка­че­­ст­ве па­ра­мет­ра FinalAggrFun. Од­на­ко при­чи­ны та­ко­го вы­бо­ра ана­ло­гич­ны та­ко­вым для мно­го­за­дач­ной реа­ли­за­ции функ­ции map (см. LXF165/166).

Пе­рей­дем к реа­ли­за­ции мно­го­за­дач­ной вер­сии опе­ра­ции фильт­ра­ции. Это бу­дет функ­ция parallel_filter:portion_gen_filter/3, экс­пор­ти­руе­мая из мо­ду­ля parallel_filter:

portion_gen_filter(_Fun, [], _PortionSize) -> [];

portion_gen_filter(Fun, SourceList, PortionSize)

when length(SourceList) =< PortionSize ->

lists:filter(Fun, SourceList);

portion_gen_filter(Fun, SourceList, PortionSize) ->

WorkerFun = fun(SourcePortion) -> lists:filter(Fun, SourcePortion) end,

parallel_portion_helper:portion_core(WorkerFun, fun lists:append/1, SourceList, PortionSize).

Как и в слу­чае функ­ций parallel_map:portion_gen_pmap/3 и parallel_reduce: portion_gen_reduce/4, те­ло функ­ции parallel_filter:portion_gen_filter/3 со­сто­ит из трех ва­ри­ан­тов. Пер­вый ва­ри­ант об­ра­ба­ты­ва­ет си­туа­цию пусто­го спи­ска ис­ход­ных дан­ных. Вто­рой об­ра­ба­ты­ва­ет си­туа­цию, когда раз­мер ис­ход­ных дан­ных не пре­вы­ша­ет раз­ме­ра пор­ции, на ко­то­рые раз­би­ва­ют­ся ис­ход­ные дан­ные. И, на­конец, тре­тий ва­ри­ант яв­ля­ет­ся об­щим ва­ри­ан­том и об­ра­ба­ты­ва­ет все осталь­ные си­туа­ции.

Мы реа­ли­зо­ва­ли мно­го­за­дач­ную вер­сию опе­ра­ции фильт­ра­ции (функ­цию parallel_filter:portion_gen_filter/3); те­перь по­ра про­ве­рить, что на­ша реа­ли­за­ция ра­бо­та­ет пра­виль­но. Как уже го­во­ри­лось вы­ше, функ­ция parallel_filter:portion_gen_filter/3 со­дер­жит три ва­ри­ан­та; их мы и долж­ны про­ве­рить. Для это­го ском­пи­ли­ру­ем со­от­вет­ст­вую­щие мо­ду­ли и за­пустим кон­соль сре­ды вы­полнения язы­ка Erlang.

Про­ве­рим сна­ча­ла пер­вый ва­ри­ант: вы­зов

parallel_filter:portion_gen_filter(fun(Item) -> length(Item) > 2 end, [], 4)

воз­вра­ща­ет пус­той спи­сок, как и ожи­да­ет­ся. Те­перь про­ве­рим вто­рой ва­ри­ант: вы­зов

parallel_filter:portion_gen_filter(fun(Item) -> length(Item) > 2 end, [«a», «bbb», «cc»], 4)

воз­вра­ща­ет спи­сок строк [«bbb”], дли­на ко­то­рых боль­ше 2. При этом раз­мер ис­ход­но­го спи­ска мень­ше раз­ме­ра пор­ции. Это оз­на­ча­ет, что для вы­полнения бу­дет вы­бран вто­рой ва­ри­ант функ­ции parallel_filter:portion_gen_filter/3. И, на­конец, про­ве­рим тре­тий ва­ри­ант: вы­зов

parallel_filter:portion_gen_filter(fun(Item) -> length(Item) > 2 end, [«a», «bbb», «cc», «dd», «eee»], 2)

воз­вра­ща­ет спи­сок строк [«bbb”, “eee”], дли­на ко­то­рых боль­ше 2. Так как раз­мер спи­ска с ис­ход­ны­ми дан­ны­ми 5, а раз­мер пор­ции 2, то для вы­полнения вы­зо­ва бу­дет соз­да­но 3 ра­бо­чих про­цес­са.

А те­перь да­вай­те внима­тель­но рас­смот­рим на­ше ре­шение. Оно име­ет сле­дую­щий вид: мы раз­би­ва­ем ис­хо­дя­щие дан­ные на пор­ции фик­си­ро­ван­но­го раз­ме­ра, для об­ра­бот­ки ка­ж­дой пор­ции соз­да­ем свой про­цесс, об­ра­ба­ты­ва­ем все пор­ции дан­ных мно­го­за­дач­ным об­ра­зом, со­би­ра­ем ре­зуль­та­ты об­ра­бот­ки от всех про­цес­сов и объ­е­ди­ня­ем все со­б­ран­ные дан­ные в ито­го­вый ре­зуль­тат.

Лег­ко уви­деть глав­ный недоста­ток на­ше­го ре­шения: мы раз­би­ва­ем ис­ход­ные дан­ные на пор­ции фик­си­ро­ван­но­го раз­ме­ра и для об­ра­бот­ки ка­ж­дой пор­ции соз­да­ем соб­ст­вен­ный про­цесс (мо­дель «од­но­ра­зо­вых про­цес­сов»). Оче­вид­но, что при та­ком под­хо­де ко­ли­че­­ст­во соз­дан­ных про­цес­сов бу­дет про­пор­цио­наль­но раз­ме­ру ис­ход­ных дан­ных.

Хо­тя основ­ная идео­ло­гия язы­ка Erlang под­ра­зу­ме­ва­ет, что мы мо­жем соз­да­вать ров­но столь­ко про­цес­сов, сколь­ко нуж­но для ре­шения за­да­чи, все же мак­си­маль­ное ко­ли­че­­ст­во про­цес­сов, ко­то­рое мо­жет быть од­но­вре­мен­но соз­да­но в од­ном эк­зем­п­ля­ре сре­ды вы­полнения Erlang (на од­ном уз­ле), ог­раниче­но. По умол­чанию – не более 32768 про­цес­сов. Но ес­ли мы соз­да­ем эк­зем­п­ляр сре­ды вы­полнения Erlang (узел) с фла­гом +P, то мы мо­жем од­но­вре­мен­но соз­дать до 134217727 про­цес­сов (в за­ви­си­мо­сти от зна­чения па­ра­мет­ра, пе­ре­дан­но­го с фла­гом +P). Это доста­точ­но боль­шое, но все же конеч­ное чис­ло, и вполне воз­мож­на си­туа­ция, когда мы не смо­жем соз­дать оче­ред­ной про­цесс в эк­зем­п­ля­ре сре­ды вы­полнения Erlang (как ра­бо­чий про­цесс од­ной из на­ших реа­ли­за­ций, так и ка­кой-ли­бо сто­ронний про­цесс).

Ре­шение этой про­бле­мы доста­точ­но оче­вид­но: нуж­но от­ка­зать­ся от мо­де­ли «од­но­ра­зо­вых» про­цес­сов (т. е. от соз­дания соб­ст­вен­но­го про­цес­са для об­ра­бот­ки ка­ж­дой пор­ции дан­ных) и ис­поль­зо­вать «мно­го­ра­зо­вые» про­цес­сы. Дру­ги­ми сло­ва­ми, мы долж­ны ог­раничить чис­ло соз­да­вае­мых ра­бо­чих про­цес­сов неко­то­рым по­сто­ян­ным зна­чением, не за­ви­ся­щим от раз­ме­ра ис­ход­ных дан­ных. Это оз­на­ча­ет, что мы долж­ны соз­дать пре­до­пре­де­лен­ный на­бор про­цес­сов до на­ча­ла об­ра­бот­ки ис­ход­ных дан­ных, раз­дать под­го­тов­лен­ные для об­ра­бот­ки пор­ции дан­ных этим про­цес­сам и со­брать ре­зуль­та­ты об­ра­бот­ки с этих про­цес­сов.

В от­ли­чие от си­туа­ции «од­но­ра­зо­вых» ра­бо­чих про­цес­сов, взаи­мо­дей­ст­вие ме­ж­ду пре­до­пре­де­лен­ны­ми ра­бо­чи­ми про­цес­са­ми и глав­ным про­цес­сом долж­но быть бо­лее слож­ным. Дей­ст­ви­тель­но, мы долж­ны дать за­дание ра­бо­че­му про­цес­су на об­ра­бот­ку оче­ред­ной пор­ции, по­сле че­го ожи­дать от него ре­зуль­та­та ра­бо­ты, дать за­дание на об­ра­бот­ку дру­гой пор­ции... и так, по­ка все пор­ции дан­ных не бу­дут об­ра­бо­та­ны. Но мож­но несколь­ко уп­ро­стить это взаи­мо­дей­ст­вие: по­сле раз­биения ис­ход­ных дан­ных на пор­ции мы мо­жем сра­зу раз­дать все за­дания всем ра­бо­чим про­цес­сам, по­сле че­го оста­ет­ся толь­ко со­брать ре­зуль­та­ты их ра­бо­ты. Та­кой под­ход име­ет свои ми­ну­сы (о ко­то­рых мы по­го­во­рим далее), но по­зво­ля­ет уп­ро­стить пе­ре­ход на мо­дель «мно­го­ра­зо­вых» ра­бо­чих про­цес­сов. На дан­ном эта­пе для по­строения оче­ред­ных мно­го­за­дач­ных вер­сий этих функ­ций мы вы­би­ра­ем имен­но этот под­ход. Пре­иму­ще­ст­во его в том, что для сбо­ра ре­зуль­та­тов ра­бо­ты ра­бо­чих про­цес­сов мы мо­жем ис­поль­зо­вать ста­рый сбор­щик ре­зуль­та­тов (реа­ли­зо­ван­ный в функ­ции parallel_common:collect_result/2). Дей­ст­ви­тель­но, не все ли равно, ка­кие про­цес­сы и сколь­ко раз («од­но­ра­зо­вые» один раз или «мно­го­ра­зо­вые» несколь­ко раз) бу­дут нам при­сы­лать ре­зуль­та­ты сво­ей ра­бо­ты, ес­ли нам никак не на­до от­ве­чать на эти со­об­щения.

При­сту­пим к реа­ли­за­ции на­шей оче­ред­ной мно­го­за­дач­ной вер­сии функ­ций map и reduce с уче­том все­го ска­зан­но­го вы­ше. Сле­ду­ет так­же на­помнить о важ­ной до­го­во­рен­но­сти, вве­ден­ной на про­шлом уро­ке: мы до­го­во­ри­лись, что бу­дем сна­ча­ла пи­сать на­бор об­щих ме­то­дов, на осно­ве ко­то­рых стро­ить кон­крет­ные реа­ли­за­ции для функ­ций map и reduce. При­мер та­ко­го под­хо­да мы уже ви­де­ли, реа­ли­зуя ме­то­ды parallel_map:portion_gen_pmap/3 и parallel_reduce: portion_gen_reduce/4 на осно­ве функ­ции parallel_portion_helper:portion_core/4, со­дер­жа­щей об­щую функ­цио­наль­ность. В на­шей реа­ли­за­ции мы по­сту­пим точ­но так же: всю об­щую функ­цио­наль­ность мы бу­дем оп­ре­де­лять (и экс­пор­ти­ро­вать в слу­чае необ­хо­ди­мо­сти) в мо­ду­ле parallel_limited_helper.

Начнем с реа­ли­за­ции функ­ции parallel_limited_helper:limited_worker/1, ко­то­рую бу­дут вы­пол­нять ра­бо­чие про­цес­сы. На­ши ра­бо­чие про­цес­сы яв­ля­ют­ся «мно­го­ра­зо­вы­ми»; это оз­на­ча­ет, что мы в цик­ле (в ре­кур­сив­но-хво­сто­вом вы­зо­ве этой же функ­ции) долж­ны по­лу­чать все со­об­щения, ко­то­рые при­хо­дят в ра­бо­чий про­цесс и об­ра­ба­ты­вать их. Со­об­щения ви­да {task_request, MasterPid, Index, SourcePortion}, где MasterPid – иден­ти­фи­ка­тор глав­но­го про­цес­са, Index – но­мер об­ра­ба­ты­вае­мой пор­ции, Source­Portion – пор­ция ис­ход­ных дан­ных, иниции­ру­ют об­ра­бот­ку пор­ции (и по­сле­дую­щий возврат ре­зуль­та­тов об­ра­бот­ки глав­но­му про­цес­су); все осталь­ные со­об­щения мы вы­бра­сы­ва­ем. Итак, с уче­том все­го ска­зан­но­го, функ­ция parallel_limited_helper:limited_worker/1 име­ет сле­дую­щий вид:

limited_worker(Fun) ->

receive

{task_request, MasterPid, Index, SourcePortion} ->

Dest = Fun(SourcePortion),

MasterPid ! {result, Index, Dest},

limited_worker(Fun);

_Other -> limited_worker(Fun)

end.

Здесь па­ра­метр Fun – это функ­ция для об­ра­бот­ки пор­ции ис­ход­ных дан­ных; ее за­да­ют при соз­дании ра­бо­чих по­то­ков. Сле­ду­ет так­же ска­зать, что мы экс­пор­ти­ру­ем функ­цию parallel_limited_helper:limited_worker/1 из мо­ду­ля parallel_limited_helper. Это от­ли­ча­ет­ся от прошлой реа­ли­за­ции функ­ций map и reduce, ко­гда мы не экс­пор­ти­ро­ва­ли функ­цию, вы­полняемую ра­бо­чи­ми про­цес­са­ми. Свя­за­но это с тем, что в мо­де­ли «мно­го­ра­зо­вых» ра­бо­чих про­цес­сов от­вет­ст­вен­ность за соз­дание этих про­цес­сов мы вы­но­сим на­ру­жу (вско­ре мы уви­дим, в чем плюс та­ко­го ре­шения).

Сле­дую­щая функ­ция – та, что раз­да­ет за­дания на­шим «мно­го­ра­зо­вым» ра­бо­чим про­цес­сам. Когда мы ис­поль­зо­ва­ли мо­дель «од­но­ра­зо­вых» про­цес­сов, соз­дание этих про­цес­сов и раз­да­ча им за­даний реа­ли­зо­вы­ва­лась крайне про­сто: про­хо­дом по спи­ску пор­ций ис­ход­ных дан­ных с по­мо­щью функ­ции lists:foreach/2. Для «мно­го­ра­зо­вых» про­цес­сов все несколь­ко сложнее: ка­ж­дый та­кой про­цесс по­лу­ча­ет несколь­ко за­даний, и рас­пре­де­лить за­дания ме­ж­ду ними необ­хо­ди­мо бо­лее-менее рав­но­мер­но. Итак, на­ша за­да­ча за­клю­ча­ет­ся в рав­но­мер­ном рас­пре­де­лении N за­даний по M про­цес­сам. Од­но из воз­мож­ных ре­шений этой за­да­чи вы­гля­дит сле­дую­щим об­ра­зом: мы од­но­вре­мен­но про­хо­дим как по спи­ску за­даний, так и по спи­ску про­цес­сов, на­зна­чая те­ку­щее за­дание те­ку­ще­му про­цес­су. Ес­ли в ка­кой-то мо­мент вре­мени мы дош­ли до кон­ца спи­ска про­цес­сов, а до кон­ца спи­ска за­даний не дош­ли, то про­хо­дить спи­сок про­цес­сов мы на­чи­на­ем сна­ча­ла. Ес­ли же в ка­кой-то мо­мент вре­мени мы дош­ли до кон­ца спи­ска за­дач, а до кон­ца спи­ска про­цес­сов не дош­ли, то мы за­кан­чи­ва­ем ра­бо­ту, т. к. все за­дания уже рас­пре­де­ле­ны. Все это мы реа­ли­зу­ем в па­ре функ­ций send_worker_tasks/2 и send_worker_tasks/3 (мы не экс­пор­ти­ру­ем эти функ­ции из мо­ду­ля parallel_limited_helper):

send_worker_tasks(PreparedData, WorkerList) ->

send_worker_tasks(PreparedData, WorkerList, 1).

send_worker_tasks([], _WorkerList, _WorkerIndex) -> complete;

send_worker_tasks(PreparedData, WorkerList, WorkerIndex)

when WorkerIndex > length(WorkerList) ->

send_worker_tasks(PreparedData, WorkerList, 1);

send_worker_tasks([{Index, Portion} | Rest], WorkerList, WorkerIndex) ->

Worker = lists:nth(WorkerIndex, WorkerList),

Worker ! {task_request, self(), Index, Portion},

send_worker_tasks(Rest, WorkerList, WorkerIndex + 1).

Функ­ция send_worker_tasks/2 яв­ля­ет­ся ин­тер­фей­сом; ра­бо­та вы­пол­ня­ет­ся в функ­ции send_worker_tasks/3. Об­ход по спи­ску за­даний осу­ще­ст­в­ля­ет­ся при по­мо­щи ре­кур­сив­но-хво­сто­во­го вы­зо­ва функ­ци­ей send_worker_tasks/3 са­мой се­бя. При этом по спи­ску за­даний (спи­ску пор­ций ис­ход­ных дан­ных) мы идем при по­мо­щи опе­ра­ции со­от­вет­ст­вия шаб­ло­ну [pattern-matching], об­ра­ба­ты­вая го­лов­ной эле­мент и пе­ре­да­вая хво­сто­вую часть в ре­кур­сив­но-хво­сто­вой вы­зов функ­ции send_worker_tasks/3. С дру­гой сто­ро­ны, для про­хо­да по спи­ску про­цес­сов мы ис­поль­зу­ем ин­декс те­ку­ще­го за­дания, уве­ли­чи­вая его на единицу при ре­кур­сив­но-хво­сто­вом вы­зо­ве функ­ции send_worker_tasks/3. Реа­ли­зо­ва­но это та­ким спо­со­бом по­то­му, что мы долж­ны об­хо­дить спи­сок про­цес­сов цик­ли­че­­ски, ес­ли за­даний боль­ше, чем про­цес­сов.

Те­перь на­конец-то мы мо­жем реа­ли­зо­вать серд­це на­ше­го при­ме­ра – функ­цию parallel_limited_helper:limited_core/4:

limited_core(FinalAggrFun, SourceList, PortionSize, WorkerList) ->

process_flag(trap_exit, true),

PortionCount = parallel_common:calc_portion_count(length(SourceList), PortionSize),

PreparedData = parallel_common:prepare_data(PortionSize, SourceList),

send_worker_tasks(PreparedData, WorkerList),

EmptyStorage = array:new([{size, PortionCount}, {fixed, true}, {default, none}]),

FullStorage = parallel_common:collect_result(EmptyStorage, PortionCount),

process_flag(trap_exit, false),

FinalAggrFun(array:to_list(FullStorage)).

В этой функ­ции мы вы­пол­ня­ем всю основ­ную ра­бо­ту по раз­биению ис­ход­ных дан­ных на пор­ции, рас­пре­де­лению за­даний на об­ра­бот­ку пор­ций ис­ход­ных дан­ных рав­но­мер­но по ра­бо­чим про­цес­сам, сбо­ру ре­зуль­та­тов об­ра­бот­ки пор­ций ис­ход­ных дан­ных и объ­е­динению по­лу­чен­ных ре­зуль­та­тов в ито­го­вый ре­зуль­тат. От­ме­тим, что мы не соз­да­ем и не унич­то­жа­ем ра­бо­чие про­цес­сы в об­щей функ­ции parallel_limited_helper:limited_core/4, а от­да­ем эту от­вет­ст­вен­ность вы­зы­ваю­ще­му ко­ду. В свя­зи с этим, функ­ции parallel_limited_helper:limited_core/4, по­ми­мо спи­ска ис­ход­ных дан­ных SourceList и раз­ме­ра пор­ции PortionSize, мы пе­ре­да­ем спи­сок соз­дан­ных ра­бо­чих про­цес­сов WorkerList и функ­цию FinalAggrFun для объ­е­динения ре­зуль­та­тов ра­бо­ты в ито­го­вый ре­зуль­тат. При этом функ­цию для об­ра­бот­ки пор­ций ис­ход­ных дан­ных мы за­да­ем при соз­дании ра­бо­чих про­цес­сов при по­мо­щи функ­ции parallel_limited_helper:limited_worker/1.

На дан­ный мо­мент мы на­пи­са­ли всю необ­хо­ди­мую функ­цио­наль­ность и го­то­вы реа­ли­зо­вать мно­го­за­дач­ные вер­сии функ­ции map и reduce с ис­поль­зо­ванием мо­де­ли «мно­го­ра­зо­вых» ра­бо­чих про­цес­сов (да, мы по­ка боль­ше не бу­дем де­лать мно­го­за­дач­ную вер­сию функ­ции filter). Начнем с вер­сии функ­ции map, ко­то­рая для дан­ного примера реа­ли­за­ции на­зы­ва­ет­ся parallel_map:limited_pmap/4:

limited_pmap(_Fun, [], _PortionSize, _WorkerCount) -> [];

limited_pmap(Fun, SourceList, PortionSize, _WorkerCount)

when length(SourceList) =< PortionSize ->

lists:map(Fun, SourceList);

limited_pmap(Fun, SourceList, PortionSize, WorkerCount) ->

WorkerFun = fun(SourcePortion) -> lists:map(Fun, SourcePortion) end,

WorkerList =

[spawn_link(fun() -> parallel_limited_helper:limited_worker(WorkerFun) end) || _WorkerIndex <- lists:seq(1, WorkerCount)],

Result = parallel_limited_helper:limited_core(fun lists:append/1, SourceList, PortionSize, WorkerList),

lists:foldl(fun(Worker, _Aggr) -> exit(Worker, normal) end, true, WorkerList),

Result.

Функ­ция parallel_map:limited_pmap/4 со­дер­жит три ва­ри­ан­та: пер­вый ва­ри­ант об­ра­ба­ты­ва­ет си­туа­цию пусто­го спи­ска ис­ход­ных дан­ных, вто­рой ва­ри­ант – когда раз­мер ис­ход­ных дан­ных мень­ше раз­ме­ра пор­ции, а тре­тий ва­ри­ант об­ра­ба­ты­ва­ет об­щий слу­чай. Как го­во­ри­лось вы­ше, от­вет­ст­вен­ность за соз­дание и за­вер­шение ра­бо­чих про­цес­сов мы воз­ла­га­ем на код, ко­то­рый ис­поль­зу­ет функ­цию parallel_limited_helper:limited_core/4. По­это­му пе­ред ис­поль­зо­ванием этой функ­ции мы соз­да­ем WorkerCount «мно­го­ра­зо­вых» ра­бо­чих про­цес­сов, а по­сле по­лу­чения ре­зуль­та­та мы за­вер­ша­ем ра­бо­ту этих ра­бо­чих про­цес­сов.

Пе­рей­дем к вер­сии функ­ции reduce, ко­то­рая для дан­ного примера реа­ли­за­ции на­зы­ва­ет­ся parallel_reduce:limited_reduce/5:

limited_reduce(_Fun, [], {InitValue, _PortionInitValue}, _PortionSize, _WorkerCount) -> InitValue;

limited_reduce(Fun, SourceList, {InitValue, _PortionInitValue}, PortionSize, _WorkerCount) when length(SourceList) =< PortionSize ->

lists:foldl(Fun, InitValue, SourceList);

limited_reduce(Fun, SourceList, {InitValue, PortionInitValue}, PortionSize, WorkerCount) ->

ReduceFun = fun(List) -> lists:foldl(Fun, InitValue, List) end,

PortionReduceFun = fun(List) -> lists:foldl(Fun, PortionInitValue, List) end,

WorkerList =

[spawn_link(fun() -> parallel_limited_helper:limited_worker(PortionReduceFun) end) || _WorkerIndex <- lists:seq(1, WorkerCount)],

Result = parallel_limited_helper:limited_core(ReduceFun, SourceList, PortionSize, WorkerList),

lists:foldl(fun(Worker, _Aggr) -> exit(Worker, normal) end, true, WorkerList),

Result.

Как вид­но, функ­ция parallel_reduce:limited_reduce/5 реа­ли­зо­ва­на с ис­поль­зо­ванием тех же прин­ци­пов, что и функ­ция parallel_map:limited_pmap/4. Они от­ли­ча­ет­ся толь­ко функ­ция­ми, ко­то­рые об­ра­ба­ты­ва­ют пор­ции ис­ход­ных дан­ных и объ­е­ди­ня­ют ре­зуль­та­ты об­ра­бот­ки пор­ции в ито­го­вый ре­зуль­тат.

Се­го­дня мы сде­ла­ли оче­ред­ной шаг в на­шем прак­ти­ку­ме: мы реа­ли­зо­ва­ли вер­сии функ­ций map и reduce, ко­то­рые ис­поль­зу­ют ог­раничен­ное ко­ли­че­­ст­во ра­бо­чих про­цес­сов (ис­поль­зу­ют мо­дель «мно­го­ра­зо­вых» про­цес­сов). К со­жа­лению, из-за то­го, что ме­сто под ста­тью конеч­но, мы не успе­ли про­тес­ти­ро­вать соз­дан­ные на­ми функ­ции; эту за­да­чу мы остав­ля­ем чи­та­те­лям. А в сле­дую­щий раз мы про­дол­жим наш прак­ти­кум: на оче­ре­ди рас­пре­де­лен­ная вер­сия функ­ций map и reduce и мно­гое дру­гое. |

Персональные инструменты
купить
подписаться
Яндекс.Метрика