Журнал LinuxFormat - перейти на главную

LXF164: Вникать в Erlang

Материал из Linuxformat
Перейти к: навигация, поиск


Erlang Опи­сы­ва­ет­ся сле­дую­щей фор­му­лой: функ­цио­наль­ный язык + про­цес­сы

Erlang: Прак­ти­ка мно­го­за­дач­но­сти

Cуха тео­рия, мой друг... Ан­д­рей Уша­ков пред­ла­га­ет при­ло­жить ру­ки к прак­ти­че­ским ре­ше­ни­ям.

(thumbnail)
Наш эксперт Ан­д­рей Уша­ков ак­тив­но при­бли­жа­ет тот день, когда функ­цио­наль­ные язы­ки ста­нут мейн­ст­ри­мом.

По­следние несколь­ко ста­тей мы занима­лись тем, что изу­ча­ли, что та­кое мно­го­за­дач­ные и рас­пре­де­лен­ные сис­те­мы и ка­кие слож­но­сти нас под­сте­ре­га­ют на пу­ти соз­дания та­ких сис­тем. По­ми­мо это­го, мы изу­ча­ли, ка­кие в язы­ке Erlang есть сред­ст­ва для ре­шения подобных за­дач. Но на од­ной тео­рии да­ле­ко не уе­хать: без прак­ти­че­­ско­­го опы­та не соз­дать бо­лее-менее слож­ную, мно­го­за­дач­ную, и, тем бо­лее, рас­пре­де­лен­ную сис­те­му. Имен­но по­это­му мы от­кры­ва­ем цикл ста­тей, по­свя­щен­ных прак­ти­ку­му по соз­данию мно­го­за­дач­ных и рас­пре­де­лен­ных сис­тем средствами язы­ка Erlang.

На­шей за­да­чей в дан­ном прак­ти­ку­ме бу­дет на­пи­сание мно­го­за­дач­ных и, с ка­ко­го-то мо­мен­та, рас­пре­де­лен­ных вер­сий та­ких ши­ро­ко рас­про­странен­ных функ­ций, как map и reduce. Ре­шать дан­ную за­да­чу мы бу­дем по­сте­пен­но: начнем с обыч­ных, не много­за­дач­ных реа­ли­за­ций дан­ных функ­ций и за­кон­чим реа­ли­за­ци­ей, ра­бо­таю­щей в рас­пре­де­лен­ной сре­де, т. е. на несколь­ких уз­лах. Возника­ет за­коно­мер­ный во­прос: а по­че­му имен­но эти функ­ции вы­бра­ны для на­ше­го прак­ти­ку­ма? Как мы уви­дим ниже, реа­ли­за­ция этих функ­ций в про­стей­шем, не мно­го­за­дач­ном слу­чае очень я­сна. По­это­му при реа­ли­за­ции раз­но­об­раз­ных мно­го­за­дач­ных ва­ри­ан­тов этих функ­ций мы прак­ти­че­­ски не бу­дем от­вле­каться на де­та­ли, не от­но­ся­щие­ся к мно­го­за­дач­но­сти.

Пре­ж­де чем на­чать ра­бо­ту с при­ме­ра­ми, немно­го по­го­во­рим о том, что пред­став­ля­ют со­бой функ­ции map и reduce и ка­кие ог­раничения мы на­кла­ды­ва­ем на на­ши реа­ли­за­ции этих функ­ций.

Функ­ция map вы­чис­ля­ет ре­зуль­тат опе­ра­ции ото­бра­жения, ко­то­рая ка­ж­до­му эле­мен­ту a из ис­ход­но­го мно­же­ст­ва A ста­вит в со­от­вет­ст­вие эле­мент fun(a) из ре­зуль­ти­рую­ще­го мно­же­ст­ва для за­дан­ной функ­ции ото­бра­жения fun. По­нят­но, что в ка­че­­ст­ве ис­ход­но­го мно­же­ст­ва эле­мен­тов мож­но взять лю­бой на­бор эле­мен­тов, как упо­ря­до­чен­ный, так и неупо­ря­до­чен­ный, но мы в на­шем при­ме­ре в ка­че­­ст­ве та­ко­го на­бо­ра всегда бу­дем применять толь­ко спи­сок.

Функ­ция reduce вы­чис­ля­ет ре­зуль­тат (неко­то­рое зна­чение) опе­ра­ции сверт­ки (или аг­ре­ги­ро­вания) для за­дан­но­го мно­же­ст­ва, функ­ции сверт­ки и на­чаль­но­го зна­чения. При­ме­ром та­кой опе­ра­ции яв­ля­ет­ся опе­ра­ция на­хо­ж­дения сум­мы мно­же­ст­ва чи­сел. Как и в слу­чае функ­ции map, в ка­че­­ст­ве ис­ход­но­го на­бо­ра эле­мен­тов мы бу­дем ис­поль­зо­вать спи­ски. Ре­зуль­тат вы­полнения опе­ра­ции сверт­ки мо­жет быть раз­ным и за­ви­сит от то­го, в ка­ком по­ряд­ке мы бе­рем эле­мен­ты из мно­же­ст­ва, поскольку опе­ра­ция сверт­ки бывает неком­му­та­тив­ной. Дей­ст­ви­тель­но, ес­ли в ка­че­­ст­ве ис­ход­но­го мно­же­ст­ва мы возь­мем спи­сок мат­риц, а в ка­че­­ст­ве функ­ции сверт­ки – опе­ра­цию ум­но­жения этих мат­риц, ре­зуль­тат бу­дет за­ви­сеть от по­ряд­ка об­хо­да спи­ска: сле­ва на­пра­во или спра­ва на­ле­во. Имен­но по этой при­чине мо­дуль lists со­дер­жит две функ­ции для опе­ра­ции сверт­ки: lists:foldl/3 и lists:foldr/3. Мы во всех на­ших при­ме­рах при реа­ли­за­ции опе­ра­ции сверт­ки бу­дем об­хо­дить спи­сок сле­ва на­пра­во (по­ря­док об­хо­да спи­ска спра­ва на­ле­во три­ви­аль­но реа­ли­зу­ет­ся по ана­ло­гии).

Есть и еще один мо­мент, ко­то­рый свя­зан с опе­ра­ци­ей сверт­ки: это ас­со­циа­тив­ность дан­ной опе­ра­ции. Ас­со­циа­тив­ность опе­ра­ции оп­ре­де­ля­ет, за­ви­сит ли ре­зуль­тат опе­ра­ции от то­го, в ка­ком по­ряд­ке мы вы­чис­ля­ем ре­зуль­тат этой опе­ра­ции, т. е. рас­став­ля­ем скоб­ки. Так, на­при­мер, опе­ра­ция сло­жения чи­сел яв­ля­ет­ся ас­со­циа­тив­ной: это оз­на­ча­ет, что зна­чение вы­ра­жения (1+2)+3 рав­но зна­чению вы­ра­жения 1+(2+3). А опе­ра­ция вы­чи­тания чи­сел ас­со­циа­тив­ной не яв­ля­ет­ся: зна­чение вы­ра­жения (1-2)-3 рав­но -4, а зна­чение вы­ра­жения 1-(2-3) рав­но 2. И опять же для про­сто­ты мы по­ла­га­ем, что име­ем де­ло с ас­со­циа­тив­ной опе­ра­ци­ей сверт­ки (по­че­му нам важ­на ас­со­циа­тив­ность, мы уви­дим ниже).

Ска­жем па­ру слов и об ор­ганиза­ции на­ших при­ме­ров. Все экс­пор­ти­руе­мые функ­ции, ко­то­рые от­но­сят­ся к опе­ра­ции ото­бра­жения, рас­по­ла­га­ют­ся в мо­ду­ле parallel_map (и, со­от­вет­ст­вен­но, в фай­ле parallel_map.erl). Все экс­пор­ти­руе­мые функ­ции, ко­то­рые от­но­сят­ся к опе­ра­ции сверт­ки, рас­по­ла­га­ют­ся в мо­ду­ле parallel_reduce (и, со­от­вет­ст­вен­но, в фай­ле parallel_reduce.erl). По­ми­мо этих двух мо­ду­лей, мы бу­дем оп­ре­де­лять и ис­поль­зо­вать до­полнитель­ные мо­ду­ли по ме­ре необ­хо­ди­мо­сти. Одним из та­ких мо­ду­лей, ко­то­рый бу­дет ис­поль­зо­вать­ся прак­ти­че­­ски вез­де, яв­ля­ет­ся мо­дуль parallel_common (рас­по­ла­гаю­щий­ся в фай­ле parallel_common.erl). Как мы уви­дим, этот мо­дуль со­дер­жит об­щие для на­ших при­ме­ров функ­ции.

А начнем мы на­ши при­ме­ры с обыч­ных, не мно­го­за­дач­ных вер­сий функ­ций map и reduce. Для реа­ли­за­ции обыч­ной вер­сии функ­ции map мы восполь­зу­ем­ся техникой кон­ст­руи­ро­вания спи­сков [List comprehension]:

usual_map(_Fun, []) -> [];

usual_map(Fun, SourceList) -> [Fun(Element) || Element <- SourceList].

Как вид­но, реа­ли­за­ция этой функ­ции доста­точ­но три­ви­аль­на (эту функ­цию мож­но бы­ло бы реа­ли­зо­вать и ре­кур­сив­ным об­ра­зом, но реа­ли­за­ция при этом ста­ла бы несколь­ко боль­ше). Обыч­ную вер­сию функ­ции reduce та­ким об­ра­зом реа­ли­зо­вать не по­лу­чит­ся – для этого нам по­тре­бу­ет­ся ре­кур­сив­но ра­бо­тать с ис­ход­ным спи­ском:

usual_reduce(_Fun, [], InitValue) -> InitValue;

usual_reduce(Fun, [H | Rest], InitValue) ->

NewAgg = Fun(H, InitValue),

usual_reduce(Fun, Rest, NewAgg).

И опять же, реа­ли­за­ция этой функ­ции дос­та­точ­но три­ви­аль­на.

Да­вай­те про­ве­рим, что на­ши функ­ции ра­бо­та­ют пра­виль­но. Для это­го от­ком­пи­ли­ру­ем со­от­вет­ст­вую­щие мо­ду­ли и для вы­зо­вов функ­ций, при­ве­ден­ных ниже, про­ве­рим, что ре­зуль­тат их вы­зо­вов со­от­вет­ст­ву­ет при­ве­ден­ным ре­зуль­та­там. Вы­зов parallel_map:usual_map(fun(X) -> X*2 end, [1, 2, 3, 4]) вернет спи­сок [2, 4, 6, 8]. Вы­зов parallel_map:usual_map(fun(Str) -> “-” ++ Str ++ “-” end, [“aa”, “bb”, “cc”]) вернет спи­сок [“-aa-”, “-bb-”, “-cc-”]. Те же са­мые ре­зуль­та­ты мы по­лу­чим и при ис­поль­зо­вании функ­ции lists:map/2 из мо­ду­ля lists. Ре­зуль­та­том вы­зо­ва parallel_reduce:usual_reduce(fun(Item, Agg) -> Item + Agg end, [1, 2, 3, 4], 1) бу­дет чис­ло 11. Ре­зуль­та­том вы­зо­ва parallel_reduce:usual_reduce(fun(Item, Agg) -> Agg ++ Item end, [“aa”, “bb”], “++”) бу­дет стро­ка “++aabb”. Та­кие же ре­зуль­та­ты мы по­лу­чим и при ис­поль­зо­вании функ­ции lists:foldl/3 из мо­ду­ля lists.

Реа­ли­зо­вав обыч­ные вер­сии функ­ций map и reduce, соз­да­дим их мно­го­за­дач­ные вер­сии. Мы начнем с про­стей­ше­го слу­чая для функ­ции map: когда для ото­бра­жения ка­ж­до­го эле­мен­та (т. е. для вы­чис­ления ре­зуль­ти­рую­ще­го эле­мен­та fun(a) для ка­ж­до­го эле­мен­та a из ис­ход­но­го спи­ска) из ис­ход­но­го мно­же­ст­ва мы ис­поль­зу­ем от­дель­ную за­да­чу. В мно­го­за­дач­ной вер­сии функ­ции map у нас бу­дет один глав­ный про­цесс, ко­то­рый соз­да­ет до­черние ра­бо­чие про­цес­сы, раз­да­ет им за­дания и со­би­ра­ет ре­зуль­та­ты их ра­бо­ты, и несколь­ко ра­бо­чих про­цес­сов (в на­шем слу­чае ко­ли­че­­ст­во ра­бо­чих про­цес­сов рав­но ко­ли­че­­ст­ву эле­мен­тов в спи­ске). Глав­ный про­цесс (в ко­то­ром мы иниции­ру­ем вы­полнение на­шей функ­ции map) дол­жен сде­лать сле­дую­щее: соз­дать за­дания для ра­бо­чих про­цес­сов, соз­дать необ­хо­ди­мое ко­ли­че­­ст­во ра­бо­чих про­цес­сов, раз­дать всем этим про­цес­сам за­дание, по­лу­чить ре­зуль­та­ты от всех ра­бо­чих про­цес­сов и объ­е­динить эти ре­зуль­та­ты в ре­зуль­ти­рую­щем спи­ске. Ра­бо­чие про­цес­сы в дан­ной вер­сии функ­ции map яв­ля­ют­ся «од­но­ра­зо­вы­ми»: они по­лу­ча­ют за­дание, вы­пол­ня­ют его, воз­вра­ща­ют его и за­кан­чи­ва­ют свою ра­бо­ту. Соз­дание за­дания для ра­бо­чих про­цес­сов за­клю­ча­ет­ся в пре­об­ра­зо­вании ис­ход­но­го спи­ска в спи­сок пар (кор­те­жей из двух эле­мен­тов), со­стоя­щих из по­ряд­ко­во­го но­ме­ра эле­мен­та и са­мо­го эле­мен­та. Мы за­да­ем по­ряд­ко­вые но­ме­ра эле­мен­тов, на­чи­ная с 0; по­че­му мы так де­ла­ем и за­чем во­об­ще нуж­ны по­ряд­ко­вые но­ме­ра эле­мен­тов, бу­дет яс­но чуть ниже. Для соз­дания за­даний для ра­бо­чих про­цес­сов мы соз­да­ем па­ру вспо­мо­га­тель­ных функ­ций simple_prepare_data/2 (ин­тер­фейс­ная функ­ция) и simple_prepare_data/3 (функ­ция, ре­шаю­щая дан­ную за­да­чу) в мо­ду­ле parallel_map:

simple_prepare_data([]) -> [];

simple_prepare_data(SourceList) -> simple_prepare_data(0, SourceList, []).

simple_prepare_data(Index, [Element], PreparedData) ->

lists:reverse([{Index, Element}] ++ PreparedData);

simple_prepare_data(Index, [Element|Rest], PreparedData) ->

simple_prepare_data(Index + 1, Rest, [{Index, Element}] ++ PreparedData).

Прин­цип ра­бо­ты этой функ­ции осно­ван на ре­кур­сии (а точнее, функ­ции simple_prepare_data/3); при этом са­ма функ­ция доста­точ­но три­ви­аль­на, и де­таль­но раз­го­ва­ри­вать про нее мы не бу­дем. Соз­дание ра­бо­чих про­цес­сов и раз­да­ча им за­даний мож­но объ­е­динить в на­шем слу­чае, т. к. у нас ка­ж­дый про­цесс слу­жит для вы­полнения толь­ко од­но­го за­дания: при­менения функ­ции ото­бра­жения к од­но­му из эле­мен­тов ис­ход­но­го мно­же­ст­ва.

А те­перь да­вай­те по­го­во­рим о том, за­чем нам нуж­но свя­зы­вать с ка­ж­дым из эле­мен­тов его по­ряд­ко­вый но­мер и по­че­му мы ну­ме­ра­цию эле­мен­тов на­чи­на­ем с 0. Обыч­ная, не мно­го­за­дач­ная вер­сия функ­ции map ра­бо­та­ет сле­дую­щим об­ра­зом: по­сле­до­ва­тель­но об­хо­дит все эле­мен­ты ис­ход­но­го спи­ска, для ка­ж­до­го эле­мен­та вы­чис­ля­ет зна­чение функ­ции ото­бра­жения от это­го эле­мен­та, и по­лу­чен­ное зна­чение до­бав­ля­ет­ся к ре­зуль­ти­рую­ще­му спи­ску. Очень важ­но по­нять, что все это про­ис­хо­дит по­сле­до­ва­тель­но! В слу­чае же мно­го­за­дач­ной вер­сии, мы не мо­жем га­ран­ти­ро­вать, что все за­да­чи при­шлют нам свои ре­зуль­та­ты ра­бо­ты в пра­виль­ном по­ряд­ке, да­же ес­ли (как в на­шем слу­чае) мы их соз­да­ва­ли (и за­пуска­ли на вы­полнение) в пра­виль­ном по­ряд­ке. Ре­шение этой про­бле­мы доста­точ­но про­стое и оче­вид­ное: необ­хо­ди­мо с ка­ж­дым эле­мен­том пе­ре­да­вать его по­ряд­ко­вый но­мер. Ес­ли функ­ция ра­бо­че­го про­цес­са на­пи­са­на так, что она вме­сте с ре­зуль­та­том ра­бо­ты воз­вра­ща­ет и этот ис­ход­ный по­ряд­ко­вый но­мер, то мы смо­жем со­хра­нять по­лу­чае­мые ре­зуль­та­ты по его по­ряд­ко­во­му но­ме­ру в неко­то­рое хранили­ще. В мо­мент го­тов­но­сти всех ре­зуль­та­тов ра­бо­ты их мож­но бу­дет из­влечь из хранили­ща и по­мес­тить в под­хо­дя­щую струк­ту­ру дан­ных. Хранили­ща­ми, ко­то­рые по­зво­ля­ют со­хранить неко­то­рые зна­чение по его по­ряд­ко­во­му но­ме­ру, яв­ля­ют­ся мас­си­вы (оп­ре­де­лен­ные в мо­ду­ле array). Так как ну­ме­ра­ция эле­мен­тов в мас­си­ве на­чи­на­ет­ся с 0, то имен­но по этой при­чине мы так­же на­чи­на­ем ну­ме­ро­вать эле­мен­ты из ис­ход­но­го спи­ска с 0. Ну, а под­хо­дя­щей струк­ту­рой дан­ных, как мы уже го­во­ри­ли, яв­ля­ет­ся спи­сок.

Функ­цио­наль­ность (или сред­ст­во), ко­то­рая приоста­нав­ли­ва­ет вы­полнение од­ной за­да­чи, по­ка не бу­дут по­лу­че­ны все необ­хо­ди­мые ре­зуль­та­ты от дру­гих за­дач, и со­би­рает эти ре­зуль­та­ты, на­зы­ва­ет­ся барь­е­ром. Дан­ная функ­цио­наль­ность у нас бу­дет об­щей для несколь­ких ва­ри­ан­тов реа­ли­за­ций функ­ций map и reduce, по­это­му ее реа­ли­за­ция на­хо­дит­ся в па­ре функ­ций collect_result/2 (ин­тер­фейс­ная функ­ция) и collect_result/3 (функ­ция, ре­шаю­щая дан­ную за­да­чу) мо­ду­ля parallel_common:

collect_result(ResultStorage, TotalCount) -> collect_result(ResultStorage, TotalCount, 0).

collect_result(ResultStorage, TotalCount, TotalCount) -> ResultStorage;

collect_result(ResultStorage, TotalCount, ProcessedCount) -> receive

{'EXIT', _From, normal} -> collect_result(ResultStorage, TotalCount, ProcessedCount);

{'EXIT', _From, Reason} -> error({internal_error, Reason});

{result, Index, DestElement} -> UpdatedResultStorage = array:set(Index, DestElement, ResultStorage), collect_result(UpdatedResultStorage, TotalCount, ProcessedCount + 1);

_Other -> collect_result(ResultStorage, TotalCount, ProcessedCount)

end.

Для сбо­ра ре­зуль­та­тов ра­бо­ты ра­бо­чих про­цес­сов на­ша барь­ер­ная функ­ция долж­на уметь взаи­мо­дей­ст­во­вать с ра­бо­чи­ми про­цес­са­ми, а ес­ли точнее – принимать от них со­об­щения с ре­зуль­та­та­ми ра­бо­ты. В этих со­об­щениях нам необ­хо­ди­мо знать ин­декс ис­ход­но­го эле­мен­та и ре­зуль­ти­рую­щий объ­ект; иден­ти­фи­ка­тор ра­бо­че­го про­цес­са нам не ну­жен, т. к. ис­поль­зуе­мые ра­бо­чие про­цес­сы яв­ля­ют­ся «од­но­ра­зо­вы­ми». По­это­му мы ожи­да­ем от ра­бо­чих про­цес­сов со­об­щения ви­да {result, Index, DestElement}, где Index – ин­декс ис­ход­но­го эле­мен­та, DestElement – ре­зуль­ти­рую­щий эле­мент. По­ми­мо это­го со­об­щения, мы так­же об­ра­ба­ты­ва­ем со­об­щения об из­менении со­стояния ра­бо­чих про­цес­сов: за­вер­шил­ся ли ра­бо­чий про­цесс обыч­ным об­ра­зом или из-за ошиб­ки. В пер­вом слу­чае мы ниче­го не де­ла­ем, во вто­ром – за­вер­ша­ем глав­ный про­цесс с ошиб­кой. И, на­конец, мы об­ра­ба­ты­ва­ем все осталь­ные со­об­щения; т. к. они не име­ют для нас смыс­ла, мы их про­сто из­вле­ка­ем из оче­ре­ди со­об­щений глав­но­го про­цес­са и ниче­го не де­ла­ем.

Те­перь мож­но пе­рей­ти непо­сред­ст­вен­но к те­лу ра­бо­чей функ­ции глав­но­го про­цес­са. Это функ­ция simple_pmap/2, оп­ре­де­лен­ная в мо­ду­ле parallel_map. Эта функ­ция яв­ля­ет­ся и точ­кой вхо­да в реа­ли­зуе­мый на­ми ва­ри­ант, т. е. экс­пор­ти­руе­мой:

simple_pmap(_Fun, []) -> [];

simple_pmap(Fun, SourceList) ->

process_flag(trap_exit, true),

MasterPid = self(),

ElementCount = length(SourceList),

PreparedData = simple_prepare_data(SourceList),

lists:foreach(fun({Index, Element}) -> spawn_link(fun() -> simple_worker(Fun, Element, Index, MasterPid) end) end, PreparedData),

EmptyStorage = array:new([{size, ElementCount}, {fixed, true}, {default, none}]),

FullStorage = parallel_common:collect_result(EmptyStorage, ElementCount),

process_flag(trap_exit, false),

array:to_list(FullStorage).

Как уже го­во­ри­лось вы­ше, в ра­бо­чей функ­ции глав­но­го про­цес­са мы соз­да­ем за­дания для ра­бо­чих про­цес­сов, соз­да­ем необ­хо­ди­мое ко­ли­че­­ст­во ра­бо­чих про­цес­сов и раз­да­ем им за­дания, по­лу­ча­ем ре­зуль­та­ты ра­бо­ты всех ра­бо­чих про­цес­сов и объ­е­ди­ня­ем эти ре­зуль­та­ты в итоговом спи­ске. По­ми­мо это­го, пе­ред соз­данием ра­бо­чих про­цес­сов мы де­ла­ем глав­ный про­цесс су­пер­ви­зо­ром, а по­сле сбо­ра ре­зуль­та­тов ра­бо­ты ра­бо­чих про­цес­сов (по­сле барь­е­ра) мы де­ла­ем глав­ный про­цесс обыч­ным про­цес­сом. Глав­ный про­цесс обыч­но де­ла­ют су­пер­ви­зо­ром для то­го, что­бы от­сле­жи­вать за­вер­шение вспо­мо­га­тель­ных про­цес­сов и, при необ­хо­ди­мо­сти, реа­ги­ро­вать на это (на­при­мер, ес­ли вспо­мо­га­тель­ный про­цесс за­вер­шил­ся из-за ошиб­ки, то пе­ре­за­пустить его). В на­шем слу­чае мы для про­цес­сов, за­вер­шив­ших­ся с ошиб­кой, генери­ру­ем ошиб­ку бо­лее вы­со­ко­го уров­ня (но со­дер­жа­щую ис­ход­ную ошиб­ку в ка­че­­ст­ве до­полнитель­ной ин­фор­ма­ции). Так де­ла­ют, что­бы аб­ст­ра­ги­ро­вать­ся от де­та­лей реа­ли­за­ции, но, тем не менее, по­зво­ляя эти де­та­ли по­лу­чить при необ­хо­ди­мо­сти.

Нам оста­лось рас­смот­реть, что де­ла­ют ра­бо­чие про­цес­сы для вы­полнения сво­его за­дания. Функ­ция simple_worker/4 из мо­ду­ля parallel_map яв­ля­ет­ся ра­бо­чей функ­ци­ей та­ких про­цес­сов:

simple_worker(Fun, SourceElement, Index, MasterPid) ->

DestElement = Fun(SourceElement),

MasterPid ! {result, Index, DestElement}.

Так как ра­бо­чие про­цес­сы у нас «од­но­ра­зо­вые», то их ра­бо­чая функ­ция име­ет очень про­стой вид: вы­полнить за­дание и по­слать ре­зуль­тат вы­полнения за­дания об­рат­но глав­но­му про­цес­су. Для на­шей за­да­чи, за­дание ра­бо­че­го про­цес­са – это про­сто вы­чис­лить зна­чение функ­ции ото­бра­жения для за­дан­но­го ис­ход­но­го эле­мен­та.

Да­вай­те про­ве­рим, что соз­дан­ный на­ми мно­го­за­дач­ный ва­ри­ант функ­ции map ра­бо­та­ет пра­виль­но. Для это­го ком­пи­ли­ру­ем со­от­вет­ст­вую­щие мо­ду­ли и за­пуска­ем кон­соль сре­ды вы­полнения язы­ка Erlang. В ней на­би­ра­ем parallel_map:simple_pmap(fun(Item) -> 3*Item end, [1, 2, 5, 8]) и по­лу­ча­ем в ре­зуль­та­те спи­сок, со­стоя­щий из ут­ро­ен­ных эле­мен­тов ис­ход­но­го спи­ска [3, 6, 15, 24]. Ана­ло­гич­ным об­ра­зом на­би­ра­ем parallel_map:simple_pmap(fun(Item) -> lists:reverse(Item) end, [“str13”, “str666”]) и по­лу­ча­ем спи­сок, со­стоя­щий из об­рат­ных строк [«31rts», «666rts»]. Та­ким об­ра­зом, мы мо­жем сде­лать вы­вод, что дан­ный ва­ри­ант мно­го­за­дач­ной реа­ли­за­ции функ­ции map ра­бо­та­ет пра­виль­но.

С про­стой мно­го­за­дач­ной вер­си­ей функ­ци­ей map мы за­кон­чи­ли. Те­перь возника­ет во­прос: мо­жем ли мы, ру­ко­во­дству­ясь те­ми же прин­ци­па­ми, на­пи­сать столь же про­стую мно­го­за­дач­ную вер­сию функ­ции reduce? Для от­ве­та на этот во­прос да­вай­те рас­смот­рим прин­ци­пи­аль­ное от­ли­чие опе­ра­ции ото­бра­жения (ме­тод map) от опе­ра­ции сверт­ки (ме­тод reduce). В опе­ра­ции ото­бра­жения (map) мы об­ра­ба­ты­ва­ем все эле­мен­ты неза­ви­си­мо друг от дру­га; имен­но по­это­му мы мо­жем об­ра­ба­ты­вать их все па­рал­лель­но. В опе­ра­ции сверт­ки (reduce) мы вы­чис­ля­ем од­но зна­чение по всем эле­мен­там; это оз­на­ча­ет, что мы не мо­жем взять ка­кой-ли­бо эле­мент и ра­бо­тать с ним неза­ви­си­мо от осталь­ных эле­мен­тов. Рас­смот­рим, на­при­мер, как мы вы­чис­ля­ем сум­му эле­мен­тов в спи­ске. У нас есть на­чаль­ное зна­чение сум­мы (обыч­но это 0); мы бе­рем зна­чение пер­во­го эле­мен­та и скла­ды­ва­ем его с на­чаль­ным зна­чением, по­том по­лу­чен­ный ре­зуль­тат скла­ды­ва­ем со зна­чением вто­ро­го эле­мен­та и т. д. Вид­но, что мы не мо­жем взять и од­но­вре­мен­но об­ра­бо­тать пер­вый и вто­рой эле­мен­ты.

Мо­жет по­ка­зать­ся на пер­вый взгляд, что мы не смо­жем соз­дать мно­го­за­дач­ную вер­сию функ­ции reduce, но не сто­ить от­чаи­вать­ся: нам по­мо­жет та­кое свой­ст­во опе­ра­ции, как ас­со­циа­тив­ность. Как уже го­во­ри­лось вы­ше, ас­со­циа­тив­ность опе­ра­ции оп­ре­де­ля­ет, за­ви­сит ли ее ре­зуль­тат от то­го, в ка­ком по­ряд­ке мы его вы­чис­ля­ем. Дру­ги­ми сло­ва­ми, ес­ли опе­ра­ция ас­со­циа­тив­на, то мы мо­жем рас­ста­вить скоб­ки (вы­де­лить под­мно­же­ст­ва эле­мен­тов) так, как нам хо­чет­ся, вы­чис­лить ре­зуль­тат опе­ра­ции со­глас­но рас­став­лен­ным скоб­кам, и по­лу­чим в ка­че­­ст­ве ре­зуль­та­та всегда од­но и то же зна­чение. Так, на­при­мер, сум­ма чи­сел 1+2+3+4+5+6 рав­на как сум­ме (1+2)+(3+4)+(5+6), так и сум­ме (1+2)+(3+4+5+6). Сле­ду­ет так­же ска­зать, что когда мы вы­де­ля­ем под­груп­пы для опе­ра­ции сверт­ки, мы долж­ны так­же за­дать на­чаль­ное зна­чение для опе­ра­ции сверт­ки в под­груп­пе, или «ноль». Для на­хо­ж­дения сум­мы чи­сел этот «ноль» яв­ля­ет­ся чис­лом 0, для на­хо­ж­дения про­из­ве­дения чи­сел – 1, для кон­ка­те­на­ции строк – “” (или []), для на­хо­ж­дения про­из­ве­дения мат­риц – единич­ная мат­ри­ца, и т. д. Понимание это­го фак­та важ­но по той при­чине, что та­кие функ­ции сверт­ки, как lists:foldl/3 и lists:foldr/3, по­зво­ля­ют за­дать на­чаль­ное зна­чение для всей опе­ра­ции сверт­ки, ко­то­рое мо­жет от­ли­чать­ся от «но­ля». Итак, вид­но, что для соз­дания мно­го­за­дач­ной вер­сии функ­ции reduce нам необ­хо­ди­мо сде­лать сле­дую­щее: раз­бить ис­ход­ной спи­сок на пор­ции, пор­ции об­ра­бо­тать па­рал­лель­но, по­сле че­го ре­зуль­та­ты па­рал­лель­ной об­ра­бот­ки свер­нуть в ито­го­вое зна­чение.

Да­вай­те еще по­го­во­рим о том, что де­лать, ес­ли опе­ра­ция сверт­ки не ас­со­циа­тив­на. Возь­мем, на­при­мер, сле­дую­щую раз­ность: 1-2-3-4, зна­чение ко­то­рой рав­но -8. Ес­ли мы сгруп­пи­ру­ем эле­мен­ты так, как мы это де­ла­ли для сум­мы, то по­лу­чим со­всем дру­гое зна­чение. Так, на­при­мер, груп­пи­ров­ка (1-2)-(3-4) да­ет зна­чение 0. Но с точ­ки зрения ариф­ме­ти­ки это непра­виль­но, а пра­ви­лен один из сле­дую­щих ва­ри­ан­тов: (1-2)-(3+4) или (1-2)+(-3-4). Да­вай­те пе­ре­пи­шем эти ва­ри­ан­ты сле­дую­щим об­ра­зом: 1-(2)-(3+4) и 1+(-2)+(-3-4). Ста­но­вит­ся яс­но, что для вы­полнения опе­ра­ции сверт­ки по груп­пам (как для вы­чи­тания чи­сел, так и в об­щем слу­чае) нам необ­хо­ди­мы две опе­ра­ции сверт­ки эле­мен­тов: ис­ход­ная и неко­то­рая до­полнитель­ная. При этом мы ли­бо при­ме­ня­ем ис­ход­ную опе­ра­цию сверт­ки для сверт­ки внут­ри групп, а до­полнитель­ную для сверт­ки ре­зуль­та­тов сверт­ки для групп, ли­бо на­обо­рот. Так­же вид­но, что пер­вый эле­мент в опе­ра­ции сверт­ки не вхо­дит ни в ка­кую груп­пу. Как итог: на­пи­сание опе­ра­ции сверт­ки с груп­па­ми для неас­со­циа­тив­ной опе­ра­ции сверт­ки при­во­дит к до­полнитель­ным слож­но­стям, никак не свя­зан­ным с мно­го­за­дач­но­стью. Что­бы из­бе­жать этих слож­но­стей, реа­ли­зо­вы­вать мно­го­за­дач­ный ва­ри­ант функ­ции reduce для та­ких опе­ра­ций сверт­ки мы не бу­дем, как уже го­во­ри­лось вы­ше.

Итак, мы при­шли к та­ко­му по­ня­тию, как раз­биение дан­ных на пор­ции. Мы обыч­но раз­би­ва­ем ис­ход­ный на­бор эле­мен­тов на пор­ции тогда, когда есть воз­мож­ность об­ра­бо­тать дан­ные па­рал­лель­но, но об­ра­бот­ка од­но­го эле­мен­та дан­ных невы­год­на. В слу­чае опе­ра­ции сверт­ки (функ­ции reduce), мы мо­жем раз­бить спи­сок ис­ход­ных дан­ных на груп­пы из од­но­го эле­мен­та и об­ра­бо­тать их, по­сле че­го по­лу­чен­ные ре­зуль­та­ты свер­нуть в ито­го­вый ре­зуль­тат. Оче­вид­но, что это невы­год­но и бес­смыс­лен­но. В слу­чае опе­ра­ции ото­бра­жения (функ­ции map), ес­ли функ­ция ото­бра­жения про­стая (на­при­мер, уд­воение ар­гу­мен­та), то па­рал­лель­ная об­ра­бот­ка всех эле­мен­тов так­же невы­год­на. И в этом слу­чае го­раз­до вы­годнее об­ра­ба­ты­вать па­рал­лель­но пор­ции дан­ных. В свя­зи с этим возника­ет вполне ес­те­ст­вен­ный во­прос: а как вы­би­рать раз­мер та­ких пор­ций дан­ных? Хо­тя мы и мо­жем тео­ре­ти­че­­ски при­ки­нуть раз­ме­ры пор­ций, обыч­но их раз­ме­ры ищут экс­пе­ри­мен­таль­ным пу­тем на при­ме­рах ти­пич­ных дан­ных. Мы об этом еще по­го­во­рим в на­шем прак­ти­ку­ме.

По­ра дви­гать­ся даль­ше. Но пре­ж­де чем брать­ся за реа­ли­за­цию па­рал­лель­ной вер­сии функ­ции reduce, сле­ду­ет соз­дать несколь­ко по­лез­ных функ­ций для раз­биения дан­ных на пор­ции. Нас ин­те­ре­су­ют две за­да­чи: вы­чис­ление ко­ли­че­­ст­ва пор­ций дан­ных по раз­ме­ру од­ной пор­ции (и раз­ме­ру ис­ход­ных дан­ных) и соб­ст­вен­но раз­биение ис­ход­ных дан­ных на пор­ции. Пер­вая за­да­ча реа­ли­зо­ва­на в функ­ции calc_portion_count/2 мо­ду­ля parallel_common, ко­то­рая весь­ма три­ви­аль­на:

calc_portion_count(TotalSize, PortionSize) when TotalSize rem PortionSize == 0 ->

TotalSize div PortionSize;

calc_portion_count(TotalSize, PortionSize) when TotalSize rem PortionSize /= 0 ->

(TotalSize div PortionSize) + 1.

Эта функ­ция учи­ты­ва­ет тот факт, что ес­ли раз­ме­ры ис­ход­ных дан­ных и пор­ции не крат­ны, то у нас по­яв­ля­ет­ся оста­ток (раз­мер ко­то­ро­го мень­ше раз­ме­ра пор­ции), ко­то­рый так­же необ­хо­ди­мо учи­ты­вать. Раз­биение ис­ход­но­го спи­ска дан­ных на пор­ции реа­ли­зо­ва­но в функ­ци­ях prepare_data/2 (ин­тер­фейс­ная функ­ция) и prepare_data/3 (функ­ция, ре­шаю­щая дан­ную за­да­чу) мо­ду­ля parallel_common:

prepare_data(_PortionSize, []) -> [];

prepare_data(PortionSize, SourceList) -> prepare_data(0, PortionSize, SourceList, []).

prepare_data(Index, PortionSize, SourceList, PreparedData)

when length(SourceList) =< PortionSize ->

lists:reverse([{Index, SourceList}] ++ PreparedData);

prepare_data(Index, PortionSize, SourceList, PreparedData) ->

{Portion, Rest} = lists:split(PortionSize, SourceList),

prepare_data(Index + 1, PortionSize, Rest, [{Index, Portion}] ++ PreparedData).

Так же, как и в си­туа­ции с про­стой мно­го­за­дач­ной реа­ли­за­ци­ей функ­ции map, нам при­дет­ся со­би­рать ре­зуль­та­ты со всех ра­бо­чих про­цес­сов. Это оз­на­ча­ет, что нам точ­но так же необ­хо­ди­мо свя­зы­вать с пор­ция­ми дан­ных ин­дек­сы, на­чи­наю­щие­ся с 0 (толь­ко это уже бу­дут ин­дек­сы пор­ций). Имен­но это и де­ла­ют функ­ции prepare_data/2 и prepare_data/3 – они соз­да­ют спи­сок из пар (кор­те­жей, со­стоя­щих из двух эле­мен­тов: ин­декс пор­ции и соб­ст­вен­но пор­ция), ко­то­рый мы ис­поль­зу­ем в дальней­шем. Сле­ду­ет ска­зать, что все дальней­шие ва­ри­ан­ты мно­го­за­дач­ных реа­ли­за­ций функ­ций map и reduce бу­дут ис­поль­зо­вать пор­ции в ка­че­­ст­ве единицы ра­бо­ты для ра­бо­чих про­цес­сов.

На этом мы, по­жа­луй, се­го­дня оста­но­вим­ся: к со­жа­лению, ме­сто в жур­на­ле для ста­тьи ог­раничен­но. Да­вай­те под­ве­дем про­ме­жу­точ­ный итог: мы уви­де­ли, что да­же в са­мом про­стей­шем слу­чае (на при­ме­ре мно­го­за­дач­ной вер­сии функ­ции map) мно­го­за­дач­ная вер­сия боль­ше и сложнее со­от­вет­ст­вую­щей не мно­го­за­дач­ной вер­сии функ­ции. Мы уви­де­ли, что не всегда воз­мож­но по­дой­ти к за­да­че рас­па­рал­ле­ли­вания про­цес­са вы­чис­ления в лоб: мы не всегда мо­жем рас­па­рал­ле­лить об­ра­бот­ку ка­ж­до­го эле­мента ис­ход­ных дан­ных. И мы на­ча­ли рас­смат­ри­вать слу­чай, когда па­рал­лель­но у нас об­ра­ба­ты­ва­ют­ся не единич­ные эле­мен­ты, а пор­ции. В сле­дую­щем но­ме­ре мы про­дол­жим наш прак­ти­кум и, в том чис­ле, за­кон­чим соз­дание мно­го­за­дач­ных вер­сий функ­ций map и reduce, ко­то­рые па­рал­лель­но об­ра­ба­ты­ва­ют пор­ции ис­ход­ных дан­ных. |

Персональные инструменты
купить
подписаться
Яндекс.Метрика