Журнал LinuxFormat - перейти на главную

LXF170:Язы­ки про­грам­ми­ро­вания: Erlang

Материал из Linuxformat
Перейти к: навигация, поиск

Erlang - Описывается следующей формулой: функциональный язык + процессы

Прак­ти­кум по мно­го­за­дач­но­сти 5

Пя­тый прак­ти­кум по мно­го­за­дач­но­сти Ан­д­рей Уша­ков по­свя­ща­ет сбе­ре­же­нию сис­тем­ных ре­сур­сов.

(thumbnail)
Наш эксперт Ан­д­рей Уша­ков ак­тив­но при­бли­жа­ет тот день, ко­гда функ­цио­наль­ные язы­ки ста­нут мейн­ст­ри­мом.

В оче­ред­ной раз займемся прак­ти­кумом по мно­го­за­дач­но­сти в язы­ке Erlang – в ви­де ра­бо­ты над мно­го­за­дач­ны­ми вер­сия­ми функ­ций map и reduce. Се­го­дня мы про­дол­жим соз­да­вать вер­сии функ­ций map и reduce, ко­то­рые бе­реж­но от­но­сят­ся к ре­сур­сам ком­пь­ю­те­ра: про­цес­сор­но­му вре­мени, па­мя­ти, се­те­вой под­сис­те­ме. Это оз­на­ча­ет, что но­вые вер­сии функ­ций map и reduce не соз­да­ют лишних объ­ек­тов, в ро­ли ко­то­рых вы­сту­па­ли пор­ции дан­ных, и не на­гру­жа­ют се­те­вую под­сис­те­му од­но­вре­мен­ной по­сыл­кой всех этих объ­ек­тов.

По тра­ди­ции в­спомним, на чем мы оста­но­ви­лись в про­шлом но­ме­ре. Мы ­го­во­ри­­ли о ре­шении (при­ня­том при по­строении на­ших при­ме­ров) раз­бить все ис­ход­ные дан­ные на пор­ции и ра­зо­слать эти пор­ции дан­ных (ка­к за­дания по об­ра­бот­ке) ра­бо­чим про­цес­сам сра­зу, а затем толь­ко со­би­рать ре­зуль­та­ты об­ра­бот­ки этих пор­ций ра­бо­чи­ми про­цес­са­ми. Мы по­ка­за­ли, что это при­во­дит к рез­ко­му скач­ку по­треб­ляе­мой па­мя­ти и на­груз­ки на про­цес­сор и, воз­мож­но, на сеть (ес­ли ра­бо­чие про­цес­сы вы­пол­ня­ют­ся на раз­ных уз­лах). Ведь ес­ли мы хо­тим раз­бить все ис­ход­ные дан­ные на пор­ции, то необ­хо­ди­мо под эти пор­ции дан­ных вы­де­лить па­мять, раз­мер ко­то­рой чуть боль­ше раз­ме­ра ис­ход­ных дан­ных (т. к. ка­ж­дая пор­ция – это кор­теж, со­стоя­щий из са­мой пор­ции дан­ных и ее ин­дек­са), а так­же по­тра­тить неко­то­рое про­цес­сор­ное вре­мя. Далее мы все эти дан­ные пе­ре­да­ем по се­ти на­шим ра­бо­чим про­цес­сам, уве­ли­чи­вая на­груз­ку на сеть. А при доста­точ­но боль­шом объ­е­ме вход­ных дан­ных нам про­сто мо­жет не хва­тить раз­ме­ра ад­рес­но­го про­стран­ст­ва (про­цес­са опе­ра­ци­он­ной сис­те­мы, со­дер­жащего сре­ду вы­полнения Erlang, наш при­мер и дан­ные) для то­го, что­бы хранить од­но­вре­мен­но как ис­ход­ные дан­ные, так и пор­ции дан­ных.

Разберемся, как ре­шить эту про­бле­му. В про­шлый раз мы рас­смот­ре­ли воз­мож­ное ее ре­шение: «от­ре­зать» но­вую пор­цию для об­ра­бот­ки ка­ко­му-ли­бо ра­бо­че­му про­цес­су толь­ко по ме­ре необ­хо­ди­мо­сти. Т. е. толь­ко по­сле то­го, как ка­кой-ли­бо ра­бо­чий про­цесс вы­полнит свою за­да­чу (об­ра­бо­та­ет пор­цию дан­ных) и ото­шлет ре­зуль­тат об­ра­бот­ки глав­но­му про­цес­су, он по­лу­чит сле­дую­щее за­дание (сле­дую­щую пор­цию) для вы­полнения. Прав­да, за это ре­шение нам при­дет­ся ­пла­тить, т. к. услож­ня­ет­ся взаи­мо­дей­ст­вие ме­ж­ду ра­бо­чи­ми про­цес­са­ми и глав­ным про­цес­сом. Если рань­ше глав­ный про­цесс раз­би­вал ис­ход­ные дан­ные на пор­ции, рас­пре­де­лял эти пор­ции (как за­дания) ме­ж­ду ра­бо­чи­ми про­цес­са­ми и ожи­дал ре­зуль­та­тов об­ра­бот­ки от всех ра­бо­чих про­цес­сов, то при но­вом под­хо­де необ­хо­ди­мо в глав­ном про­цес­се де­лать сле­дую­щее: соз­дать и раз­дать на­чаль­ные за­дания для ра­бо­чих про­цес­сов, по­том со­б­рать по­лу­чен­ные ре­зуль­та­ты об­ра­бот­ки и при наличии еще не об­ра­бо­тан­ных дан­ных соз­да­вать но­вую пор­цию и от­сы­лать ее на об­ра­бот­ку. С дру­гой сто­ро­ны, для ра­бо­чих про­цес­сов ниче­го не из­менилось: мы все так же по­лу­ча­ем за­дание на об­ра­бот­ку пор­ции дан­ных, об­ра­ба­ты­ва­ем эту пор­цию, от­сы­ла­ем ре­зуль­та­ты об­ра­бот­ки об­рат­но глав­но­му про­цес­су, по­сле че­го по­лу­ча­ем оче­ред­ное за­дание, ес­ли оно есть. И аб­со­лют­но не важ­но, сра­зу ли все за­дания на об­ра­бот­ку от­прав­ля­ют­ся ра­бо­чим про­цес­сам или же по­сте­пен­но, в от­вет на ре­зуль­тат об­ра­бот­ки пре­ды­ду­ще­го за­дания.

А те­перь да­вай­те по­смот­рим на то, что из реа­ли­за­ции дан­но­го под­хо­да в про­шлый раз мы успе­ли сде­лать. Во-пер­вых, мы объ­я­ви­ли, что всю об­щую функ­цио­наль­ность (на осно­ве ко­то­рой мы смо­жем реа­ли­зо­вать мно­го­за­дач­ные вер­сии функ­ций map и reduce) мы по­ме­ща­ем в мо­дуль parallel_smartmsg_helper. Во-вто­рых, мы вве­ли ряд оп­ре­де­лений за­пи­сей для то­го, что­бы сде­лать наш код бо­лее по­нят­ным:

-record(tasks_descr, {created = 0, processed = 0, rest = []}).

-record(task_request, {master, index, portion}).

-record(task_result, {worker, index, result}).

Здесь запись task_descr оп­ре­де­ле­на для хранения дан­ных о про­цес­се об­ра­бот­ки ис­ход­но­го спи­ска, запись task_request пред­став­ля­ет за­прос на об­ра­бот­ку оче­ред­ной пор­ции дан­ных (со­об­щение, ко­то­рое по­лу­ча­ет ра­бо­чий про­цесс от глав­но­го про­цес­са), а запись task_result пред­став­ля­ет ре­зуль­тат об­ра­бот­ки оче­ред­ной пор­ции дан­ных ра­бо­чим про­цес­сом (со­об­щение, ко­то­рое ра­бо­чий про­цесс по­сы­ла­ет глав­но­му про­цес­су). Сле­ду­ет за­ме­тить, что мы не по­ме­ща­ем эти оп­ре­де­ления за­пи­сей в от­дель­ный за­го­ло­воч­ный файл (с рас­ши­рением .hrl), т. к. эти оп­ре­де­ления ис­поль­зу­ют­ся толь­ко внут­ри мо­ду­ля parallel_smartmsg_helper. И, на­конец, мы оп­ре­де­ли­ли функ­цию, ко­то­рую вы­пол­ня­ет ра­бо­чий про­цесс во вре­мя сво­ей жизни. Это экс­пор­ти­руе­мая функ­ция parallel_smartmsg_helper: smartmsg_worker/1:

smartmsg_worker(Fun) ->

receive

#task_request{master=MasterPid, index=Index, portion=SourcePortion} ->

Dest = Fun(SourcePortion),

MasterPid ! #task_result{worker=self(), index=Index, result=Dest},

smartmsg_worker(Fun);

_Other -> smartmsg_worker(Fun)

end.

Как мы уже го­во­ри­ли, для ра­бо­че­го про­цес­са ло­ги­ка ра­бо­ты аб­со­лют­но не за­ви­сит от то­го, сра­зу ли все за­дания на об­ра­бот­ку пор­ций дан­ных он по­лу­чит или же по­сте­пен­но, в от­вет на ре­зуль­тат об­ра­бот­ки неко­то­рой пор­ции дан­ных. По­это­му эта функ­ция ничем не от­ли­ча­ет­ся от ана­ло­гич­ных функ­ций, ко­то­рые вы­пол­ня­ют ра­бо­чие про­цес­сы бо­лее ранних при­ме­ров, за ис­клю­чением то­го, что здесь для ра­бо­ты с со­об­щения­ми мы ис­поль­зу­ем оп­ре­де­лен­ные ранее за­пи­си. Эта функ­ция яв­ля­ет­ся экс­пор­ти­руе­мой – по при­чине то­го, что за соз­дание ра­бо­чих про­цес­сов у нас от­вет­ст­венен внешний код.

По­сле по­вто­рения мы мо­жем сме­ло ид­ти даль­ше. Для на­ча­ла да­вай­те соз­да­дим па­ру вспо­мо­га­тель­ных функ­ций, ко­то­рые бу­дут ин­кап­су­ли­ро­вать та­кие опе­ра­ции, как соз­дание и на­зна­чение но­вой за­да­чи ра­бо­че­му про­цес­су и со­хранение ре­зуль­та­та об­ра­бот­ки пор­ции в про­ме­жу­точ­ном хранили­ще. Начнем со вспо­мо­га­тель­ной функ­ции, ко­то­рая со­хра­ня­ет ре­зуль­тат об­ра­бот­ки пор­ции в про­ме­жу­точ­ном хранили­ще. Как уже го­во­ри­лось в пре­ды­ду­щих стать­ях, в ка­че­­ст­ве про­ме­жу­точ­но­го хранили­ща мы ис­поль­зу­ем мас­сив (кол­лек­цию, доступ к эле­мен­там ко­то­рой осу­ще­ст­в­ля­ет­ся по ин­дек­су), а ре­зуль­тат об­ра­бот­ки пор­ции со­сто­ит из са­мо­го ре­зуль­та­та и его ин­дек­са (ко­то­рый сов­па­да­ет с ин­дек­сом пор­ции при ее соз­дании). По­это­му функ­ция для со­хранения ре­зуль­та­тов об­ра­бот­ки пор­ции долж­на в ка­че­­ст­ве вход­ных па­ра­мет­ров принимать мас­сив с ре­зуль­та­та­ми об­ра­бот­ки пор­ций, сам ре­зуль­тат об­ра­бот­ки пор­ции и его ин­декс, а воз­вра­щать об­нов­лен­ный мас­сив с ре­зуль­та­та­ми об­ра­бот­ки пор­ций (мы помним, что в язы­ке Erlang мы не из­ме­ня­ем су­ще­ст­вую­щий объ­ект, а соз­да­ем но­вый). Это функ­ция collect_result/3, ко­то­рая оп­ре­де­ле­на в мо­ду­ле parallel_smartmsg_helper, но не экс­пор­ти­ру­ет­ся из него:

collect_result(Result, Index, Storage) ->

array:set(Index, Result, Storage).

По сути, она очень про­ста – это все­го лишь аль­тер­на­ти­ва (алиа­с) для функ­ции array:set/3; основ­ное ее пред­на­зна­чение – сде­лать код, ко­то­рый со­хра­ня­ет ре­зуль­та­ты об­ра­бот­ки пор­ции в про­ме­жу­точ­ное хранили­ще, бо­лее яс­ным и по­нят­ным.

В ка­че­­ст­ве сле­дую­ще­го ша­га мы соз­да­дим вспо­мо­га­тель­ную функ­цию для на­зна­чения но­во­го за­дания ра­бо­че­му про­цес­су. Это бу­дет функ­ция assign_task/4, оп­ре­де­лен­ная в мо­ду­ле parallel_smartmsg_helper, но не экс­пор­ти­руе­мая из него:

assign_task(Worker, SourceList, PortionSize, Index)

when length(SourceList) =< PortionSize ->

Worker ! #task_request{master = self(), index = Index, portion = SourceList},

[];

assign_task(Worker, SourceList, PortionSize, Index) ->

{Portion, Rest} = lists:split(PortionSize, SourceList),

Worker ! #task_request{master = self(), index = Index, portion = Portion},

Rest.

Вид­но, что эта функ­ция выглядит слож­нее функ­ции collect/3. В ней нам необ­хо­ди­мо об­ра­ба­ты­вать два слу­чая (для это­го оп­ре­де­ле­но два ва­ри­ан­та функ­ции assign_task/4): когда раз­мер необ­ра­бо­тан­но­го остат­ка мень­ше или ра­вен раз­ме­ру пор­ции дан­ных и когда раз­мер необ­ра­бо­тан­но­го остат­ка боль­ше раз­ме­ра пор­ции. В пер­вом слу­чае мы про­сто от­сы­ла­ем этот оста­ток ра­бо­че­му про­цес­су на об­ра­бот­ку и воз­вра­ща­ем пустой спи­сок; это оз­на­ча­ет, что боль­ше нет ис­ход­ных дан­ных для об­ра­бот­ки. Во вто­ром слу­чае мы «от­ре­за­ем» пор­цию за­дан­но­го раз­ме­ра, от­сы­ла­ем эту пор­цию ра­бо­че­му про­цес­су, по­сле че­го воз­вра­ща­ем оста­ток по­сле «от­ре­зания» пор­ции об­рат­но, т. к. он со­дер­жит не об­ра­бо­тан­ные еще дан­ные. В ка­че­­ст­ве па­ра­мет­ров эта функ­ция принима­ет иден­ти­фи­ка­тор ра­бо­че­го про­цес­са, ко­то­ро­му мы со­би­ра­ем­ся на­зна­чить за­дание, оста­ток необ­ра­бо­тан­ных ис­ход­ных дан­ных, раз­мер пор­ции и ин­декс соз­да­вае­мой пор­ции дан­ных.

А мы мо­жем пой­ти даль­ше. Ес­ли рань­ше мы «раз­би­ва­ли» ис­ход­ные дан­ные на пор­ции и рас­пре­де­ля­ли эти пор­ции (в ви­де за­даний на об­ра­бот­ку) сре­ди ра­бо­чих про­цес­сов сра­зу, то сей­час мы «от­ре­за­ем» оче­ред­ную пор­цию и от­да­ем ее ра­бо­че­му про­цес­су толь­ко по­сле то­го, как он при­шлет глав­но­му про­цес­су ре­зуль­та­ты об­ра­бот­ки ка­кой-ли­бо пор­ции дан­ных. Но для то­го, что­бы та­кая схе­ма взаи­мо­дей­ст­вия глав­но­го про­цес­са и ра­бо­чих про­цес­сов ра­бо­та­ла, необ­хо­ди­мо инициа­ли­зи­ро­вать ра­бо­чие про­цес­сы. Про­цесс инициа­ли­за­ции ра­бо­чих про­цес­сов со­сто­ит в сле­дую­щем: для ка­ж­до­го ра­бо­че­го про­цес­са мы «от­ре­за­ем» от ис­ход­ных дан­ных (а точнее, от необ­ра­бо­тан­но­го остат­ка) пор­цию и от­прав­ля­ем эти пор­ции на об­ра­бот­ку ра­бо­чим про­цес­сам. По­сле та­кой инициа­ли­за­ции мы мо­жем ис­поль­зо­вать опи­сан­ный вы­ше ме­ханизм взаи­мо­дей­ст­вия, т. е. ожи­дать ре­зуль­тат об­ра­бот­ки пор­ции от ка­ко­го-ли­бо ра­бо­че­го про­цес­са, по­сле че­го под­го­тав­ли­вать и от­сы­лать ему оче­ред­ное за­дание (оче­ред­ную пор­цию ис­ход­ных дан­ных). Дан­ный про­цесс инициа­ли­за­ции ра­бо­чих про­цес­сов мы реа­ли­зу­ем при по­мо­щи функ­ции distribute_init_tasks/3, ко­то­рая оп­ре­де­ле­на в мо­ду­ле parallel_smartmsg_helper:

distribute_init_tasks(#tasks_descr{created=Created, rest=[]}, _PortionSize, _WorkerList) ->

#tasks_descr{created = Created, rest = []};

distribute_init_tasks(TasksDescr, _PortionSize, []) -> TasksDescr;

distribute_init_tasks(#tasks_descr{created=Created, rest=Source}, PortionSize, [Worker|Workers]) ->

Rest = assign_task(Worker, Source, PortionSize, Created),

TasksDescr = #tasks_descr{created=Created+1, rest=Rest},

distribute_init_tasks(TasksDescr, PortionSize, Workers).

Эта функ­ция на вход принима­ет три па­ра­мет­ра: опи­са­тель про­цес­са об­ра­бот­ки ис­ход­ных дан­ных (ко­то­рый яв­ля­ет­ся эк­зем­п­ля­ром за­пи­си tasks_descr), раз­мер пор­ции ис­ход­ных дан­ных и спи­сок иден­ти­фи­ка­то­ров ра­бо­чих про­цес­сов (точнее, оста­ток спи­ска иден­ти­фи­ка­то­ров ра­бо­чих про­цес­сов); воз­вра­ща­ет эта функ­ция опи­са­тель про­цес­са об­ра­бот­ки ис­ход­ных дан­ных по­сле инициа­ли­за­ции ра­бо­чих про­цес­сов. В функ­ции distribute_init_tasks/3 мы долж­ны об­ра­ба­ты­вать три раз­ных слу­чая (по­это­му функ­ция distribute_init_tasks/3 со­дер­жит три ва­ри­ан­та): когда за­кон­чи­лись вход­ные дан­ные, когда мы инициа­ли­зи­ро­ва­ли все ра­бо­чие про­цес­сы, и об­щий слу­чай – когда есть как необ­ра­бо­тан­ные вход­ные дан­ные, так и неинициа­ли­зи­ро­ван­ные ра­бо­чие про­цес­сы. Пер­вый слу­чай воз­мо­жен, ес­ли ко­ли­че­­ст­во ра­бо­чих про­цес­сов боль­ше или рав­но ко­ли­че­­ст­ву пор­ций (с раз­ме­ром PortionSize). В пер­вом и вто­ром ва­ри­ан­те функ­ции distribute_init_tasks/3 дальней­шая инициа­ли­за­ция уже невоз­мож­на, по­это­му мы воз­вра­ща­ем опи­са­тель про­цес­са об­ра­бот­ки дан­ных. В по­следнем же ва­ри­ан­те (когда есть как вход­ные дан­ные, так и неинициа­ли­зи­ро­ван­ные ра­бо­чие про­цес­сы) мы инициа­ли­зи­ру­ем оче­ред­ной ра­бо­чий про­цесс пор­ци­ей (при по­мо­щи функ­ции assign_task/4), по­сле че­го вы­зы­ва­ем ре­кур­сив­но (при по­мо­щи хво­сто­вой ре­кур­сии) ме­тод distribute_init_tasks/3 для остав­ших­ся ра­бо­чих про­цес­сов и вход­ных дан­ных. Так­же сле­ду­ет ска­зать, что функ­ция distribute_init_tasks/3 не экс­пор­ти­ру­ет­ся из мо­ду­ля parallel_smartmsg_helper, т. к. она ис­поль­зу­ет­ся толь­ко внут­ри него.

Сле­дую­щий шаг, ко­то­рый нам необ­хо­ди­мо реа­ли­зо­вать – это взаи­мо­дей­ст­вие ме­ж­ду ра­бо­чи­ми про­цес­са­ми и глав­ным про­цес­сом. Как мы уже го­во­ри­ли, взаи­мо­дей­ст­вие ме­ж­ду ра­бо­чи­ми про­цес­са­ми и глав­ным про­цес­сом вы­гля­дит сле­дую­щим об­ра­зом: по­сле инициа­ли­за­ции ра­бо­чих про­цес­сов (о чем мы го­во­ри­ли вы­ше) глав­ный про­цесс ждет со­об­щения от лю­бо­го ра­бо­че­го про­цес­са с ре­зуль­та­та­ми об­ра­бот­ки на­зна­чен­ной ему пор­ции дан­ных. При по­лу­чении им та­ко­го со­об­щения (эк­зем­п­ля­ра за­пи­си task_result) глав­ный про­цесс со­хра­ня­ет ре­зуль­та­ты об­ра­бот­ки в про­ме­жу­точ­ное хранили­ще, а его дальней­шие дей­ст­вия за­ви­сят от то­го, есть на мо­мент по­лу­чения со­об­щения необ­ра­бо­тан­ные дан­ные или нет. Ес­ли на мо­мент по­лу­чения со­об­щения необ­ра­бо­тан­ные дан­ные еще есть, то глав­ный про­цесс «от­ре­за­ет» от этих дан­ных пор­цию, от­сы­ла­ет эту пор­цию ра­бо­че­му про­цес­су в ви­де за­дания на об­ра­бот­ку (в ви­де эк­зем­п­ля­ра за­пи­си task_request), по­сле че­го про­дол­жа­ет ждать со­об­щения от ра­бо­чих про­цес­сов. Ес­ли же на мо­мент по­лу­чения со­об­щения необ­ра­бо­тан­ных дан­ных не оста­лось, то глав­ный про­цесс про­сто про­дол­жа­ет ждать со­об­щений от дру­гих про­цес­сов. Это ожи­дание за­кан­чи­ва­ет­ся тогда, когда при­хо­дит по­следнее со­об­щение с ре­зуль­та­том об­ра­бот­ки ка­кой-ли­бо пор­ции ис­ход­ных дан­ных. Что­бы от­сле­дить это по­следнее со­об­щение, мы ис­поль­зу­ем опи­са­тель про­цес­са об­ра­бот­ки дан­ных. Как мы уже го­во­ри­ли, опи­са­тель про­цес­са об­ра­бот­ки дан­ных (эк­зем­п­ляр за­пи­си tasks_descr) со­дер­жит три по­ля: ко­ли­че­­ст­во соз­дан­ных за­дач на об­ра­бот­ку, ко­ли­че­­ст­во об­ра­бо­тан­ных за­дач и оста­ток необ­ра­бо­тан­ных ис­ход­ных дан­ных. Оче­вид­но, что по­следним бу­дет та­кое со­об­щение от од­но­го из ра­бо­чих про­цес­сов, по­сле ко­то­ро­го ко­ли­че­­ст­во соз­дан­ных за­дач рав­но ко­ли­че­­ст­ву вы­полнен­ных за­дач, а оста­ток необ­ра­бо­тан­ных ис­ход­ных дан­ных пуст. Та­ким об­ра­зом, вид­но, что функ­ция, ко­то­рая бу­дет реа­ли­зо­вы­вать это взаи­мо­дей­ст­вие, долж­на иметь три ва­ри­ан­та: один ва­ри­ант – для окон­чания взаи­мо­дей­ст­вия (и дальней­ше­го вы­чис­ления ито­го­во­го ре­зуль­та­та), дру­гой ва­ри­ант – для си­туа­ции, когда необ­ра­бо­тан­ные ис­ход­ные дан­ные уже за­кон­чи­лись, но неко­то­рые ра­бо­чие про­цес­сы еще вы­пол­ня­ют свои за­дания, и, на­конец, по­следний ва­ри­ант для об­ще­го слу­чая. Это бу­дет функ­ция handle_workers/3, ко­то­рая оп­ре­де­ле­на в мо­ду­ле parallel_smartmsg_helper, но не экс­пор­ти­ру­ет­ся из него:

handle_workers(#tasks_descr{created=N, processed=N, rest=[]}, Storage, _PortionSize) ->

Storage;

handle_workers(#tasks_descr{created=Created, processed=Processed, rest=[]}, Storage, PortionSize) ->

receive

#task_result{index=Index, result=Dest} ->

UpdatedStorage = collect_result(Dest, Index, Storage),

TasksDescr = #tasks_descr{created=Created, processed=Processed+1, rest=[]},

handle_workers(TasksDescr, UpdatedStorage, PortionSize);

_ -> handle_workers(#tasks_descr{created=Created, processed=Processed, rest=[]}, Storage, PortionSize)

end;

handle_workers(#tasks_descr{created=Created, processed=Processed, rest=Source}, Storage, PortionSize) ->

receive

#task_result{worker=Worker, index=Index, result=Dest} ->

UpdatedStorage = collect_result(Dest, Index, Storage),

Rest = assign_task(Worker, Source, PortionSize, Created),

TasksDescr = #tasks_descr{created=Created+1, processed=Processed+1, rest=Rest},

handle_workers(TasksDescr, UpdatedStorage, PortionSize);

_ -> handle_workers(#tasks_descr{created=Created, processed=Processed, rest= Source}, Storage, PortionSize)

end.

Функ­ция handle_workers/3 принима­ет три па­ра­мет­ра: опи­са­тель про­цес­са об­ра­бот­ки дан­ных (эк­зем­п­ляр за­пи­си tasks_descr), хранили­ще про­ме­жу­точ­ных дан­ных и раз­мер пор­ции, а воз­вра­ща­ет (по­сле то­го, как все ис­ход­ные дан­ные бу­дут «раз­би­ты» на пор­ции, об­ра­бо­та­ны ра­бо­чи­ми про­цес­са­ми и со­б­ра­ны) хранили­ще про­ме­жу­точ­ных дан­ных с ре­зуль­та­та­ми об­ра­бот­ки всех пор­ций. Сле­ду­ет об­ра­тить внимание на то, ка­кие со­об­щения мы об­ра­ба­ты­ва­ем в функ­ции handle_workers/3. Во-пер­вых, мы об­ра­ба­ты­ва­ем со­об­щения от ра­бо­чих про­цес­сов с ре­зуль­та­та­ми об­ра­бот­ки оче­ред­ной пор­ции дан­ных; эти со­об­щения яв­ля­ют­ся эк­зем­п­ля­ра­ми за­пи­си task_result. Во-вто­рых, мы об­ра­ба­ты­ва­ем все осталь­ные ви­ды со­об­щений, не де­лая при этом ниче­го. Мы это де­ла­ем для то­го, что­бы оче­редь со­об­щения про­цес­са не за­со­ря­лась «му­сор­ны­ми» со­об­щения­ми.

Те­перь мы мо­жем со­брать все вме­сте и соз­дать точ­ку вхо­да для об­щей функ­цио­наль­но­сти, на осно­ве ко­то­рой мы по­том мо­жем сде­лать со­от­вет­ст­вую­щие вер­сии функ­ций map и reduce. Это бу­дет функ­ция parallel_smartmsg_helper:smartmsg_core/4, оп­ре­де­лен­ная в мо­ду­ле parallel_smartmsg_helper и экс­пор­ти­руе­мая из него:

smartmsg_core(FinalAggrFun, SourceList, PortionSize, WorkerList) ->

process_flag(trap_exit, true),

TasksDescr = distribute_init_tasks(#tasks_descr{rest=SourceList}, PortionSize, WorkerList),

PortionCount = parallel_common:calc_portion_count(length(SourceList), PortionSize),

EmptyStorage = array:new([{size, PortionCount}, {fixed, true}, {default, none}]),

FullStorage = handle_workers(TasksDescr, EmptyStorage, PortionSize),

process_flag(trap_exit, false),

FinalAggrFun(array:to_list(FullStorage)).

В этой функ­ции мы де­ла­ем сле­дую­щее: инициа­ли­зи­ру­ем ра­бо­чие про­цес­сы (раз­да­вая им на­чаль­ные за­дания), инициа­ли­зи­ру­ем хранили­ще про­ме­жу­точ­ных дан­ных, иниции­ру­ем об­щение с ра­бо­чи­ми про­цес­са­ми, по­сле че­го из дан­ных, на­хо­дя­щих­ся в хранили­ще про­ме­жу­точ­ных дан­ных, фор­ми­ру­ем ито­го­вый ре­зуль­тат. Как уже го­во­ри­лось вы­ше, об­щение с ра­бо­чи­ми про­цес­са­ми за­кан­чи­ва­ет­ся тогда, когда все пор­ции ис­ход­ных дан­ных об­ра­бо­та­ны и ре­зуль­та­ты их об­ра­бот­ки по­лу­че­ны и со­хранены в хранили­ще про­ме­жу­точ­ных дан­ных. По­это­му при фор­ми­ро­вании ито­го­во­го ре­зуль­та­та из дан­ных, на­хо­дя­щих­ся в хранили­ще про­ме­жу­точ­ных дан­ных, все ре­зуль­та­ты об­ра­бот­ки пор­ций в этом хранили­ще уже при­сут­ст­ву­ют (т. е. ров­но так же, как и в пре­ды­ду­щих вер­си­ях, соз­дан­ных на­ми). Сле­ду­ет так­же за­ме­тить, что при инициа­ли­за­ции ра­бо­чих про­цес­сов (при вы­зо­ве функ­ции distribute_init_tasks/3) в ка­че­­ст­ве од­но­го из па­ра­мет­ров мы пе­ре­да­ем на­чаль­ное зна­чение опи­са­те­ля про­цес­са об­ра­бот­ки дан­ных. При фор­ми­ро­вании это­го на­чаль­но­го зна­чения в ка­че­­ст­ве остат­ка необ­ра­бо­тан­ных ис­ход­ных дан­ных мы пе­ре­да­ем са­ми эти ис­ход­ные дан­ные (при этом ко­ли­че­­ст­во соз­дан­ных и вы­полнен­ных за­даний, оче­вид­но, рав­но 0).

И, на­конец, мы мо­жем соз­дать оче­ред­ные мно­го­за­дач­ные вер­сии функ­ций map и reduce: это бу­дут функ­ции parallel_map:smartmsg_pmap/4 и parallel_reduce:smartmsg_reduce/5, оп­ре­де­лен­ные в мо­ду­лях parallel_map и parallel_reduce со­от­вет­ст­вен­но. Начнем мы с оче­ред­ной вер­сии мно­го­за­дач­ной функ­ции map:

smartmsg_pmap(_Fun, [], _PortionSize, _WorkerCount) -> [];

smartmsg_pmap(Fun, Source, PortionSize, _WorkerCount)

when length(Source) =< PortionSize ->

lists:map(Fun, Source);

smartmsg_pmap(Fun, Source, PortionSize, WorkerCount) ->

WorkerFun = fun(Portion) -> lists:map(Fun, Portion) end,

Workers = [spawn_link(fun() -> parallel_smartmsg_helper:smartmsg_worker(WorkerFun) end) || _Index <- lists:seq(1, WorkerCount)],

Result = parallel_smartmsg_helper:smartmsg_core(fun lists:append/1, Source, PortionSize, Workers),

lists:foldl(fun(Worker, _Aggr) -> exit(Worker, normal) end, true, Workers),

Result.

Лег­ко уви­деть, что ниче­го осо­бо не по­ме­ня­лось по сравнению с пре­ды­ду­щей вер­си­ей мно­го­за­дач­ной функ­ции map, кроме одного: мы ис­поль­зу­ем функ­цию parallel_smartmsg_helper:smartmsg_core/4 в ка­че­­ст­ве точ­ки вхо­да в об­щую функ­цио­наль­ность. Те­перь рас­смот­рим оче­ред­ную мно­го­за­дач­ную вер­сию функ­ции reduce:

smartmsg_reduce(_Fun, [], {Init, _PortionInit}, _PortionSize, _WorkerCount) -> Init;

smartmsg_reduce(Fun, Source, {Init, _PortionInit}, PortionSize, _WorkerCount)

when length(Source) =< PortionSize ->

lists:foldl(Fun, Init, Source);

smartmsg_reduce(Fun, Source, {Init, PortionInit}, PortionSize, WorkerCount) ->

ReduceFun = fun(List) -> lists:foldl(Fun, Init, List) end,

PortionReduceFun = fun(List) -> lists:foldl(Fun, PortionInit, List) end,

Workers = [spawn_link(fun() -> parallel_smartmsg_helper:smartmsg_worker(PortionReduceFun) end) || _Index <- lists:seq(1, WorkerCount)],

Result = parallel_smartmsg_helper:smartmsg_core(ReduceFun, Source, PortionSize, Workers),

lists:foldl(fun(Worker, _Aggr) -> exit(Worker, normal) end, true, Workers),

Result.

И в этом слу­чае ниче­го не по­ме­ня­лось по сравнению с пре­ды­ду­щей вер­си­ей, кроме точ­ки вхо­да в об­щую функ­цио­наль­ность.

Да­вай­те про­ве­рим, что оче­ред­ные мно­го­за­дач­ные вер­сии функ­ций map и reduce ра­бо­та­ют пра­виль­но. Для это­го мы ком­пи­ли­ру­ем со­от­вет­ст­вую­щие мо­ду­ли и за­пуска­ем кон­соль сре­ды вы­полнения язы­ка Erlang. Начнем с про­вер­ки ра­бо­ты функ­ции parallel_map:smartmsg_pmap/4. Вы­зов parallel_map:smartmsg_pmap(fun(Item)->lists:reverse(Item) end, [], 2, 2) воз­вра­ща­ет пустой спи­сок. Этим про­ве­ря­ет­ся пер­вый ва­ри­ант функ­ции parallel_map:smartmsg_pmap/4. Вы­зов parallel_map:smartmsg_pmap(fun(Item)->lists:reverse(Item) end, [“ab”, “cd”], 4, 2) воз­вра­ща­ет спи­сок [«ba”, “dc”]. Так как раз­мер пор­ции 4, а спи­сок с дан­ны­ми со­дер­жит все­го 2 эле­мен­та, то мы про­ве­ря­ем вто­рой ва­ри­ант функ­ции parallel_map:smartmsg_pmap/4. На­конец, вы­зов parallel_map:smartmsg_pmap(fun(Item)->lists:reverse(Item) end, [“ab”, “cd”, “ef”, “gh”], 2, 2) воз­вра­ща­ет спи­сок [«ba”, “dc”, “fe”, “hg”]. Так как раз­мер пор­ции 2, а спи­сок с дан­ны­ми со­дер­жит 4 эле­мен­та, то мы про­ве­ря­ем об­щий ва­ри­ант функ­ции parallel_map:smartmsg_pmap/4. При этом бу­дет соз­да­но 2 ра­бо­чих про­цес­са, и оба эти про­цес­са бу­дут за­гру­же­ны, т. к. спи­сок с дан­ны­ми раз­би­ва­ет­ся на 2 пор­ции дан­ных. Те­перь пе­рей­дем к про­вер­ке ра­бо­ты функ­ции parallel_reduce:smartmsg_reduce/5. Вы­зов parallel_reduce:smartmsg_reduce(fun(Item, Agg)->Agg ++ Item end, [], {“++”, “”}, 2, 2) воз­вра­ща­ет стро­ку “++”. Этот вы­зов про­ве­ря­ет пер­вый ва­ри­ант функ­ции parallel_reduce:smartmsg_reduce/5. Вы­зов parallel_reduce:smartmsg_reduce(fun(Item, Agg)->Agg ++ Item end, [“aa”, “bb”], {“++”, “”}, 4, 2) воз­вра­ща­ет стро­ку “++aabb”. Так как раз­мер пор­ции 4, а спи­сок с дан­ны­ми со­дер­жит все­го 2 эле­мен­та, то мы про­ве­ря­ем вто­рой ва­ри­ант функ­ции parallel_reduce:smartmsg_reduce/5. На­конец, вы­зов parallel_reduce:smartmsg_reduce(fun(Item, Agg)->Agg ++ Item end, [“aa”, “bb”, “cc”, “dd”], {“++”, “”}, 2, 2) воз­вра­ща­ет стро­ку «++aabbccdd». Так как раз­мер пор­ции 2, а спи­сок с дан­ны­ми со­дер­жит 4 эле­мен­та, то мы про­ве­ря­ем об­щий ва­ри­ант функ­ции parallel_reduce:smartmsg_reduce/5. При этом бу­дет соз­да­но 2 ра­бо­чих про­цес­са, и оба эти про­цес­са бу­дут за­гру­же­ны, т. к. спи­сок с дан­ны­ми раз­би­ва­ет­ся на 2 пор­ции дан­ных.

Итак, мы соз­да­ли поч­ти оче­ред­ные мно­го­за­дач­ные вер­сии функ­ций map и reduce. Мо­жет по­ка­зать­ся, что их уже неку­да улуч­шать и по­ра оста­но­вить­ся. Од­на­ко мы все-та­ки мо­жем сде­лать сле­дую­щее: от­ка­зать­ся от руч­но­го рас­пре­де­ления за­даний на об­ра­бот­ку (мы рас­пре­де­ля­ем за­дания так, что­бы ка­ж­дый ра­бо­чий про­цесс был за­гру­жен). Тогда мы пе­ре­ло­жим эту за­да­чу на неко­то­рый пул про­цес­сов (а точнее, уз­лов), ко­то­рый бы и рас­пре­де­лял за­дания на об­ра­бот­ку ме­ж­ду про­цес­са­ми по заданным кри­те­риям. Чем мы и зай­мем­ся в следующий раз. |


Персональные инструменты
купить
подписаться
Яндекс.Метрика