Журнал LinuxFormat - перейти на главную

LXF165-166:Язы­ки про­грам­ми­ро­вания: Erlang 92

Материал из Linuxformat
Перейти к: навигация, поиск

Erlang Опи­сы­ва­ет­ся сле­дую­щей фор­му­лой: функ­цио­наль­ный язык + про­цес­сы

Erlang: И сно­ва прак­ти­кум

Ан­д­рей Уша­ков счи­та­ет, что од­но­го прак­ти­ку­ма по мно­го­за­дач­но­сти ма­ло­ва­то – и про­во­дит вто­рой.

(thumbnail)
Наш эксперт Ан­д­рей Уша­ков ак­тив­но при­бли­жа­ет тот день, ко­гда функ­цио­наль­ные язы­ки ста­нут мейн­ст­ри­мом.

В про­шлый раз в нашем прак­ти­куме по мно­го­за­дач­но­сти мы ре­ши­ли соз­дать мно­го­за­дач­ные вер­сий та­ких по­пу­ляр­ных в про­грам­ми­ро­вании функ­ций, как map (опе­ра­ция ото­бра­жения) и reduce (опе­ра­ция сверт­ки). Эти функ­ции бы­ли вы­бра­ны не слу­чай­но: их обыч­ные, не мно­го­за­дач­ные реа­ли­за­ции на­столь­ко про­сты, что при соз­дании их мно­го­за­дач­ных вер­сий незачем силь­но от­вле­кать­ся на де­та­ли, не свя­зан­ные с мно­го­за­дач­но­стью. Про­сто­ту их мы по­ка­за­ли в про­шлый раз, соз­дав их обыч­ную (не мно­го­за­дач­ную) реа­ли­за­цию. Так­же мы соз­да­ли про­стую мно­го­за­дач­ную вер­сию функ­ции map, ис­поль­зующую для ото­бра­жения ка­ж­до­го эле­мен­та свой ра­бо­чий про­цесс. При та­ком под­хо­де (по про­цессу на эле­мен­т) соз­дание мно­го­за­дач­ной вер­сии функ­ции reduce не име­ет смыс­ла. Для соз­дания мно­го­за­дач­ной вер­сии функ­ции reduce (и бо­лее гиб­кой мно­го­за­дач­ной вер­сии функ­ции map) надо раз­би­вать дан­ные на пор­ции и эти пор­ции об­ра­ба­ты­вать па­рал­лель­но, по­сле че­го фор­ми­ро­вать ито­го­вый ре­зуль­тат. На этом мы и оста­но­ви­лись в про­шлый раз.

Снова рас­смот­рим вспо­мо­га­тель­ные функ­ции, соз­да­нные в про­шлый раз для ра­бо­ты с пор­ция­ми дан­ных. Эти функ­ции им­пор­ти­ру­ют­ся из мо­ду­ля parallel_common. Функ­ция calc_portion_count/2 по­зво­ля­ет вы­чис­лить ко­ли­че­­ст­во пор­ций дан­ных по за­дан­ным раз­ме­рам пор­ции и ис­ход­но­го спи­ска с дан­ны­ми:

calc_portion_count(TotalSize, PortionSize)

when TotalSize rem PortionSize == 0 -> TotalSize div PortionSize;

calc_portion_count(TotalSize, PortionSize)

when TotalSize rem PortionSize /= 0 -> (TotalSize div PortionSize) + 1.

Па­ра функ­ций prepare_data/2 (ин­тер­фейс­ная функ­ция) и prepare_data/3 (функ­ция, реа­ли­зую­щая дан­ную функ­цио­наль­ность) раз­би­ва­ют ис­ход­ные дан­ные на пор­ции:

prepare_data(_PortionSize, []) -> [];

prepare_data(PortionSize, SourceList) -> prepare_data(0, PortionSize, SourceList, []).

prepare_data(Index, PortionSize, SourceList, PreparedData)

when length(SourceList) =< PortionSize ->

lists:reverse([{Index, SourceList}] ++ PreparedData);

prepare_data(Index, PortionSize, SourceList, PreparedData) ->

{Portion, Rest} = lists:split(PortionSize, SourceList),

prepare_data(Index + 1, PortionSize, Rest, [{Index, Portion}] ++ PreparedData).

Ис­ход­ный спи­сок раз­би­ва­ется на спи­сок пар (кор­те­жей), со­стоя­щих из ин­дек­са пор­ции и соб­ст­вен­но са­мой пор­ции с дан­ны­ми.

Те­перь мы го­то­вы дви­гать­ся даль­ше. В мно­го­за­дач­ной вер­сии функ­ции reduce у нас бу­дет один глав­ный про­цесс и несколь­ко вспо­мо­га­тель­ных ра­бо­чих про­цес­сов. Глав­ный про­цесс (в ко­то­ром мы иниции­ру­ем вы­полнение на­шей вер­сии функ­ции reduce) от­вечает за раз­биение дан­ных на пор­ции, соз­дание необ­хо­ди­мо­го ко­ли­че­­ст­ва ра­бо­чих про­цес­сов и за­даний для них, сбор ре­зуль­та­тов ра­бо­ты от всех ра­бо­чих про­цес­сов и, на­конец, сверт­ку ре­зуль­та­тов ра­бо­ты вспо­мо­га­тель­ных про­цес­сов в ито­го­вый ре­зуль­тат. Как и при реа­ли­за­ции функ­ции parallel_map: simple_pmap/2, мы ис­поль­зу­ем «од­но­ра­зо­вые» ра­бо­чие про­цес­сы. При та­ком под­хо­де мо­жно раз­дать за­дания сра­зу же при соз­дании про­цес­сов. А ра­бо­чая функ­ция «од­но­ра­зо­вых» про­цес­сов крайне про­ста: вы­чис­лить ре­зуль­тат сверт­ки для пор­ции и ото­слать его глав­но­му про­цес­су. По­сле соз­дания всех «од­но­ра­зо­вых» ра­бо­чих про­цес­сов все взаи­мо­дей­ст­вие с ними сво­дит­ся толь­ко к по­лу­чению от них ре­зуль­та­тов и со­хранению этих ре­зуль­та­тов в про­ме­жу­точ­ном бу­фе­ре (в мас­си­ве). Для этого мы мо­жем ис­поль­зо­вать функ­цию parallel_common: collect_result/2, толь­ко вме­сто со­хранения от­дель­ных эле­мен­тов по­сле ото­бра­жения (что мы де­ла­ли, когда реа­ли­зо­вы­ва­ли про­стей­ший мно­го­за­дач­ный ва­ри­ант функ­ции map) мы бу­дем со­хра­нять ре­зуль­та­ты сверт­ки для груп­пы эле­мен­тов. Под­ве­дя итог вы­шеска­зан­но­му, мы мо­жем на­пи­сать ра­бо­чую функ­цию глав­но­го про­цес­са (она же точ­ка вхо­да в мно­го­за­дач­ную вер­сию функ­ции reduce) сле­дую­щим об­ра­зом:

portion_reduce(_Fun, [], {InitValue, _PortionInitValue}, _PortionSize) -> InitValue;

portion_reduce(Fun, SourceList, {InitValue, _PortionInitValue}, PortionSize)

when length(SourceList) =< PortionSize ->

lists:foldl(Fun, InitValue, SourceList);

portion_reduce(Fun, SourceList, {InitValue, PortionInitValue}, PortionSize) ->

process_flag(trap_exit, true),

MasterPid = self(),

PortionCount = parallel_common:calc_portion_count(length(SourceList), PortionSize),

PreparedData = parallel_common:prepare_data(PortionSize, SourceList),

lists:foreach(fun({Index, Portion}) -> spawn_link(fun() -> portion_worker(Fun, Portion, PortionInitValue, Index, MasterPid) end) end, PreparedData),

EmptyStorage = array:new([{size, PortionCount}, {fixed, true}, {default, none}]),

FullStorage = parallel_common:collect_result(EmptyStorage, PortionCount),

process_flag(trap_exit, false),

lists:foldl(Fun, InitValue, array:to_list(FullStorage)).

Эта функ­ция со­дер­жит три ва­ри­ан­та. Пер­вый об­ра­ба­ты­ва­ет наи­бо­лее три­ви­аль­ный слу­чай, когда ис­ход­ный спи­сок пуст, и мы воз­вра­ща­ем на­чаль­ное зна­чение опе­ра­ции сверт­ки. Вто­рой ва­ри­ант об­ра­ба­ты­ва­ет си­туа­цию, когда ко­ли­че­­ст­во эле­мен­тов в спи­ске не боль­ше раз­мера пор­ции; тогда нет смыс­ла рас­па­рал­ле­ли­вать вы­полнение функ­ции, и опе­ра­ция сверт­ки вы­пол­ня­ет­ся при по­мо­щи функ­ции lists:foldl/3. И, на­конец, по­следний ва­ри­ант об­ра­ба­ты­ва­ет наи­бо­лее об­щий слу­чай, ко­то­рый и яв­ля­ет­ся мно­го­за­дач­ным. При этом не сто­ит за­бы­вать, что при по­мо­щи ра­бо­чих про­цес­сов мы фор­ми­ру­ем толь­ко ре­зуль­та­ты опе­ра­ции сверт­ки по под­груп­пам; фи­наль­ную опе­ра­цию сверт­ки про­ме­жу­точ­ных ре­зуль­та­тов мы вы­пол­ня­ем в кон­це ра­бо­чей функ­ции глав­но­го про­цес­са. И, конеч­но, мы помним, что функ­ция portion_reduce/3 яв­ля­ет­ся экс­пор­ти­руе­мой функ­ци­ей мо­ду­ля parallel_reduce.

Как го­во­ри­лось в пре­ды­ду­щей ста­тье, при реа­ли­за­ции па­рал­лель­но­го ва­ри­ан­та функ­ции reduce нам необ­хо­ди­мо за­да­вать па­ру на­чаль­ных зна­чений, в от­ли­чие от обыч­но­го ва­ри­ан­та функ­ции reduce. Од­но зна­чение из этой па­ры яв­ля­ет­ся на­чаль­ным зна­чением всей опе­ра­ции сверт­ки, дру­гое же яв­ля­ет­ся «ну­лем» опе­ра­ции сверт­ки; оно необ­хо­ди­мо для вы­полнения опе­ра­ции сверт­ки в под­груп­пах. Так, на­при­мер, ес­ли опе­ра­ци­ей сверт­ки яв­ля­ет­ся опе­ра­ция сум­ми­ро­вания чи­сел, то «ну­лем» бу­дет чис­ло 0, а ес­ли опе­ра­ци­ей сверт­ки бу­дет опе­ра­ция кон­ка­те­на­ции строк, «ну­лем» опе­ра­ции бу­дет пустая стро­ка. В об­щем слу­чае, что бу­дет яв­лять­ся «ну­лем», для опе­ра­ции сверт­ки не вы­чис­лить. По­это­му поль­зо­ва­тель на­шей мно­го­за­дач­ной вер­сии функ­ции reduce дол­жен пе­ре­да­вать как на­чаль­ное зна­чение опе­ра­ции сверт­ки, так и «нуль» этой опе­ра­ции. В на­шей реа­ли­за­ции мы ожи­да­ем, что эти два зна­чения бу­дут пе­ре­да­ны в ви­де па­ры (кор­те­жа) {InitValue, PortionInitValue}, где InitValue – на­чаль­ное зна­чение всей опе­ра­ции сверт­ки, PortionInitValue – «нуль» опе­ра­ции сверт­ки.

Те­перь по­смот­рим на то, чем же занима­ют­ся у нас ра­бо­чие про­цес­сы (т. е. на функ­цию, вы­пол­няе­мую ра­бо­чи­ми про­цес­са­ми). Это функ­ция portion_worker/5, оп­ре­де­лен­ная в мо­ду­ле parallel_reduce (но не экс­пор­ти­руе­мая из это­го мо­ду­ля):

portion_worker(Fun, SourcePortion, InitValue, Index, MasterPid)->

AggrValue = lists:foldl(Fun, InitValue, SourcePortion),

MasterPid ! {result, Index, AggrValue}.

Как мы уже го­во­ри­ли ранее, ра­бо­чие про­цес­сы у нас «од­но­ра­зо­вые», т. е. вы­пол­ня­ют свою за­да­чу и за­кан­чи­ва­ют ра­бо­ту. Имен­но по­это­му функ­ция, ко­то­рую вы­пол­ня­ют ра­бо­чие про­цес­сы, на­столь­ко про­ста: в ней мы вы­чис­ля­ем зна­чение опе­ра­ции сверт­ки для груп­пы и по­сы­ла­ем со­об­щение глав­но­му про­цес­су с вы­чис­лен­ным зна­чением. Со­об­щение име­ет вид {result, Index, AggrValue}, где Index – ин­декс ис­ход­ной пор­ции дан­ных, AggrValue – зна­чение опе­ра­ции сверт­ки для ис­ход­ной пор­ции. В ка­че­­ст­ве на­чаль­но­го зна­чения InitValue для опе­ра­ции сверт­ки пор­ции дан­ных мы пе­ре­да­ем «нуль» опе­ра­ции сверт­ки (при соз­дании ра­бо­чих про­цес­сов).

При­шла по­ра про­ве­рить, что на­ша мно­го­за­дач­ная вер­сия функ­ции reduce ра­бо­та­ет пра­виль­но. Для это­го ком­пи­ли­ру­ем со­от­вет­ст­вую­щие мо­ду­ли и за­пуска­ем кон­соль сре­ды вы­полнения язы­ка Erlang. Так как у нас функ­ция parallel_reduce:portion_reduce/4 име­ет три ва­ри­ан­та, все эти три ва­ри­ан­та бы­ло бы непло­хо про­ве­рить. Для на­ча­ла в ка­че­­ст­ве опе­ра­ции сверт­ки возь­мем опе­ра­цию сло­жения чи­сел. Вы­зов parallel_reduce:portion_reduce(fun(Item, Agg) -> Item Ag+g end, [], {3, 0}, 5) вернет нам в ка­че­­ст­ве зна­чения чис­ло 3, т. е. на­чаль­ное зна­чение опе­ра­ции сверт­ки. Вы­зов parallel_reduce:portion_reduce(fun(Item, Agg) -> Item + Agg end, [1, 2, 3], {3, 0}, 5) вернет чис­ло 9. Это сум­ма всех чи­сел из спи­ска [1, 2, 3] с ве­ли­чи­ной 3 в ка­че­­ст­ве на­чаль­но­го зна­чения сум­мы. Так как ко­ли­че­­ст­во эле­мен­тов в спи­ске – 3, а раз­мер пор­ции – 5, то сум­ма всех чи­сел вы­чис­ля­ет­ся при по­мо­щи вто­ро­го ва­ри­ан­та функ­ции parallel_reduce:portion_reduce/4, т. е. не мно­го­за­дач­ным спо­со­бом. Вы­пол­ня­ем вы­зов parallel_reduce:portion_reduce(fun(Item, Agg) -> Item + Agg end, [1, 2, 3, 4, 5, 6], {3, 0}, 2) и по­лу­ча­ем в ре­зуль­та­те зна­чение 24. Лег­ко про­ве­рить, что сум­ма всех чи­сел из спи­ска с на­чаль­ным зна­чением этой сум­мы 3 бу­дет рав­на 24. В этом вы­зо­ве раз­мер пор­ции ра­вен 2, а ко­ли­че­­ст­во эле­мен­тов в спи­ске – 6; так что при вы­зо­ве бу­дут соз­да­ны 3 ра­бо­чих про­цес­са.

Пре­ды­ду­щая опе­ра­ция сверт­ки бы­ла ком­му­та­тив­ной опе­ра­ци­ей; да­вай­те про­ве­рим ра­бо­ту на­шей мно­го­за­дач­ной вер­сии функ­ции reduce в слу­чае, когда опе­ра­ция сверт­ки ком­му­та­тив­ной не яв­ля­ет­ся. В ка­че­­ст­ве та­кой неком­му­та­тив­ной опе­ра­ции сверт­ки возь­мем опе­ра­цию кон­ка­те­на­ции строк. Как и в пре­ды­ду­щем слу­чае, необ­хо­ди­мо про­ве­рить ра­бо­ту всех трех ва­ри­ан­тов функ­ции parallel_reduce:portion_reduce/4. Вы­зов parallel_reduce:portion_reduce(fun(Item, Agg) -> Agg ++ Item end, [], {“++”, “”}, 2) вернет нам в ка­че­­ст­ве зна­чения стро­ку «++», т. е. на­чаль­ное зна­чение опе­ра­ции кон­ка­те­на­ции. Ре­зуль­та­том вы­полнения вы­зо­ва parallel_reduce:portion_reduce(fun(Item, Agg) -> Agg ++ Item end, [“aa”, “bb”], {“++”, “”}, 5) бу­дет стро­ка «++aabb”. Лег­ко про­ве­рить, что кон­ка­те­на­ция всех строк из ис­ход­но­го спи­ска с на­чаль­ным зна­чением «++» даст нам стро­ку «++aabb”. При этом ко­ли­че­­ст­во эле­мен­тов в спи­ске рав­но 2, а раз­мер пор­ции ра­вен 5; это оз­на­ча­ет, что ре­зуль­тат кон­ка­те­на­ции строк мы по­лу­чи­ли при по­мо­щи вто­ро­го ва­ри­ан­та функ­ции parallel_reduce:portion_reduce/4. И, на­конец, ре­зуль­та­том вы­полнения вы­зо­ва parallel_reduce:portion_reduce(fun(Item, Agg) -> Agg ++ Item end, [“aa”, “bb”, “cc”, “dd”, “ee”, “ff”], {“++”, “”}, 2) бу­дет стро­ка «++aabbccddeeff”. Оче­вид­но, что эта стро­ка яв­ля­ет­ся ре­зуль­та­том кон­ка­те­на­ции всех строк из ис­ход­но­го спи­ска, с на­чаль­ным зна­чением «++». Так как раз­мер спи­ска –6 эле­мен­тов, а раз­мер пор­ции дан­ных – 2, то этот вы­зов бу­дет об­ра­бо­тан треть­им ва­ри­ан­том функ­ции parallel_reduce:portion_reduce/4; при этом бу­дет соз­да­но 3 ра­бо­чих по­то­ка.

Те­перь соз­да­дим мно­го­за­дач­ную вер­сию функ­ции map, ра­бо­чие про­цес­сы ко­то­рой бу­дут об­ра­ба­ты­вать не от­дель­ные эле­мен­ты ис­ход­но­го спи­ска, а пор­ции из эле­мен­тов. Как и рань­ше, у нас бу­дет один глав­ный про­цесс и несколь­ко вспо­мо­га­тель­ных ра­бо­чих про­цес­сов. В глав­ном про­цес­се (в ко­то­ром мы иниции­ру­ем вы­полнение на­шей вер­сии функ­ции map) мы бу­дем раз­би­вать дан­ные на пор­ции, соз­да­вать необ­хо­ди­мое ко­ли­че­­ст­во ра­бо­чих про­цес­сов и за­даний для них, со­би­рать ре­зуль­та­ты ра­бо­ты от всех ра­бо­чих про­цес­сов и, на­конец, пре­об­ра­зо­вы­вать со­б­ран­ные про­ме­жу­точ­ные ре­зуль­та­ты в ито­го­вый спи­сок. При этом ра­бо­чие про­цес­сы у нас про­дол­жа­ют быть «од­но­ра­зо­вы­ми», т. е. они вы­пол­ня­ют свою за­да­чу, от­сы­ла­ют ре­зуль­та­ты ра­бо­ты глав­но­му про­цес­су и за­кан­чи­ва­ют свое су­ще­ст­во­вание в этом брен­ном ми­ре. По­это­му мы мо­жем не оста­нав­ли­вать­ся под­роб­но на прин­ци­пах ра­бо­ты, а сра­зу по­смот­реть код со­от­вет­ст­вую­щих ме­то­дов. Ра­бо­чая функ­ция глав­но­го про­цес­са (она же точ­ка вхо­да в мно­го­за­дач­ную вер­сию функ­ции map) portion_pmap/3 оп­ре­де­ле­на в мо­ду­ле parallel_map (и, ес­те­ст­вен­но, объ­яв­ле­на экс­пор­ти­руе­мой функ­ци­ей из это­го мо­ду­ля):

portion_pmap(_Fun, [], _PortionSize) -> [];

portion_pmap(Fun, SourceList, PortionSize)

when length(SourceList) =< PortionSize ->

lists:map(Fun, SourceList);

portion_pmap(Fun, SourceList, PortionSize) ->

process_flag(trap_exit, true),

MasterPid = self(),

PortionCount = parallel_common:calc_portion_count(length(SourceList), PortionSize),

PreparedData = parallel_common:prepare_data(PortionSize, SourceList),

lists:foreach(fun({Index, Portion}) -> spawn_link(fun() -> portion_worker(Fun, Portion, Index, MasterPid) end) end, PreparedData),

EmptyStorage = array:new([{size, PortionCount}, {fixed, true}, {default, none}]),

FullStorage = parallel_common:collect_result(EmptyStorage, PortionCount),

process_flag(trap_exit, false),

lists:append(array:to_list(FullStorage)).

Как и в слу­чае функ­ции parallel_reduce:portion_reduce/4, мы оп­ре­де­ля­ем три ва­ри­ан­та функ­ции. Пер­вый об­ра­ба­ты­ва­ет слу­чай пусто­го спи­ска; вто­рой – слу­чай, когда ко­ли­че­­ст­во эле­мен­тов в спи­ске мень­ше раз­ме­ра пор­ции (в этом слу­чае нет смыс­ла рас­па­рал­ле­ли­вать за­да­чу); и, на­конец, тре­тий – об­щий слу­чай.

Рас­смот­рим функ­цию, ко­то­рую бу­дут вы­пол­нять ра­бо­чие про­цес­сы. Это функ­ция portion_worker/4, оп­ре­де­лен­ная в мо­ду­ле parallel_map (но не экс­пор­ти­руе­мая из него):

portion_worker(Fun, SourcePortion, Index, MasterPid) ->

DestPortion = lists:map(Fun, SourcePortion),

MasterPid ! {result, Index, DestPortion}.

Так как ра­бо­чие про­цес­сы у нас опять же «од­но­ра­зо­вые», то вы­пол­няе­мая ими функ­ция очень про­ста: мы вы­чис­ля­ем ре­зуль­тат опе­ра­ции ото­бра­жения пор­ции и воз­вра­ща­ем его глав­но­му про­цес­су при по­мо­щи по­сыл­ки со­об­щения ви­да {result, Index, DestPortion }, где Index – ин­декс ис­ход­ной пор­ции дан­ных, DestPortion – ре­зуль­тат при­менения опе­ра­ции ото­бра­жения к ис­ход­ной пор­ции.

Про­ве­рим, что но­вая мно­го­за­дач­ная вер­сия функ­ции map ра­бо­та­ет пра­виль­но. Для это­го ском­пи­ли­ру­ем со­от­вет­ст­вую­щие мо­ду­ли и за­пустим кон­соль сре­ды вы­полнения язы­ка Erlang. Так как у функ­ции parallel_map:portion_pmap/3 оп­ре­де­ле­но три ва­ри­ан­та, необ­хо­ди­мо про­ве­рить ра­бо­ту их всех. Начнем с пер­во­го ва­ри­ан­та, об­ра­ба­ты­вающ­его слу­чай пустого ис­ход­ного спи­ска. Вы­зов parallel_map:portion_pmap(fun(Item) -> lists:reverse(Item) end, [], 4) воз­вра­ща­ет пустой спи­сок, как и ожи­да­лось. Вто­рой ва­ри­ант – когда ко­ли­че­­ст­во эле­мен­тов в спи­ске мень­ше раз­мера пор­ции (тогда до­полнитель­ных про­цес­сов мы не соз­да­ем, а все вы­чис­ления вы­пол­ня­ем в вы­зы­ваю­щем про­цес­се). Вы­зов parallel_map:portion_pmap(fun(Item) -> lists:reverse(Item) end, [“str13”, “str667”], 4) вернет спи­сок [«31rts”, “766rts”], как и ожи­да­ет­ся. Так как спи­сок со­дер­жит 2 эле­мен­та, а раз­мер пор­ции 4, то мы мо­жем быть уве­ре­ны, что наш вы­зов бу­дет вы­полнен вто­рым ва­ри­ан­том функ­ции parallel_map:portion_pmap/3. И, на­конец, про­ве­рим по­следний ва­ри­ант функ­ции parallel_map:portion_pmap/3. Вы­зов parallel_map:portion_pmap(fun(Item) -> lists:reverse(Item) end, [“str13”, “str67”, “str667”, “str909”], 2) вернет нам спи­сок [“31rts”, “76rts”, “766rts”, “909rts”]. Оче­вид­но, что это зна­чение яв­ля­ет­ся пра­виль­ным. При по­следнем вы­зо­ве раз­мер пор­ции – 2, а ис­ход­ный спи­сок со­дер­жит 4 эле­мен­та; то есть соз­дутся два ра­бо­чих по­то­ка для вы­полнения опе­ра­ции ото­бра­жения ка­ж­дой пор­ции дан­ных.

Да­вай­те внима­тель­но по­смот­рим на соз­дан­ные на­ми вы­ше мно­го­за­дач­ные вер­сии функ­ций map и reduce и сравним, как эти ме­то­ды реа­ли­зу­ют свою функ­цио­наль­ность (сравнивать, что эти ме­то­ды де­ла­ют, оче­вид­но, не име­ет смыс­ла). В обо­их слу­ча­ях у нас есть глав­ный про­цесс (он же про­цесс, в ко­то­ром иниции­ро­вал­ся вы­зов функ­ции) и несколь­ко вспо­мо­га­тель­ных «од­но­ра­зо­вых» ра­бо­чих про­цес­са. В обо­их слу­ча­ях в глав­ном про­цес­се мы раз­би­ва­ем дан­ные на пор­ции, соз­да­ем нужное ко­ли­че­­ст­во ра­бо­чих про­цес­сов и за­даний для них, со­би­ра­ем ре­зуль­та­ты ра­бо­ты от всех ра­бо­чих про­цес­сов и, на­конец, пре­об­ра­зо­вы­ва­ем со­б­ран­ные про­ме­жу­точ­ные ре­зуль­та­ты в ито­го­вый ре­зуль­тат. А в ра­бо­чих про­цес­сах, опять же в обо­их слу­ча­ях, мы вы­пол­ня­ем за­дание, от­сы­ла­ем ре­зуль­тат ра­бо­ты об­рат­но глав­но­му про­цес­су, и все.


Вид­но, что оба эти ме­то­да ор­ганизу­ют свою ра­бо­ту оди­на­ко­вым об­ра­зом. Они от­ли­ча­ют­ся толь­ко функ­ци­ей, об­ра­ба­ты­ваю­щей пор­ции дан­ных в ра­бо­чих про­цес­сах, и функ­ци­ей, пре­об­ра­зо­вы­ваю­щей про­ме­жу­точ­ные ре­зуль­та­ты в ито­го­вый ре­зуль­тат. В реа­ли­за­ции функ­ции map для об­ра­бот­ки пор­ции дан­ных в ра­бо­чем про­цес­се ис­поль­зу­ет­ся функ­ция lists:map/2, а для пре­об­ра­зо­вания про­ме­жу­точ­ных ре­зуль­та­тов в ито­го­вый ре­зуль­тат – функ­ция lists:append/1. В реа­ли­за­ции функ­ции reduce в обо­их слу­ча­ях применя­ет­ся функ­ция lists:foldl/3, толь­ко с раз­ны­ми па­ра­мет­ра­ми. Ло­гич­но с­просить: а зачем мы сей­час сравнива­ем на­ши реа­ли­за­ции функ­ций map и reduce? От­вет, по­жа­луй, оче­ви­ден: во-пер­вых, мы хо­тим из­бе­жать дуб­ли­ро­вания ко­да в уже су­ще­ст­вую­щих реа­ли­за­ци­ях ме­то­дов map и reduce, а во-вто­рых, хо­тим в дальней­шем соз­да­вать па­рал­лель­ные вер­сии еще ка­ких-ли­бо ме­то­дов с минималь­ны­ми за­тра­та­ми. В на­шем слу­чае вид­но, что мы мо­жем вы­де­лить об­щую часть в реа­ли­за­ции ме­то­дов parallel_map:portion_pmap/3 и parallel_reduce:portion_reduce/4. Все вы­де­ляе­мые об­щие ме­то­ды мы по­ме­ща­ем в мо­дуль parallel_portion_helper (и, со­от­вет­ст­вен­но, в файл parallel_portion_helper.erl).

Начнем с функ­ции, ко­то­рую вы­пол­ня­ют ра­бо­чие про­цес­сы. Для соз­дания ее об­щей вер­сии доста­точ­но за­менить кон­крет­ные функ­ции, ис­поль­зуе­мые для об­ра­бот­ки пор­ции в ра­бо­чих про­цес­сах на неко­то­рую функ­цию, пе­ре­да­вае­мую как па­ра­метр:

portion_worker(Fun, SourcePortion, Index, MasterPid) ->

DestPortion = Fun(SourcePortion),

MasterPid ! {result, Index, DestPortion}.

Здесь па­ра­метр Fun и яв­ля­ет­ся той са­мой функ­ци­ей, ко­то­рая об­ра­ба­ты­ва­ет вход­ную пор­цию дан­ных. Сле­ду­ет за­ме­тить, что эта функ­ция не экс­пор­ти­ру­ет­ся из мо­ду­ля parallel_portion_helper. Те­перь вы­де­лим об­щую часть из ра­бо­чих функ­ций глав­ных про­цес­сов (и, со­от­вет­ст­вен­но, то­чек вхо­да) для на­ших реа­ли­за­ций ме­то­дов map и reduce. Как уже го­во­ри­лось, ра­бо­чие функ­ции глав­ных про­цес­сов от­ли­ча­ют­ся толь­ко сле­дую­щи­ми ас­пек­та­ми: функ­ци­ей, ко­то­рая об­ра­ба­ты­ва­ет пор­ции дан­ных в ра­бо­чих про­цес­сах, и функ­ци­ей, ко­то­рая пре­об­ра­зо­вы­ва­ет про­ме­жу­точ­ные ре­зуль­таты в ито­го­вый ре­зуль­тат. Оче­вид­но, что об­щая часть долж­на со­дер­жать обе эти функ­ции в ка­че­­ст­ве па­ра­мет­ров. С уче­том все­го ска­зан­но­го, об­щая часть ра­бо­чей функ­ции глав­но­го по­то­ка долж­на вы­гля­деть сле­дую­щим об­ра­зом (ес­те­ст­вен­но, что эта функ­ция яв­ля­ет­ся экс­пор­ти­руе­мой из мо­ду­ля parallel_portion_helper):

portion_core(WorkerFun, FinalAggrFun, SourceList, PortionSize) ->

process_flag(trap_exit, true),

MasterPid = self(),

PortionCount = parallel_common:calc_portion_count(length(SourceList), PortionSize),

PreparedData = parallel_common:prepare_data(PortionSize, SourceList),

lists:foreach(fun({Index, Portion}) -> spawn_link(fun() -> portion_worker(WorkerFun, Portion, Index, MasterPid) end) end, PreparedData),

EmptyStorage = array:new([{size, PortionCount}, {fixed, true}, {default, none}]),

FullStorage = parallel_common:collect_result(EmptyStorage, PortionCount),

process_flag(trap_exit, false),

FinalAggrFun(array:to_list(FullStorage)).

Здесь па­ра­метр WorkerFun яв­ля­ет­ся функ­ци­ей, ко­то­рая при­ме­ня­ет­ся для об­ра­бот­ки пор­ций ис­ход­ных дан­ных в ра­бо­чих про­цес­сах, а па­ра­метр FinalAggrFun – функ­ци­я для пре­об­ра­зо­вания про­ме­жу­точ­ных ре­зуль­та­тов в ито­го­вый ре­зуль­тат. По­ми­мо этих двух па­ра­мет­ров, эта функ­ция принима­ет так­же ис­ход­ный спи­сок SourceList и раз­мер пор­ции PortionSize. Мо­жет возник­нуть во­прос: мо­жем ли мы вынести в ме­тод parallel_portion_helper:portion_core/4 до­полнитель­ные ва­ри­ан­ты, ко­то­рые бы­ли в ме­то­дах parallel_map:portion_pmap/3 и parallel_reduce:portion_reduce/4 и пред­на­зна­ча­лись для об­ра­бот­ки си­туа­ций, когда ис­ход­ный спи­сок ли­бо пуст, ли­бо его раз­мер не боль­ше раз­ме­ра пор­ции? Для от­ве­та сравним ва­ри­ан­ты функ­ций parallel_map: portion_pmap/3 и parallel_reduce:portion_reduce/4, ко­то­рые об­ра­ба­ты­ва­ют си­туа­цию пусто­го спи­ска с дан­ны­ми. Функ­ция parallel_map:portion_pmap/3 в этом слу­чае воз­вра­ща­ет пустой спи­сок, а функ­ция parallel_reduce:portion_reduce/4 – на­чаль­ное зна­чение опе­ра­ции сверт­ки. Вид­но, что для это­го ва­ри­ан­та воз­вра­щае­мые зна­чения спе­ци­фич­ны. Точ­но так же мож­но уви­деть спе­ци­фи­ку и в ва­ри­ан­тах функ­ций parallel_map:portion_pmap/3 и parallel_reduce:portion_reduce/4, ко­то­рые об­ра­ба­ты­ва­ют си­туа­цию, когда раз­мер спи­ска не пре­вы­ша­ет раз­ме­ра пор­ции. Имен­но по этой при­чине та­ких ва­ри­ан­тов нет в функ­ции parallel_portion_helper:portion_core/4.

Те­перь по­смот­рим, как мож­но пе­ре­пи­сать функ­ции parallel_map:portion_pmap/3 и parallel_reduce:portion_reduce/4, ис­поль­зуя соз­дан­ную функ­цию parallel_portion_helper:portion_core/4. Но­вые (пе­ре­пи­сан­ные) функ­ции бу­дут на­зы­вать­ся parallel_map:portion_gen_pmap/3 и parallel_reduce:portion_gen_reduce/4, со­от­вет­ст­вен­но. Реа­ли­за­ция parallel_map:portion_gen_pmap/3 функ­ции вы­гля­дит сле­дую­щим об­ра­зом:

portion_gen_pmap(_Fun, [], _PortionSize) -> [];

portion_gen_pmap(Fun, SourceList, PortionSize)

when length(SourceList) =< PortionSize ->

lists:map(Fun, SourceList);

portion_gen_pmap(Fun, SourceList, PortionSize) ->

WorkerFun = fun(SourcePortion) -> lists:map(Fun, SourcePortion) end,

parallel_portion_helper:portion_core(WorkerFun, fun lists:append/1, SourceList, PortionSize).

Здесь функ­ция WorkerFun соз­да­ет­ся на осно­ве функ­ции lists:map/2, а функ­ци­ей FinalAggrFun яв­ля­ет­ся функ­ция lists: append/1. Ана­ло­гич­ным об­ра­зом вы­гля­дит реа­ли­за­ция функ­ции parallel_reduce:portion_gen_reduce/4:

portion_gen_reduce(_Fun, [], {InitValue, _PortionInitValue}, _PortionSize) -> InitValue;

portion_gen_reduce(Fun, SourceList, {InitValue, _PortionInitValue}, PortionSize)

when length(SourceList) =< PortionSize ->

lists:foldl(Fun, InitValue, SourceList);

portion_gen_reduce(Fun, SourceList, {InitValue, PortionInitValue}, PortionSize) ->

ReduceFun = fun(List) -> lists:foldl(Fun, InitValue, List) end,

PortionReduceFun = fun(List) -> lists:foldl(Fun, PortionInitValue, List) end,

parallel_portion_helper:portion_core(PortionReduceFun, ReduceFun, SourceList, PortionSize).

Здесь и функ­ция WorkerFun, и функ­ция FinalAggrFun соз­да­ют­ся на осно­ве функ­ции lists:foldl/3; глав­ное от­ли­чие ме­ж­ду эти­ми по­строения­ми в том, что в ка­че­­ст­ве па­ра­мет­ра Acc0 (на­чаль­ное зна­чение) в пер­вом слу­чае бе­рет­ся «нуль» опе­ра­ции сверт­ки, а во вто­ром – на­чаль­ное зна­чение опе­ра­ции сверт­ки.

Для тес­ти­ро­вания ра­бо­ты функ­ций parallel_map:portion_gen_pmap/3 и parallel_reduce:portion_gen_reduce/4 мы мо­жем ис­поль­зо­вать те же сце­на­рии, что и для тес­ти­ро­вания функ­ций parallel_map:portion_pmap/3 и parallel_reduce:portion_reduce/4. По­это­му дуб­ли­ро­вать ре­зуль­та­ты тес­ти­ро­вания функ­ций parallel_map:portion_gen_pmap/3 и parallel_reduce:portion_gen_reduce/4 мы здесь не бу­дем (же­лаю­щие мо­гут про­вес­ти его са­ми и убе­дить­ся, что все ра­бо­та­ет, как на­до).

Се­го­дня мы сде­ла­ли оче­ред­ной шаг в на­шем прак­ти­ку­ме: соз­да­ли мно­го­за­дач­ные вер­сии функ­ций map и reduce, ко­то­рые об­ра­ба­ты­ва­ют пор­ции ис­ход­ных дан­ных па­рал­лель­но. На­страи­вая раз­мер пор­ции, мы мо­жем управ­лять про­из­во­ди­тель­но­стью на­ших функ­ций (мы об этом по­го­во­рим в од­ном из сле­дую­щих но­ме­ров). Бо­лее то­го, мы смог­ли най­ти и вы­де­лить об­щее яд­ро у со­от­вет­ст­вую­щих вер­сий функ­ций map и reduce. Важ­ность это­го ре­зуль­та­та в том, что при соз­дании всех дальней­ших мно­го­за­дач­ных вер­сий функ­ций map и reduce мы бу­дем соз­да­вать для них неко­то­рое об­щее яд­ро. Это по­мо­жет из­бе­жать дуб­ли­ро­вания ко­да и об­лег­чит соз­дание мно­го­за­дач­ных вер­сий ка­ких-ли­бо еще функ­ций (но об этом мы по­го­во­рим в сле­дую­щий раз). |

Персональные инструменты
купить
подписаться
Яндекс.Метрика