Журнал LinuxFormat - перейти на главную

LXF81:Python

Материал из Linuxformat
Перейти к: навигация, поиск
Python
Python для профессионалов
Python + PyGame
Python + Web
Python + Clutter

Содержание

Задачи многозадачности

часть 1 Сергей Супрунов открывает новый цикл статей, в котором будут более полно освещены некоторые практические моменты разработки приложений на языке Python. Начнём, пожалуй, с вопросов параллельных вычислений...

Практически любая программа, особенно если она в процессе своей работы осуществляет взаимодействие с пользователем или удалённым клиентом, довольно много времени тратит впустую, ожидая ответного хода своего «партнёра». Неэффективность проявляется и в других вопросах: процессор простаивает, пока программа работает с жёстким диском; жёсткий диск, напротив, бездействует, пока программа занята вычислительными задачами, например, обработкой только что считанных с диска данных. Поэтому рано или поздно разработчики операционных систем должны были прийти к идее распараллеливания работы.

В большинстве современных ОС эта идея имеет две реализации: процессы и потоки (причем в Linux одно практически неотличимо от другого). Процесс, если говорить упрощённо, представляет собой некоторый набор ресурсов (область памяти, значения процессорных регистров, открытые дескрипторы файлов и т.д.), принадлежащих какой-то задаче. На однопроцессорных машинах одновременно может обрабатываться только один процесс, остальные в это время находятся в очереди. Ядро системы, точнее, его планировщик, в соответствии с заданным алгоритмом предоставляет доступ к процессору ожидающим процессам в соответствии с их приоритетом. Если текущий процесс переходит в состояние ожидания ввода-вывода, то доступ к процессору передаётся следующему процессу в очереди. Благодаря этому, во-первых, реализуется более эффективное использование ресурсов системы, а во-вторых, несколько задач могут выполняться в одно и то же (с точки зрения пользователя) время.

Потоки (threads, их также называют нитями или облегчёнными процессами) решают аналогичную задачу, но в рамках одного процесса. При управлении как потоками, так и процессами операционная система вынуждена «непроизводительно» расходовать некоторые ресурсы на так называемое переключение контекста (т.е. на выполнение «подготовительных» мероприятий, таких как восстановление значения регистров и адресного пространства). Благодаря тому, что потоки разделяют некоторые ресурсы (например, память процесса, в рамках которого они исполняются), переключение их контекста происходит заметно быстрее, чем контекста процесса. Благодаря этому можно распараллеливать задачи с заметно меньшими затратами. Хорошим примером может служить производительность Apache 2.x (см. обзор в LXF77).

Хватит, пожалуй, теории. Посмотрим, какие средства предоставляет язык Python программисту, желающему воспользоваться многозадачностью операционной системы, а заодно рассмотрим такие вещи как сокеты и межпроцессорное взаимодействие.

Сокеты

Сокет (конечная точка сетевых коммуникаций) – это основа клиент-серверных приложений. Фактически, это интерфейс, с помощью которого процессы могут осуществлять обмен информацией между собой. Конкретная реализация определяется так называемым коммуникационным доменом, наиболее распространённые из них – Internet-домен и Unix-домен. Internet-сокеты позволяют реализовать взаимодействие на базе протоколов сети Интернет, таких как TCP или UDP. О них мы подробнее поговорим в одной из следующих статей цикла.

Unix-сокеты представляют собой файловые объекты, куда процессы могут записывать поток данных и считывать его. Процесс, прослушивающий сокет в ожидании входящих сообщений, по традиции именуется сервером, а подключающийся к сокету для обмена данными – клиентом.

В языке Python работа с сокетами реализована в модуле socket. Со стороны сервера создание сокета (рассмотрим пример для домена Unix) выглядит следующим образом:

import socket                     # импортируем модуль
s = socket.socket(socket.AF_UNIX) # создаём сокет домена AF_UNIX
s.bind('/tmp/test.sock')          # привязываем его к файлу
s.listen(1)                       # начинаем прослушивать
conn, addr = s.accept()           # ждём подключения
conn.send('HELO')                 # дождавшись, отправляем клиенту строку
data = conn.recv(1024)            # получаем от клиента данные
s.close()                         # закрываем сокет

Обратите внимание на то, что методы accept(), send(), recv() являются по умолчанию блокирующими, т.е. работа программы приостанавливается до тех пор, пока не будет выполнено необходимое действие. Это означает, что клиент и сервер должны (по крайней мере, в нашей простейшей реализации) строго придерживаться определённой последовательности действий (протокола). В нашем случае после установки соединения сервер посылает строку приветствия. Если клиент, вместо того чтобы принять эту информацию, сам начнёт что-то передавать, то мы получим взаимную блокировку – клиент будет ждать, пока сервер примет его данные, сервер же будет ждать аналогичных действий со стороны клиента.

Для нормальной работы клиентская реализация должна выглядеть примерно так:

import socket                     # импортируем модуль
c = socket.socket(socket.AF_UNIX) # создаём сокет такого-же домена
c.connect('/tmp/test.sock')       # подключаемся к сокету сервера
greeting = c.recv(1024)           # принимаем строку приветсвия
c.send('Hello, server!')          # отправляем свои данные
c.close()                         # закрываем соединение

Понятно, что в данном примере мы получили «одноразовый» сервер, который, дождавшись соединения и приняв данные, завершает свою работу. Для постоянной работы фрагмент, начиная с метода accept(), нужно поместить в бесконечный цикл (см. пример ниже).

Модуль select

В Python доступен ещё один способ повысить эффективность работы за счёт параллельного выполнения некоторых операций – модуль select. Он использует системный вызов select для мультиплексирования соединений клиентов в одном цикле событий – метод select этого модуля позволяет отслеживать одновременно несколько сокетов или других файловых объектов (только на Unix-подобных системах) в ожидании готовности одного из них, после чего управление возвращается основной программе. Благодаря этому программа может обрабатывать сразу несколько сокетов по мере их готовности к взаимодействию. Познакомимся с этим модулем поближе.

Чтобы не замусоривать статью множеством фрагментов кода, приступим сразу к рассмотрению серьёзного примера, к которому будем обращаться по мере необходимости. Данный пример – простейший аналог демона syslog, задача которого – получать через Unix-сокет информацию от клиентов и записывать её в файл журнала. Если что-то не совсем понятно сразу, не обращайте на это внимание – всё прояснится к концу изложения. Код представлен на врезке logserver.py.

Чтобы в дальнейшем было проще модифицировать код, реализуем его в виде класса. В его конструкторе (метод __init__()) решаются три задачи: открытие файла (стр. 8-9), удаление файла-сокета, который может остаться в случае аварийного завершения сценария (стр. 10-15) и собственно создание сокета (стр. 16-18). На строки 19-21 пока не обращайте внимания.

Методы openlog() и writelog() соответственно открывают лог-файл и записывают в него строку, предваряя текущей датой. Ну и метод start() – основной, в котором и осуществляется обработка входящих соединений.

Для того, чтобы сервер постоянно обслуживал вверенный ему сокет, создаётся бесконечный цикл (строка 37). Однако здесь есть небольшая проблема. Вы же ещё помните, что методы accept(), recv() и send() являются по умолчанию блокирующими? То есть при «последовательной» обработке первый клиент, достучавшийся до сервера, полностью завладеет вниманием последнего, пока не завершит работу согласно заданному протоколу. Остальные же будут либо поставлены в очередь, либо вообще отброшены, если размер очереди превысит установленное значение (задаётся параметром метода listen()). Только полностью обслужив первого клиента, сервер сможет вернуться к строке 44 и снова вызвать accept().

Если обмен с клиентом происходит быстро, то такая схема работы вполне приемлема. Однако если протокол требует ведения «диалога», в ходе которого возможны задержки, то это может стать серьёзной проблемой. В рассматриваемом примере мы специально усложнили протокол, сделав его двухэтапным – сначала клиент должен представиться, дождаться от сервера подтверждения, затем отправить данные и снова дождаться подтверждения.

Если между первой и второй отправкой данных возникнет пауза (в коде клиента она искусственно реализована функцией time.sleep()), то сервер будет понапрасну простаивать, хотя вполне мог бы заняться обслуживанием других клиентов. Собственно, для этого и используется метод select() одноимённого модуля (строка 39).

Принцип действия его следующий – он берёт на себя ожидание данных в сокетах (массивы обслуживаемых сокетов передаются ему в виде параметров), передавая управление основной программе, если один из сокетов будет готов к обслуживанию.

Чтобы было понятнее, рассмотрим, что происходит в нашем примере. В строке 39 мы запускаем метод select(). Как только один (или несколько) из обслуживаемых сокетов (первоначально такой сокет только один, созданный при инициализации объекта в строке 16) будет готов к обслуживанию, select() передаёт основной программе массив сокетов, готовых к работе, который обрабатывается в цикле (строка 42). Так, если к работе готов «родительский» сокет, для него вызывается метод accept(). Поскольку метод select() гарантирует, что запрос на соединение уже есть, то основной программе не придётся тратить время на ожидание – accept() будет обработан сразу, вернув объект – новый сокет, предназначенный для работы с данным клиентом. Но мы не начинаем сразу же выполнять установленный протокол, а просто помещаем этот новый сокет в список rsocks, обслуживаемый методом select() (строка 45).

Когда на этот сокет поступят данные от клиента, select() вновь сообщит о готовности. На этот раз обработка пойдёт по ветке «else» (строка 46). Поскольку наш протокол двухэтапный, то считывать данные мы должны два раза, что и реализуется дополнительной конструкцией «if – else» (строки 47-56): при первом «подходе» словарь senders не будет содержать упоминания данного сокета (обратные кавычки позволяют работать не с самим сокетом, а с его «строковым представлением»); при втором же этот словарь уже будет содержать имя отправителя, ассоциированное с сокетом. На втором этапе выполняется запись строки в лог-файл (строка 54), удаление сокета из массива rsocks (строка 55), что-бы select() уже не занимался его обслуживанием, и удаление записи из словаря (строка 56).

Обратите внимание, что мы не можем просто взять и последовательно вызвать два метода recv() для получения всех данных, поскольку второй вызов окажется уже блокирующим – ведь на первый recv() мы попадаем, только когда select() обнаружит готовые для обработки данные; во втором же случае готовности придётся ждать самостоятельно.

По большому счёту, вызовы send() в нашем примере получились блокирующие – если клиент не сможет сразу принять переданное ему подтверждение, то сервер будет простаивать. Решается это аналогичным путём, но уже с помощью массива wsocks, однако из боязни сделать код чрезмерно сложным и нечитаемым, в данном примере мы проигнорируем эту проблему, оставив её решение вам в качестве упражнения.

Сигналы

Остались ещё две проблемы. Во-первых, хотелось бы, чтобы сервер перед завершением своей работы (поскольку используется бесконечный цикл, то это придётся делать «грубыми» методами вроде команды kill или Ctrl+C) успевал выполнить некоторые полезные действия (например, закрыть файл журнала, удалить файл сокета). Во-вторых, если в процессе работы сервера удалить или переименовать лог-файл и создать новый с таким же именем (например, это может происходить при ротации журнала утилитами типа logrotate), то дескриптор открытого файла (self.log в нашем примере) не изменится, продолжая указывать на прежнее расположение файла в файловой системе. Так что запись будет вестись по этому дескриптору, в уже переименованный или удалённый файл (поскольку на файл будет оставаться ссылка, «привязанная» к дескриптору, то при удалении из каталога он физически будет оставаться на месте, пока не будет удалён этот дескриптор) То есть нужно предусмотреть переинициализацию файла журнала.

Как команда kill, так и комбинация [Ctrl+C] реализуют метод межпроцессорного взаимодействия, именуемый сигналами. Например, kill 3942 отправит процессу номер 3942 сигнал 15 (SIGTERM), дающий указание завершить работу. Ctrl+C отправляет сигнал 2 (SIGINT). Большинство сигналов процесс может перехватить и обработать по собственному желанию, чем мы и воспользуемся.

В Python для этого предназначен модуль signal. Собственно, его мы и используем в строках 19–21, назначая на некоторые сигналы в качестве обработчика метод stop(). Для сигнала 1 (SIGHUP) в качестве обработчика назначается метод reinit(), который решает задачу переинициализации открытого файла журнала.

Тестирование

Чтобы проверить работу нашего сервера, нам нужен клиент. Его код представлен во врезке logclient.py. Никаких сложностей здесь нет. Поясню лишь, что конструкции time.sleep(5) (стр. 15 и 21) искусственно создают задержку между первым и вторым этапами диалога.

Чтобы убедиться в том, что все клиенты обслуживаются сервером параллельно, нужно запустить их в нескольких экземплярах (например, с разных консолей). В результате в файле журнала появятся такие записи:

Sun Jun 4 15:30:17 2006: ===> LogServer started
Sun Jun 4 15:30:55 2006: [test2] Test message
Sun Jun 4 15:30:56 2006: [test3] Test message
Sun Jun 4 15:31:00 2006: ===> LogServer stopped [signal 2]

Как видите, между записью сообщений от test2 и test3 прошла одна секунда, хотя каждый клиент требует для своей обработки как минимум 10. Значит, ожидание ответа от обоих клиентов выполняется одновременно, чего мы и добивались.

Ветвления

Впрочем, select – это не единственный способ организовать параллельную работу в Python. Модуль os предоставляет функцию fork(), которая использует одноимённый системный вызов, порождающий копию текущего процесса. Чтобы посмотреть, как это работает на практике, напишем небольшой сценарий, который будет автоматически запускать скрипты-клиенты для тестирования нашего сервера (действительно, негоже делать вручную то, что можно поручить программе). Код представлен на врезке logclient2.py.

Здесь всё до безобразия просто – функция fork() (строка 6) порождает копию текущего процесса. В каждой копии выполнение кода будет продолжено как ни в чём не бывало со следующей команды. Чтобы код мог понять, где он выполняется – в родительском процессе или в дочернем, используется значение, возвращаемое функцией fork(). Дочерний процесс получает значение 0, родительский – идентификатор порождённого дочернего процесса (PID).

Кстати, функция os._exit(0) в строке 10 позволяет завершить дочерний процесс. Если этого не сделать, то он пойдёт на выполнение цикла for (строка 5), уже сам выступая в качестве родительского и порождая, таким образом, настоящую лавину новых процессов.

Естественно, таким образом можно было бы реализовать и наш сервер – после метода accept() ответвлять дочерний процесс, который занимался бы обслуживанием конкретного клиента, в то время как родительский продолжал бы «висеть» на методе accept(), ожидая входящие соединения. Именно так и работают многие серверы, например, Apache (версия 1.х – только так и никак иначе, а в 2.х появились потоки).


БЛОКИРОВАТЬ НЕОБЯЗАТЕЛЬНО

Модуль socket также предоставляет возможность работы с неблокирующими вызовами accept(), send() и recv(). Для этого следует предварительно установить значение соответствующего атрибута объекта-сокета с помощью следующего метода:

socket.setblocking(0)

Значение 0 переключает сокет в неблокирующий режим работы (по умолчанию используется блокирующий – значение 1). При этом методы accept(), send() и recv() при отсутствии данных для обработки не останавливают выполнение программы до их появления, а генерируют исключение socket.error. Что с ним делать дальше – решать вам. Например, можно просто игнорировать:

while(1):
    try:
        data = sock.recv()
    except socket.error, errcode:
        if errcode[0] == 35:
            pass
        else:
            raise(socket.error)

Второй параметр оператора except – переменная, в которую будет занесён код ошибки. Этот код представляет собой кортеж вида (35, ‘Resource temporarily unavailable’), где первый элемент – числовой код ошибки, а второй – текстовая строка-пояснение. При отсутствии данных генерируется ошибка 35, которую мы и игнорируем (pass). Здесь мы получаем то же ожидание данных, но уже реализованное самим кодом Python. Но преимущество здесь в том, что вместо оператора pass можно реализовать любую обработку. Например, переходить к опросу другого сокета.

logserver.py

  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3.  
  4. import os, socket, time, signal, select
  5.  
  6. class LogServer:
  7.     def __init__(self, sockfile='./lserv.sock',
  8.                  logfile='./lserv.log',
  9.                  maxqueue=5):
  10.         self.logfilename = logfile
  11.         self.openlog()
  12.         self.sockfilename = sockfile
  13.         try:
  14.             if os.path.exists(sockfile):
  15.                 os.unlink(sockfile)
  16.         except:
  17.             raise 'error'
  18.  
  19.         self.socket = socket.socket(socket.AF_UNIX)
  20.         self.socket.bind(sockfile)
  21.         self.socket.listen(maxqueue)
  22.  
  23.         signal.signal(signal.SIGHUP, self.reinit)
  24.         signal.signal(signal.SIGINT, self.stop)
  25.         signal.signal(signal.SIGTERM, self.stop)
  26.  
  27.         self.writelog('===> LogServer started')
  28.  
  29.     def openlog(self):
  30.         self.log = open(self.logfilename, 'a+')
  31.  
  32.     def writelog(self, message):
  33.         self.log.write('%s: %s\n' % (time.asctime(), message))
  34.  
  35.     def reinit(self, signum, frame):
  36.         self.log.close()
  37.         self.openlog()
  38.         self.start()
  39.  
  40.     def start(self):
  41.         rsocks = []
  42.         wsocks = []
  43.         rsocks.append(self.socket)
  44.         senders = {}
  45.         while 1:
  46.             try:
  47.                 reads, writes, errs = select.select(rsocks, wsocks, [])
  48.             except:
  49.                 return
  50.  
  51.             for sock in reads:
  52.                 if sock == self.socket:
  53.                     client, name = sock.accept()
  54.                     rsocks.append(client)
  55.                 elif not `sock` in senders.keys():
  56.                     sender = sock.recv(1024)
  57.                     sock.send('Sender OK')
  58.                     senders[`sock`] = sender
  59.                 else:
  60.                     message = sock.recv(1024)
  61.                     sock.send('Message OK')
  62.                     self.writelog('[%s] %s' % (senders[`sock`], message))
  63.                     rsocks.remove(sock)
  64.                     del senders[`sock`]
  65.  
  66.     def stop(self, signum, frame):
  67.         self.writelog('===> LogServer stopped [signal %s]' % (signum))
  68.         self.log.close()
  69.         os.unlink(self.sockfilename)
  70.  
  71. if __name__ == '__main__':
  72.     serv = LogServer(maxqueue=3)
  73.     serv.start()

logclient.py

  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3.  
  4. import sys, socket, time
  5.  
  6. class LogClient:
  7.     def __init__(self, sender='generic client',
  8.                  sockfile='./lserv.sock',
  9.                  buffersize=1024,
  10.                  testmode=0):
  11.         self.sender = sender
  12.         self.sockfile = sockfile
  13.         self.buffersize = buffersize
  14.         self.testmode = testmode
  15.  
  16.     def writelog(self, message):
  17.         if self.testmode:
  18.             time.sleep(5)
  19.         self.socket = socket.socket(socket.AF_UNIX)
  20.         self.socket.connect(self.sockfile)
  21.         self.socket.send(self.sender)
  22.         if self.socket.recv(self.buffersize) == 'Sender OK':
  23.             if self.testmode:
  24.                 time.sleep(5)
  25.             self.socket.send(message)
  26.             if not self.socket.recv(self.buffersize) == 'Message OK':
  27.                 print 'Ошибка: нет подтверждения Message'
  28.         else:
  29.             print 'Ошибка: нет подтверждения Sender'
  30.         self.socket.close()
  31.  
  32. if __name__ == '__main__':
  33.     sendername = sys.argv[1]
  34.     client = LogClient(sender=sendername, testmode=1)
  35.     client.writelog('Test message')

logclient2.py

  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3.  
  4. import os
  5. from logclient import LogClient
  6.  
  7. for i in xrange(25):
  8.     pid = os.fork()
  9.     if pid == 0:
  10.         client = LogClient(sender='client%d' % i, testmode=1)
  11.         client.writelog('Test from client%d' % i)
  12.         os._exit(0)
  13.     else:
  14.         print 'Start child[%d]' % pid
Персональные инструменты
купить
подписаться
Яндекс.Метрика