Журнал LinuxFormat - перейти на главную

LXF168:Язы­к про­грам­ми­ро­вания Erlang

Материал из Linuxformat
Перейти к: навигация, поиск

Erlang Опи­сы­ва­ет­ся сле­дую­щей фор­му­лой: функ­цио­наль­ный язык + про­цес­сы

Erlang: Прак­ти­ка мно­го­за­дач­но­сти

Ан­д­рей Уша­ков не по­сту­пил­ся прин­ци­пом еди­но­лич­ной от­вет­ст­вен­но­сти. И вот ку­да это его за­ве­ло...

(thumbnail)
Наш эксперт Ан­д­рей Уша­ков ак­тив­но при­бли­жа­ет тот день, ко­гда функ­цио­наль­ные язы­ки ста­нут мейн­ст­ри­мом.

Итак, мы про­дол­жа­ем ре­шать на­шу боль­шую за­да­чу: соз­дание мно­го­за­дач­ных вер­сий функ­ций map и reduce. На этом уро­ке мы по­го­во­рим о том, как на осно­ве функ­ций, соз­дан­ных в про­шлый раз, реа­ли­зо­вать рас­пре­де­лен­ные (вы­пол­няю­щие­ся на раз­лич­ных уз­лах) вер­сии функ­ций map и reduce. Так­же мы про­ана­ли­зи­ру­ем до­пу­щения, при­ня­тые на­ми, когда мы реа­ли­зо­вы­ва­ли вер­сии функ­ций map и reduce на осно­ве «мно­го­ра­зо­вых» про­цес­сов. Это по­зво­лит нам по­нять, ку­да идти даль­ше. Но сперва взглянем, где мы оста­но­ви­лись в про­шлый раз.

На про­шлом уро­ке мы пе­ре­шли к мо­де­ли «мно­го­ра­зо­вых» ра­бо­чих про­цес­сов и ог­раничи­ли их чис­ло. На осно­ве это­го под­хо­да мы соз­да­ли оче­ред­ные мно­го­за­дач­ные вер­сии функ­ций map и reduce: функ­ции parallel_map:limited_pmap/4 и parallel_reduce:limited_reduce/5. И, как обыч­но, при соз­дании этих функ­ций мы вынесли об­щую функ­цио­наль­ность в ряд функ­ций, рас­по­ла­гаю­щих­ся в мо­ду­ле parallel_limited_helper. При реа­ли­за­ции этой функ­цио­наль­но­сти мы при­ня­ли два важ­ных со­гла­шения: во-пер­вых, до­го­во­ри­лись, что от­вет­ст­вен­ность за соз­дание и унич­то­жение про­цес­сов ложит­ся на вы­зы­ваю­щую сто­ро­ну. Во-вто­рых, все за­дания мы сра­зу рас­пре­де­ля­ем ме­ж­ду ра­бо­чи­ми про­цес­са­ми, по­сле че­го толь­ко до­жи­да­ем­ся ре­зуль­та­тов их ра­бо­ты.

Еще раз рас­смот­рим эту об­щую функ­цио­наль­ность. parallel_limited_helper:limited_worker/1 яв­ля­ет­ся функ­ци­ей, ко­то­рую вы­пол­ня­ет ра­бо­чий про­цесс во вре­мя сво­ей жизни:

limited_worker(Fun) ->

receive

{task_request, MasterPid, Index, SourcePortion} ->

Dest = Fun(SourcePortion),

MasterPid ! {result, Index, Dest},

limited_worker(Fun);

_Other -> limited_worker(Fun)

end.

В этой функ­ции ра­бо­чие про­цес­сы вы­пол­ня­ют за­дания на об­ра­бот­ку пор­ций дан­ных, ко­то­рые они по­лу­ча­ют в ви­де со­об­щений, по­слан­ных ра­бо­чим про­цес­сам глав­ным про­цес­сом. Ес­те­ст­вен­но, что ре­зуль­та­ты ра­бо­ты от­сы­ла­ют­ся об­рат­но глав­но­му про­цес­су. Мы экс­пор­ти­ру­ем эту функ­цию из мо­ду­ля parallel_limited_helper, т. к. мы до­го­во­ри­лись, что ра­бо­чие про­цес­сы соз­да­ет внешний код. Па­ра функ­ций send_worker_tasks/2 и send_worker_tasks/3 ис­поль­зу­ет­ся для рас­пре­де­ления за­даний ме­ж­ду ра­бо­чи­ми про­цес­са­ми (функ­ций send_worker_tasks/2 яв­ля­ет­ся ин­тер­фей­сом, а функ­ция send_worker_tasks/3 – реа­ли­за­ци­ей дан­ной функ­цио­наль­но­сти):

send_worker_tasks(PreparedData, WorkerList) ->

send_worker_tasks(PreparedData, WorkerList, 1).

send_worker_tasks([], _WorkerList, _WorkerIndex) -> complete;

send_worker_tasks(PreparedData, WorkerList, WorkerIndex)

when WorkerIndex > length(WorkerList) ->

send_worker_tasks(PreparedData, WorkerList, 1);

send_worker_tasks([{Index, Portion} | Rest], WorkerList, WorkerIndex) ->

Worker = lists:nth(WorkerIndex, WorkerList),

Worker ! {task_request, self(), Index, Portion},

send_worker_tasks(Rest, WorkerList, WorkerIndex + 1).

Эта па­ра функ­ций все­го лишь оп­ре­де­ле­на в мо­ду­ле parallel_limited_helper, но не экс­пор­ти­ру­ет­ся из него, т. к. ин­кап­су­ли­ру­ет один из внут­ренних ша­гов. И, на­конец, функ­ция parallel_limited_helper:limited_core/4 яв­ля­ет­ся серд­цем всех реа­ли­за­ций, осно­ван­ных на этой функ­ции:

limited_core(FinalAggrFun, SourceList, PortionSize, WorkerList) ->

process_flag(trap_exit, true),

PortionCount = parallel_common:calc_portion_count(length(SourceList), PortionSize),

PreparedData = parallel_common:prepare_data(PortionSize, SourceList),

send_worker_tasks(PreparedData, WorkerList),

EmptyStorage = array:new([{size, PortionCount}, {fixed, true}, {default, none}]),

FullStorage = parallel_common:collect_result(EmptyStorage, PortionCount),

process_flag(trap_exit, false),

FinalAggrFun(array:to_list(FullStorage)).

В этой функ­ции мы раз­би­ва­ем ис­ход­ные дан­ные на пор­ции, рав­но­мер­но рас­пре­де­ля­ем за­дания по об­ра­бот­ке пор­ций дан­ных ме­ж­ду соз­дан­ны­ми ра­бо­чи­ми по­то­ка­ми, со­би­ра­ем ре­зуль­та­ты об­ра­бот­ки пор­ций дан­ных ра­бо­чи­ми по­то­ка­ми и объ­е­ди­ня­ем ре­зуль­та­ты их ра­бо­ты в ито­го­вый ре­зуль­тат.

Рассмот­рим со­гла­шения, при­ня­тые на­ми при реа­ли­за­ции функ­ций из мо­ду­ля parallel_limited_helper. Начнем с со­гла­шения о том, что от­вет­ст­вен­ность за соз­дание и унич­то­жение ра­бо­чих про­цес­сов ле­жит на вы­зы­ваю­щей сто­роне. С пер­во­го взгля­да мо­жет по­ка­зать­ся, что это ре­шение – не из оп­ти­маль­ных: по­че­му нель­зя про­сто пе­ре­дать число ра­бо­чих про­цес­сов в функ­цию parallel_limited_helper:limited_core/4 (вме­сто спи­ска ра­бо­чих про­цес­сов)? Для понимания при­чин да­вай­те по­ста­вим бо­лее об­щую за­да­чу: нам необ­хо­ди­мо соз­дать рас­пре­де­лен­ные вер­сии функ­ций map и reduce, т. е. вер­сии функ­ций, ра­бо­чие про­цес­сы ко­то­рых вы­пол­ня­лись бы на за­ранее за­дан­ных уз­лах. При этом мы ог­раничи­ва­ем мак­си­маль­ное число ра­бо­чих про­цес­сов на ка­ж­дом уз­ле, что да­ет в ре­зуль­та­те ог­раничение на об­щее число ра­бо­чих про­цес­сов. Рас­пре­де­ляя ра­бо­чие про­цес­сы по всем уз­лам рав­но­мер­но, мы все же мо­жем их соз­да­вать в на­шей обоб­щен­ной функ­ции (ко­то­рая ана­ло­гич­на функ­ции parallel_limited_helper:limited_core/4), пе­ре­да­вая как па­ра­мет­ры мак­си­маль­ное число ра­бо­чих про­цес­сов на ка­ж­дом уз­ле и спи­сок доступ­ных уз­лов. А ес­ли мы хо­тим соз­да­вать на раз­ных уз­лах раз­ное число ра­бо­чих про­цес­сов, мы уже долж­ны пе­ре­да­вать спи­сок пар (кор­те­жей из двух эле­мен­тов) «узел – мак­си­маль­ное число про­цес­сов на уз­ле».

В чем ми­ну­сы та­ко­го под­хо­да? Во-пер­вых, тогда умень­ша­ет­ся ко­ли­че­­ст­во сце­на­ри­ев ис­поль­зо­вания дан­ной функ­ции. Дей­ст­ви­тель­но, при пе­ре­да­че в функ­цию спи­ска ра­бо­чих про­цес­сов (ко­то­рые соз­да­ла нам вы­зы­ваю­щая сто­ро­на) нам без разницы, соз­да­ны ли эти про­цес­сы на од­ном уз­ле с глав­ным про­цес­сом или же нет. Нам так­же без разницы, ог­раниче­но ли вре­мя жизни ра­бо­чих про­цес­сов мно­го­за­дач­ной (или рас­пре­де­лен­ной) вер­си­ей функ­ции map и reduce или же они яв­ля­ют­ся дол­го­жи­ву­щи­ми (на­при­мер, из неко­то­ро­го пу­ла про­цес­сов). С дру­гой сто­ро­ны, ре­шив, что про­цес­сы долж­на соз­да­вать са­ма на­ша функ­ция, мы по­лу­чим си­туа­цию, когда, скажем, на­ша функ­ция мо­жет соз­дать ра­бо­чие про­цес­сы толь­ко на локаль­ном уз­ле (или с еще ка­ки­ми-ли­бо ог­ра­ничения­ми). Во-вто­рых, при та­ком под­хо­де на­ру­шается прин­ци­п SRP: функ­ция со­дер­жит и реа­ли­за­цию мно­го­за­дач­ной об­ра­бот­ки спи­ска, и функ­цио­наль­ность по соз­данию ра­бо­чих про­цес­сов.

Конеч­но, когда мы ис­поль­зо­ва­ли «од­но­ра­зо­вые» ра­бо­чие про­цес­сы, их соз­дание в на­шей обоб­щен­ной функ­ции бы­ло оп­рав­да­но, т. к. бы­ло неотъ­ем­ле­мой ча­стью ал­го­рит­ма, и толь­ко на­ша функ­ция зна­ла об этих про­цес­сах. В слу­чае же «мно­го­ра­зо­вых» ра­бо­чих про­цес­сов их соз­дание неотъ­ем­ле­мой ча­стью ал­го­рит­ма не яв­ля­ет­ся. К то­му же об этих про­цес­сах зна­ет внешний код (т. к. он за­да­ет ог­раничения на их ко­ли­че­­ст­во), и вполне ло­гич­но, что имен­но он бу­дет управ­лять вре­менем жизни этих про­цес­сов. Ес­ли же нам необ­хо­ди­мо, что­бы у нас на раз­ных уз­лах бы­ло раз­ное ко­ли­че­­ст­во ра­бо­чих про­цес­сов, то при об­су­ж­дае­мом под­хо­де мы по­лу­чим си­туа­цию, когда код для соз­дания этих ра­бо­чих про­цес­сов на­хо­дит­ся как на вы­зы­ваю­щей сто­роне, так и на вы­зы­вае­мой сто­роне. Дей­ст­ви­тель­но, на вы­зы­ваю­щей сто­роне мы бу­дем вы­чис­лять для ка­ж­до­го уз­ла мак­си­маль­ное число ра­бо­чих про­цес­сов, и фор­ми­ро­вать спи­сок пар «узел – мак­си­маль­ное число ра­бо­чих про­цес­сов на этом уз­ле». А на вы­зы­вае­мой сто­роне – соз­да­вать ра­бо­чие про­цес­сы в со­от­вет­ст­вии с пе­ре­дан­ным спи­ском. Оче­вид­но, что под­дер­жи­вать и рас­ши­рять по­доб­ную реа­ли­за­цию бу­дет тя­же­ло.

А те­перь да­вай­те ре­шим по­став­лен­ную вы­ше за­да­чу: соз­да­дим рас­пре­де­лен­ные вер­сии функ­ций map и reduce. Как уже го­во­ри­лось, функ­ция parallel_limited_helper:limited_core/4 пе­ре­да­ет от­вет­ст­вен­ность за управ­ление жиз­нью ра­бо­чих про­цес­сов вы­зы­ваю­щей сто­роне. От вы­зы­ваю­щей сто­ро­ны функ­ция parallel_limited_helper:limited_core/4 ожи­да­ет спи­сок ра­бо­чих про­цес­сов, ко­то­рые вы­пол­ня­ют функ­цию parallel_limited_helper:limited_worker/1 или ей по­доб­ную, т. е. с та­ким же про­то­ко­лом взаи­мо­дей­ст­вия. Это оз­на­ча­ет, что для соз­дания рас­пре­де­лен­ных ва­ри­ан­тов функ­ций map и reduce мы мо­жем ис­поль­зо­вать функ­цию parallel_limited_helper:limited_core/4. Функ­ции parallel_map:distributed_pmap/5 и parallel_reduce:distributed_reduce/6 бу­дут реа­ли­за­ция­ми рас­пре­де­лен­ных вер­сий функ­ций map и reduce. В этих функ­ци­ях мы соз­да­ем ра­бо­чие про­цес­сы на за­дан­ных уз­лах, ис­поль­зу­ем соз­дан­ные про­цес­сы для рас­пре­де­лен­ной об­ра­бот­ки ис­ход­но­го спи­ска (при по­мо­щи вы­зо­ва функ­ции parallel_limited_helper:limited_core/4) и за­вер­ша­ем ра­бо­ту соз­дан­ных ра­бо­чих про­цес­сов. Функ­ция parallel_map:distributed_pmap/5 име­ет сле­дую­щий вид:

distributed_pmap(_Fun, [], _PortionSize, _NodeList, _WorkerCount) -> [];

distributed_pmap(Fun, SourceList, PortionSize, _NodeList, _WorkerCount)

when length(SourceList) =< PortionSize ->

lists:map(Fun, SourceList);

distributed_pmap(Fun, SourceList, PortionSize, NodeList, WorkerCount) ->

WorkerFun = fun(SourcePortion) -> lists:map(Fun, SourcePortion) end,

WorkerList = [spawn_link(Node, fun() -> parallel_limited_helper:limited_worker(WorkerFun) end) || Node <- NodeList, _WorkerIndex <- lists:seq(1, WorkerCount)],

Result = parallel_limited_helper:limited_core(fun lists:append/1, SourceList, PortionSize, WorkerList),

lists:foldl(fun(Worker, _Aggr) -> exit(Worker, normal) end, true, WorkerList),

Result.

Как и в пре­ды­ду­щих слу­ча­ях, функ­ция parallel_map:distributed_pmap/5 со­дер­жит три ва­ри­ан­та. Пер­вый ва­ри­ант пред­на­зна­чен для об­ра­бот­ки си­туа­ции, когда ис­ход­ный спи­сок пуст, вто­рой ва­ри­ант – для об­ра­бот­ки си­туа­ции, когда раз­мер ис­ход­ных дан­ных не пре­вы­ша­ет раз­ме­ра пор­ции, а тре­тий – для об­ра­бот­ки всех осталь­ных слу­ча­ев. Функ­ция parallel_reduce:distributed_reduce/6 вы­гля­дит сле­дую­щим об­ра­зом:

distributed_reduce(_Fun, [], {Init, _PortionInit}, _PortionSize, _NodeList, _WorkerCount) -> InitValue;

distributed_reduce(Fun, SourceList, {Init, _PortionInit}, PortionSize, _NodeList, _WorkerCount) when length(SourceList) =< PortionSize ->

lists:foldl(Fun, Init, SourceList);

distributed_reduce(Fun, SourceList, {Init, PortionInit}, PortionSize, NodeList, WorkerCount) ->

ReduceFun = fun(List) -> lists:foldl(Fun, Init, List) end,

PortionReduceFun = fun(List) -> lists:foldl(Fun, PortionInit, List) end,

WorkerList = [spawn_link(Node, fun() -> parallel_limited_helper:limited_worker(PortionReduceFun) end) || Node <- NodeList, _WorkerIndex <- lists:seq(1, WorkerCount)],

Result = parallel_limited_helper:limited_core(ReduceFun, SourceList, PortionSize, WorkerList),

lists:foldl(fun(Worker, _Aggr) -> exit(Worker, normal) end, true, WorkerList),

Result.

Функ­ция parallel_reduce:distributed_reduce/6, как и функ­ция parallel_map:distributed_pmap/5, со­дер­жит три ва­ри­ан­та для об­ра­бот­ки точ­но та­ких же си­туа­ций: пусто­го ис­ход­но­го спи­ска, ис­ход­но­го спи­ска ма­ло­го раз­ме­ра (не боль­ше раз­ме­ра пор­ции) и для всех осталь­ных слу­ча­ев. В этих функ­ци­ях мы за­да­ем спи­сок уз­лов NodeList, на ко­то­рых мо­гут соз­да­вать­ся ра­бо­чие про­цес­сы, и мак­си­маль­ное ко­ли­че­­ст­во ра­бо­чих про­цес­сов на ка­ж­дом уз­ле WorkerCount; тем са­мым мы ог­раничи­ва­ем об­щее ко­ли­че­­ст­во ра­бо­чих про­цес­сов.

Внима­тель­ный чи­та­тель лег­ко за­ме­тит, что в ка­че­­ст­ве па­ра­мет­ров мы пе­ре­да­ем в функ­ции parallel_map:distributed_pmap/5 и parallel_reduce:distributed_reduce/6 спи­сок уз­лов NodeList и мак­си­маль­ное ко­ли­че­­ст­во про­цес­сов на ка­ж­дом уз­ле WorkerCount. Мо­жет возник­нуть во­прос, не про­ти­во­ре­чит ли та­кое ре­шение все­му вы­ше­ска­зан­но­му. Ес­ли мы ис­поль­зу­ем дол­го­жи­ву­щие ра­бо­чие про­цес­сы (на­при­мер, из неко­то­ро­го пу­ла про­цес­сов), та­кое ре­шение бу­дет про­сто непра­виль­ным. Во всех осталь­ных слу­ча­ях мы хо­тим про­сто вы­полнить рас­пре­де­лен­ную опе­ра­цию map (или reduce) на оп­ре­де­лен­ном на­бо­ре уз­лов, ог­раничив мак­си­маль­ное ко­ли­че­­ст­во ра­бо­чих про­цес­сов на этих уз­лах. Ес­ли и тогда мы бу­дет воз­ла­гать от­вет­ст­вен­ность по соз­данию ра­бо­чих про­цес­сов на внеш­нюю сто­ро­ну, то это не со­всем то, что ожи­да­ет от нас вы­зы­ваю­щая сто­ро­на (да и неудоб­но для вы­зы­ваю­щей сто­ро­ны). Конеч­но, мож­но бы­ло бы вы­де­лить опе­ра­ции по соз­данию ра­бо­чих про­цес­сов и за­вер­шению их ра­бо­ты в от­дель­ные ме­то­ды, что слег­ка по­вы­си­ло бы чи­тае­мость. Но это дей­ст­вие мы оста­вим для тех чи­та­те­лей, ко­му оно ин­те­рес­но.

По­ра про­ве­рить, что на­ши рас­пре­де­лен­ные реа­ли­за­ции функ­ций map и reduce (функ­ции parallel_map:distributed_pmap/5 и parallel_reduce:distributed_reduce/6) ра­бо­та­ют пра­виль­но. Для на­ча­ла да­вай­те соз­да­дим три уз­ла: два уз­ла с име­на­ми slave1 и slave2 для соз­дания ра­бо­чих про­цес­сов и узел с именем master для глав­но­го про­цес­са (и за­пуска функ­ций parallel_map:distributed_pmap/5 и parallel_reduce:distributed_reduce/6 на вы­полнение). На ком­пь­ю­те­ре ав­то­ра (при соз­дании уз­лов с клю­чом -sname) пол­ные име­на уз­лов бу­дут сле­дую­щи­ми: slave1@stdstring, slave2@stdstring и master@stdstring (имен­но эти име­на уз­лов мы бу­дем ис­поль­зо­вать в на­шем при­ме­ре). Все дей­ст­вия в при­ме­ре мы бу­дем про­из­во­дить на уз­ле master@stdstring.

Начнем с функ­ции parallel_map:distributed_pmap/5: мы помним, что эта функ­ция име­ет три ва­ри­ан­та. Вы­зов parallel_map:distributed_pmap(fun(Item) -> 3*Item end, [], 2, [‘slave1@stdstring’, ‘slave2@stdstring’], 2) про­ве­ря­ет пер­вый ва­ри­ант (когда ис­ход­ный спи­сок дан­ных пуст) и воз­вра­ща­ет пустой спи­сок, как и ожи­да­ет­ся. Вы­зов parallel_map:distributed_pmap(fun(Item) -> 3*Item end, [2, 3], 4, [‘slave1@stdstring’, ‘slave2@stdstring’], 2) воз­вра­ща­ет сле­дую­щий спи­сок [6, 9]. Так как раз­мер ис­ход­но­го спи­ска мень­ше раз­ме­ра пор­ции, то мы про­ве­ря­ем вто­рой слу­чай. И, на­конец, вы­зов parallel_map:distributed_pmap(fun(Item) -> 3*Item end, [2, 3, 5, 6, 8, 1, 7, 2], 2, [‘slave1@stdstring’, ‘slave2@stdstring’], 2) воз­вра­ща­ет сле­дую­щий спи­сок: [6, 9, 15, 18, 24, 3, 21, 6]. Оче­вид­но, что этот вы­зов про­ве­ря­ет тре­тий ва­ри­ант функ­ции parallel_map:distributed_pmap/5, т. к. раз­мер ис­ход­но­го спи­ска боль­ше раз­ме­ра пор­ции. У нас вы­де­ле­но два уз­ла под ра­бо­чие про­цес­сы, на ка­ж­дом уз­ле мы соз­да­ем по два про­цес­са; в ито­ге у нас че­ты­ре ра­бо­чих про­цес­са. Раз­мер ис­ход­но­го спи­ска – 8, раз­мер пор­ции дан­ных для об­ра­бот­ки – 2; лег­ко вид­еть, что спи­сок бу­дет раз­бит на че­ты­ре пор­ции, и в ре­зуль­та­те все че­ты­ре ра­бо­чих про­цес­са бу­дут за­гру­же­ны.

А те­перь про­ве­рим ра­бо­ту функ­ции parallel_reduce:distributed_reduce/6 (эта функ­ция так­же име­ет три ва­ри­ан­та). Вы­зов parallel_reduce:distributed_reduce(fun(Item, Agg) -> Item + Agg end, [], {1, 0}, 2, ['slave1@stdstring’, ‘slave2@stdstring’], 2) про­ве­ря­ет пер­вый ва­ри­ант (когда ис­ход­ный спи­сок дан­ных пуст) и вез­вра­ща­ет 1, т. е. на­чаль­ное зна­чение опе­ра­ци­ии сверт­ки. Вы­зов parallel_reduce:distributed_reduce(fun(Item, Agg) -> Item + Agg end, [1, 2], {1, 0}, 4, [‘slave1@stdstring’, ‘slave2@stdstring’], 2) воз­вра­ща­ет зна­чение 4. Раз­мер ис­ход­но­го спи­ска – 2, раз­мер пор­ции – 4; это оз­на­ча­ет, что дан­ный вы­зов про­ве­ря­ет вто­рой ва­ри­ант функ­ции parallel_map:distributed_pmap/5. И, на­конец, вы­зов parallel_reduce:distributed_reduce(fun(Item, Agg) -> Item + Agg end, [1, 2, 3, 4, 5, 6, 7, 8], {1, 0}, 2, [‘slave1@stdstring’, ‘slave2@stdstring’], 2) воз­вра­ща­ет зна­чение 37. Этот вы­зов про­ве­ря­ет тре­тий ва­ри­ант функ­ции parallel_map:distributed_pmap/5, т. к. раз­мер ис­ход­но­го спи­ска боль­ше раз­ме­ра пор­ции. Как и в пре­ды­ду­щем слу­чае, у нас че­ты­ре ра­бо­чих про­цес­са на двух уз­лах (по два ра­бо­чих про­цес­са на узел). Раз­мер ис­ход­но­го спи­ска – 8, раз­мер пор­ции – 2, так что спи­сок бу­дет раз­бит на че­ты­ре пор­ции и все че­ты­ре ра­бо­чих про­цес­са бу­дут за­гру­же­ны. Про­вер­ку дру­гих сце­на­ри­ев ра­бо­ты этих функ­ций (на­при­мер, когда не для всех ра­бо­чих про­цес­сов бу­дут соз­да­ны за­дания) мы остав­ля­ем чи­та­те­лям.

Зай­мем­ся со­гла­шением раз­да­вать все за­да­чи ра­бо­чим про­цес­сам сра­зу, при вы­зо­ве функ­ции parallel_limited_helper:limited_core/4. При вы­зо­ве этой функ­ции, мы раз­би­ва­ем ис­ход­ный спи­сок с дан­ны­ми на пор­ции; пор­ции яв­ля­ют­ся спи­ском пар (кор­те­жей из двух эле­мен­тов) «ин­декс – часть ис­ход­но­го спи­ска с дан­ны­ми». Оче­вид­но, что раз­мер всех пор­ций дан­ных несколь­ко боль­ше раз­ме­ра ис­ход­ных дан­ных; дей­ст­ви­тель­но, раз­мер всех пор­ций ра­вен раз­ме­ру ис­ход­но­го спи­ска, плюс раз­мер на ин­дек­сы всех пор­ций, плюс на­клад­ные рас­хо­ды на соз­дание па­ры (кор­те­жа из двух эле­мен­тов) для ка­ж­дой пор­ции. И ес­ли ис­ход­ный спи­сок очень ве­лик, нам про­сто мо­жет не хва­тить па­мя­ти для соз­дания всех пор­ций. На­при­мер, на 32-раз­ряд­ной сис­те­ме раз­мер ад­рес­но­го про­стран­ст­ва про­цес­са (в опе­ра­ци­он­ной сис­те­ме Linux с обыч­ным ядром) – 3 ГБ; ес­ли раз­мер ис­ход­но­го спи­ска боль­ше 1,5 ГБ, то соз­дать все пор­ции дан­ных для это­го спи­ска, оче­вид­но, не по­лу­чит­ся. Да­же ес­ли мес­та для соз­дания всех пор­ций хва­тит, все рав­но их соз­дание при­во­дит к од­но­мо­мент­ным на­клад­ным рас­хо­дам на вы­де­ление па­мя­ти, соз­дания объ­ек­тов, ко­пи­ро­вания дан­ных и к по­сле­дую­щим на­клад­ным рас­хо­дам на сбор­ку му­со­ра, когда все эти объ­ек­ты ста­нут не нуж­ны. Соз­дав все пор­ции, мы сра­зу же рас­пре­де­ля­ем их ме­ж­ду ра­бо­чи­ми про­цес­са­ми, т. е. от­прав­ля­ем ра­бо­чим про­цес­сам со­об­щения, со­дер­жа­щие дан­ные для об­ра­бот­ки пор­ции для всех пор­ций дан­ных. При этом все пор­ции дан­ных в один мо­мент вре­мени ока­зы­ва­ют­ся в се­ти, вне за­ви­си­мо­сти от их объ­е­ма. По­нят­но, что при боль­шом объ­е­ме по­лу­чен­ных пор­ций (то есть боль­шом объ­е­ме ис­ход­ных дан­ных) взаи­мо­дей­ст­вие уз­лов по се­ти мо­жет стать уз­ким ме­стом, тор­мо­зя­щим всю сис­те­му (всю на­шу мно­го­за­дач­ную об­ра­бот­ку).

На эту про­бле­му сто­ит взгля­нуть несколь­ко с дру­гой сто­ро­ны: так ли нам на­до сра­зу раз­би­вать все ис­ход­ные дан­ные на пор­ции и от­прав­лять их по се­ти ра­бо­чим про­цес­сам? Оче­вид­но, что та­кой под­ход уп­ро­ща­ет ис­ход­ный код на­ших реа­ли­за­ций; бо­лее то­го, та­кой под­ход по­зво­ля­ет во всех на­ших реа­ли­за­ци­ях (соз­дан­ных до это­го мо­мен­та) ис­поль­зо­вать од­ну и ту же функ­цию для сбор­ки ре­зуль­та­тов об­ра­бот­ки: parallel_common:collect_result/2. Мы ре­ши­ли, что от­ка­зы­ва­ем­ся от под­хо­да, когда все де­ла­ет­ся сра­зу, и пред­по­чли под­ход, когда пор­ции фор­ми­ру­ют­ся и от­сы­ла­ют­ся ра­бо­че­му про­цес­су по ме­ре необ­хо­ди­мо­сти. Это ре­шение под­во­дит нас к то­му, что мы не мо­жем от­де­лить ста­дию фор­ми­ро­вания и от­сыл­ки за­даний ра­бо­чим про­цес­сам от ста­дии сбо­ра ре­зуль­та­та их ра­бо­ты (как это бы­ло у нас рань­ше). По­это­му на­ше взаи­мо­дей­ст­вие с ра­бо­чи­ми про­цес­са­ми бу­дет вы­гля­деть сле­дую­щим об­ра­зом.

Мы по­лу­ча­ем ре­зуль­тат об­ра­бот­ки ка­кой-ли­бо пор­ции дан­ных от ка­ко­го-то ра­бо­че­го про­цес­са, со­хра­ня­ем этот ре­зуль­тат (в мас­си­ве, как мы это де­ла­ли и рань­ше), по­сле че­го «от­щи­пы­ва­ем» от остав­ших­ся необ­ра­бо­тан­ных дан­ных пор­цию, фор­ми­ру­ем но­вое за­дание на об­ра­бот­ку и от­сы­ла­ем это за­дание ра­бо­чему про­цес­су, с ко­то­рым на­ча­ли взаи­мо­дей­ст­вие. И так до тех пор, по­ка мы не об­ра­бо­та­ем весь спи­сок ис­ход­ных дан­ных (по­ка спи­сок остав­ших­ся необ­ра­бо­тан­ных дан­ных не опусте­ет) и не по­лу­чим все ре­зуль­та­ты об­ра­бот­ки (по­нят­но, что спи­сок остав­ших­ся необ­ра­бо­тан­ных дан­ных опусте­ет рань­ше, чем мы дей­ст­ви­тель­но об­ра­бо­та­ем все дан­ные). Ну и, ес­те­ст­вен­но, ра­бо­ту мы долж­ны на­чать с то­го, что­бы ка­ж­до­му ра­бо­че­му про­цес­су раз­дать по за­данию. По­нят­но, что та­кой под­ход несколь­ко услож­ня­ет реа­ли­за­цию по сравнению с реа­ли­за­ци­ей, в ко­то­рой мы сра­зу раз­би­ва­ем все дан­ные на пор­ции и от­сы­ла­ем эти дан­ные ра­бо­чим про­цес­сам.

А те­перь да­вай­те реа­ли­зу­ем но­вый ва­ри­ант мно­го­за­дач­ных функ­ций map и reduce на осно­ве все­го вы­ше­ска­зан­но­го. Как и рань­ше, мы вы­де­ля­ем об­щую функ­цио­наль­ность (на осно­ве ко­то­рой мы смо­жем реа­ли­зо­вать мно­го­за­дач­ные вер­сии функ­ций map и reduce) и по­ме­ща­ем ее в от­дель­ный мо­дуль; в на­шем слу­чае это бу­дет мо­дуль parallel_smartmsg_helper.

Что­бы по­лу­чить бо­лее по­нят­ную и гиб­кую реа­ли­за­цию, вве­дем несколь­ко оп­ре­де­лений за­пи­сей:

-record(tasks_descr, {created = 0, processed = 0, rest = []}).

-record(task_request, {master, index, portion}).

-record(task_result, {worker, index, result}).

Эк­зем­п­ляр за­пи­си task_descr хранит дан­ные о про­цес­се об­ра­бот­ки ис­ход­но­го спи­ска; по­ле created со­дер­жит ко­ли­че­­ст­во соз­дан­ных за­даний на об­ра­бот­ку; по­ле processed со­дер­жит ко­ли­че­­ст­во за­даний на об­ра­бот­ку, вы­полнение ко­то­рых за­кон­чи­лось; по­ле rest со­дер­жит необ­ра­бо­тан­ный оста­ток ис­ход­но­го спи­ска. Эк­зем­п­ляр за­пи­си task_request со­дер­жит дан­ные за­про­са на об­ра­бот­ку (по­сы­лае­мый глав­ным про­цес­сом од­но­му из ра­бо­чих про­цес­сов); по­ле master со­дер­жит иден­ти­фи­ка­тор глав­но­го про­цес­са; по­ле index со­дер­жит ин­декс пор­ции ис­ход­ных дан­ных; по­ле portion со­дер­жит са­му пор­цию ис­ход­ных дан­ных. Сле­ду­ет за­ме­тить, что вме­сто пе­ре­да­чи иден­ти­фи­ка­то­ра глав­но­го про­цес­са в со­об­щении его мож­но бы­ло бы пе­ре­дать ра­бо­че­му про­цес­су одним из па­ра­мет­ров функ­ции, ко­то­рую этот ра­бо­чий про­цесс вы­пол­ня­ет. И, на­конец, эк­зем­п­ляр за­пи­си task_result со­дер­жит дан­ные с ре­зуль­та­та­ми об­ра­бот­ки пор­ции (по­сы­лае­мые одним из ра­бо­чих про­цес­сов глав­но­му); по­ле worker со­дер­жит иден­ти­фи­ка­тор ра­бо­че­го про­цес­са; по­ле index со­дер­жит ин­декс ис­ход­ной пор­ции дан­ных; по­ле result со­дер­жит ре­зуль­тат об­ра­бот­ки этой ис­ход­ной пор­ции дан­ных.

Сле­дую­щий шаг, ко­то­рый мы сде­ла­ем в рам­ках на­шей реа­ли­за­ции – соз­да­дим функ­цию, ко­то­рую долж­ны вы­пол­нять ра­бо­чие про­цес­сы. Так как мы соз­да­ем ра­бо­чие про­цес­сы сна­ру­жи на­шей обоб­щен­ной функ­ции об­ра­бот­ки спи­ска дан­ных (ана­ло­ге функ­ции parallel_limited_helper:limited_core/4), на осно­ве ко­то­рой мы по­том соз­да­дим оче­ред­ные вер­сии функ­ций map и reduce, то оче­вид­но, что эта функ­ция долж­на быть экс­пор­ти­руе­мой. В на­шей реа­ли­за­ции это бу­дет функ­ция parallel_smartmsg_helper: smartmsg_worker/1:

smartmsg_worker(Fun) ->

receive

#task_request{master=MasterPid, index=Index, portion=SourcePortion} ->

Dest = Fun(SourcePortion),

MasterPid ! #task_result{worker=self(), index=Index, result=Dest},

smartmsg_worker(Fun);

_Other -> smartmsg_worker(Fun)

end.

В функ­ции, ко­то­рую вы­пол­ня­ют ра­бо­чие про­цес­сы, мы об­ра­ба­ты­ва­ем два ти­па со­об­щений: во-пер­вых, со­об­щения, яв­ляю­щие­ся эк­зем­п­ля­ром за­пи­си task_request – это за­дания на об­ра­бот­ку пор­ции дан­ных; во-вто­рых, все осталь­ные со­об­щения, что­бы в оче­ре­ди со­об­щений ра­бо­че­го про­цес­са не на­ка­п­ли­ва­лись необ­ра­бо­тан­ные неиз­вест­ные со­об­щения. Сле­ду­ет ска­зать, что функ­ция parallel_smartmsg_helper:smartmsg_worker/1 прак­ти­че­­ски иден­тич­на функ­ции parallel_limited_helper:limited_worker/1, ко­то­рую соз­да­ли в пре­ды­ду­щей реа­ли­за­ции. Мы не ста­ли ис­поль­зо­вать в на­шей но­вой реа­ли­за­ции функ­цию parallel_limited_helper:limited_worker/1, что­бы не сме­ши­вать ис­поль­зо­вание несколь­ких мо­ду­лей и не сму­щать чи­та­те­ля этим.

На этом прак­ти­куме мы соз­да­ли рас­пре­де­лен­ные вер­сии функ­ций map и reduce и про­тес­ти­ро­ва­ли их. Мы также ра­зо­бра­лись, что соз­дание всех пор­ций ис­ход­ных дан­ных и рас­пре­де­ление их сре­ди ра­бо­чих по­то­ков в один мо­мент вре­мени – не луч­шая идея, и на­ча­ли реа­ли­зо­вы­вать бо­лее слож­ный под­ход, в ко­то­ром мы фор­ми­ру­ем за­да­чи и от­сы­ла­ем их ра­бо­чим про­цес­сам по ме­ре необ­хо­ди­мо­сти. В сле­дую­щий раз мы про­дол­жим эту ра­бо­ту. |

Персональные инструменты
купить
подписаться
Яндекс.Метрика