Luckylau's Blog

Python的eventlet使用与理解

neutron源码学习基础知识储备之Eventlet

eventlet在openstack,还有ryu控制器中使用频率很高,有必要总结一下用法。

什么是协程?

​ 说到Coroutine,我们必须提到两个更远的东西。在操作系统(os)级别,有进程(process)和线程(thread)两个(仅从我们常见的讲)实际的“东西”(不说概念是因为这两个家伙的确不仅仅是概念,而是实际存在的,os的代码管理的资源)。这两个东西都是用来模拟“并行”的,写操作系统的程序员通过用一定的策略给不同的进程和线程分配CPU计算资源,来让用户“以为”几个不同的事情在“同时”进行“。在单CPU上,是os代码强制把一个进程或者线程挂起,换成另外一个来计算,所以,实际上是串行的,只是“概念上的并行”。在现在的多核的cpu上,线程可能是“真正并行的”。

​ Coroutine,翻译成”协程“,初始碰到的人马上就会跟上面两个概念联系起来。直接先说区别,Coroutine是编译器级的,Process和Thread是操作系统级的。Coroutine的实现,通常是对某个语言做相应的提议,然后通过后成编译器标准,然后编译器厂商来实现该机制。Process和Thread看起来也在语言层次,但是内生原理却是操作系统先有这个东西,然后通过一定的API暴露给用户使用,两者在这里有不同。Process和Thread是os通过调度算法,保存当前的上下文,然后从上次暂停的地方再次开始计算,重新开始的地方不可预期,每次CPU计算的指令数量和代码跑过的CPU时间是相关的,跑到os分配的cpu时间到达后就会被os强制挂起。Coroutine是编译器的魔术,通过插入相关的代码使得代码段能够实现分段式的执行,重新开始的地方是yield关键字指定的,一次一定会跑到一个yield对应的地方。

总之,对于Coroutine,是编译器帮助做了很多的事情,来让代码不是一次性的跑到底,而不是操作系统强制的挂起。代码每次跑多少,是可预期的。但是,Process和Thread,在这个层面上完全不同,这两个东西是操作系统管理的。

python-eventlet又是什么?

官方网站对eventlet的描述是:

​ Eventlet is built around the concept of green threads (i.e. coroutines, we use the terms interchangeably) that are launched to do network-related work.

Green threads differ from normal threads in two main ways:

​ Green threads are so cheap they are nearly free. You do not have to conserve green threads like you would normal threads. In general, there will be at least one green thread per network connection.
Green threads cooperatively yield to each other instead of preemptively being scheduled. The major advantage from this behavior is that shared data structures don’t need locks, because only if a yield is explicitly called can
​ another green thread have access to the data structure. It is also possible to inspect primitives such as queues to see if they have any pending data.

​ 大概意思是Eventlet是以绿色线程(协同线程)的概念建立起来的网络库,绿色线程和普通线程的区别是:1.绿色线程的开销小 2.绿色线程共享数据,无需锁,同一时刻只有一个线程能访问数据,通过类似队列的去查找等待的数据。

​ eventlet是一个用来处理和网络相关的python库函数,而且可以通过协程来实现并发,在eventlet里,把“协程”叫做 greenthread(绿色线程)。所谓并发,就是开启了多个greenthread,并且对这些greenthread进行管理,以实现非阻塞式的 I/O。比如说用eventlet可以很方便的写一个性能很好的web服务器,或者是一个效率很高的网页爬虫,这都归功于eventlet的“绿色线程”,以及对“绿色线程”的管理机制。更让人不可思议的是,eventlet为了实现“绿色线程”,竟然对python的和网络相关的几个标准库函数进行了改写,并且可以以补丁(patch)的方式导入到程序中,因为python的库函数只支持普通的线程,而不支持协程,eventlet称之为“绿化”。
​ 它通过greenlet提供的协程功能,让开发者可以不用将以往的多线程等并发程序的开发方式转变成异步状态机模型,就能直接使用select/epoll/kqueue等操作系统提供的支持高并发IO接口,并且能尽可能地发挥它们在并发上的优势。

eventlet的结构如下图所示,eventlet实现的”并发” 更准确的讲, 是 IO多路复用

python-eventlet API?

Greenthread Spawn (spawn,孵化的意思,即如何产生greenthread)

主要有3个函数可以创建绿色线程:

1)eventlet.spawn(func, args, *kwargs):

​ 创建一个绿色线程去运行func这个函数,后面的参数是传递给这个函数的参数。返回值是一个eventlet.GreenThread对象,这个对象可以用来接受func函数运行的返回值。在绿色线程池还没有满的情况下,这个绿色线程一被创建就立刻被执行。其实,用这种方法去创建线程也是可以理解的,线程被创建出来,肯定是有一定的任务要去执行,这里直接把函数当作参数传递进去,去执行一定的任务,就好像标准库中的线程用run()方法去执行任务一样。

2)eventlet.spawn_n(func, args, *kwargs):

这个函数和spawn()类似,不同的就是它没有返回值,因而更加高效,这种特性,使它也有存在的价值。

3)eventlet.spawn_after(seconds, func, args, *kwargs):

这个函数和spawn()基本上一样,都有一样的返回值,不同的是它可以限定在什么时候执行这个绿色线程,即在seconds秒之后,启动这个绿色线程。

Greenthread Control

1)eventlet.sleep(seconds=0)

悬挂当前的绿色线程,以允许其它的绿色线程执行

2)class eventlet.GreenPool

​ 这是一个类,在这个类中用set集合来容纳所创建的绿色线程,并且可以指定容纳线程的最大数量(默认是1000个),它的内部是用Semaphore和Event这两个类来对池进行控制的,这样就构成了线程池。其中,有几个比较重要的方法:

​ free()

​ imap(function, *iterables)

​ resize(new_size)

​ running()

​ spawn(function, args, *kwargs)

​ spawn_n(function, args, *kwargs)

​ starmap(function, iterable)

​ waitall()

​ waiting()

3)class eventlet.GreenPile

这也是一个类,而且是一个很有用的类,在它内部维护了一个GreenPool对象和一个Queue对象。这个GreenPool对象可以是从外部传递进来的,也可以是在类内部创建的,GreenPool对象主要是用来创建绿色线程的,即在GreenPile内部调用了GreenPool.spawn()方法。而Queue对象则是用来保存spawn()方法的返回值的,即Queue中保存的是GreenThread对象。并且它还实现了next()方法,也就意味着GreenPile对象具有了迭代器的性质。所以如果我们要对绿色线程的返回值进行操作的话,用这个类是再好不过的了。

next()
Wait for the next result, suspending the current greenthread until it is available. Raises StopIteration when there are no more results.

spawn(func, args, *kw)
Runs func in its own green thread, with the result available by iterating over the GreenPile object

4)class eventlet.Queue

​ 基类是LightQueue,它实现了大部分的队列的常用方法。它是用collections做为实现队列的基本数据结构的。而且这个LightQueue的实现,不单单实现了存取操作,在本质上它实现了一个生产者和消费者问题,定义了两个set()类型的成员变量putters和getters,前者用来存放在队列满时,被阻塞的绿色线程,后者用来存放当队列空时,被阻塞的绿色线程。类中的putting()和getting()方法就是分别得到被阻塞的绿色线程的数量。Queue继承了LightQueue,并且又增加了它自己的两个方法:task_done()和join()。task_done()是被消费者的绿色线程所调用的,表示在这个项上的所有工作都做完了,join()是阻塞,直到队列中所有的任务都完成。LifoQueue和PriorityQueue是存放数据的两种不同的方式。

5)class eventlet.Timeout

This class is a way to add timeouts to anything. It raises exception in the current greenthread after timeout seconds. When exception is omitted or None, the Timeout instance itself is raised.

Patching Functions

这里就是之前所说的“绿化”,经过eventlet“绿化”过的模块都在eventlet.green中,导入他们主要有两种方法:

1) eventlet.import_patched(modulename, additional_modules, *kw_additional_modules)

1
2
3
4
5
6
7
from eventlet.green import socket
from eventlet.green import SocketServer
BaseHTTPServer = eventlet.import_patched('BaseHTTPServer',
('socket', socket),
('SocketServer', SocketServer))
BaseHTTPServer = eventlet.import_patched('BaseHTTPServer',
socket=socket, SocketServer=SocketServer)

2)eventlet.monkey_patch(all=True, os=False, select=False, socket=False, thread=False, time=False)

1
2
import eventlet
eventlet.monkey_patch(socket=True, select=True)

Network Convenience Functions(和网络相关的函数)

eventlet.connect(addr, family=, bind=None)

主要执行了以下几个步骤:新建了一个TCP类型的socket,绑定本地的ip和端口,和远程的地址进行连接

1
2
3
4
5
6
def connect(addr, family=socket.AF_INET, bind=None):
sock = socket.socket(family, socket.SOCK_STREAM)
if bind is not None:
sock.bind(bind)
sock.connect(addr)
return sock

eventlet.listen(addr, family=, backlog=50)

和connect()类似,只是把connect()换成了listen(),backlog指定了最大的连接数量

1
2
3
4
5
6
7
def listen(addr, family=socket.AF_INET, backlog=50):
sock = socket.socket(family, socket.SOCK_STREAM)
if sys.platform[:3]=="win":
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #这段不知道具体是做什么的
sock.bind(addr)
sock.listen(backlog)
return sock

eventlet.wrap_ssl(sock, a, *kw)

给socket加上ssl(安全套接层),对数据进行加密

eventlet.serve(sock, handle, concurrency=1000)

这个函数直接创建了一个socket服务器,在它内部创建了一个GreenPool对象,默认的最大绿色线程数是1000,然后是一个循环来接受连接

1
2
3
4
5
6
7
8
9
10
11
12
def serve(sock, handle, concurrency=1000):
pool = greenpool.GreenPool(concurrency)
server_gt = greenthread.getcurrent()
while True:
try:
conn, addr = sock.accept()
gt = pool.spawn(handle, conn, addr)
gt.link(_stop_checker, server_gt, conn)
conn, addr, gt = None, None, None
except StopServe:
return

eventlet 中的wsgi?

流程描述:

服务器开一个socket等待客户端连接;请求来了,服务器会读出传来的数据,然后根据HTTP协议做一些初步的封装,接着就可以调用事先注册的应用程序了,并将请求的数据塞进去;等响应处理完毕了再把数据通过socket发出去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
server参数介绍:
def server(sock, # Server socket, must be already bound to a port and listening(IP和端口并开启监听).
site, # WSGI application function(事件处理函数,发送start_response响应头然后返回响应内容)
log=None, # File-like object that logs should be written to.If not specified, sys.stderr is used.(日志处理,默认为sys.stderr用来重定向标准错误信息的)
environ=None, # Additional parameters that go into the environ dictionary of every request(每次请求的参数,写入一个字典中)
max_size=None, #Maximum number of client connections opened at any time by this server.(默认为1024)
max_http_version=DEFAULT_MAX_HTTP_VERSION, # Set to "HTTP/1.0" to make the server pretend it only supports HTTP 1.0.
                                # This can help with applications or clients that don't behave properly using HTTP 1.1.(HTTP协议版本,默认为HTTP/1.1)
protocol=HttpProtocol, # Protocol class.(协议类,默认为HttpProtocol)
server_event=None, # Used to collect the Server object(搜集服务器对象信息)
minimum_chunk_size=None, # Minimum size in bytes for http chunks. This can be used to improve performance of applications which yield many small strings, though
                      # using it technically violates the WSGI spec. This can be overridden on a per request basis by setting environ['eventlet.minimum_write_chunk_size'].
                      # 设置最小的Chunk大小,可以通过设置environ['eventlet.minimum_write_chunk_size']来覆盖.Chunk表示服务器发送给客户端的分块传输编码(Chunked transfer encoding)
log_x_forwarded_for=True, # If True (the default), logs the contents of the x-forwarded-for header in addition to the actual client ip address in the 'client_ip' field of the log line.
                      # 默认为True,记录客户端IP日志,X-Forwarded-For(XFF)是用来识别通过HTTP代理或负载均衡方式连接到Web服务器的客户端最原始的IP地址的HTTP请求头字段。
custom_pool=None, # A custom GreenPool instance which is used to spawn client green threads.If this is supplied, max_size is ignored.(协程池,如果启用则可以忽略前面的max_size参数)
keepalive=True, # If set to False, disables keepalives on the server; all connections will be closed after serving one request.(控制客户端连接数是否保持alive)
log_output=True, # A Boolean indicating if the server will log data or not.(确定服务端是否输出日志)
log_format=DEFAULT_LOG_FORMAT, # A python format string that is used as the template to generate log lines.(日志输出格式)
url_length_limit=MAX_REQUEST_LINE, # A maximum allowed length of the request url. If exceeded, 414 error is returned.(最大的url长度限制,默认为8192)
debug=True, # True if the server should send exception tracebacks to the clients on 500 errors.If False, the server will respond with empty bodies.(是否发送调式信息给客户端)
socket_timeout=None, # Timeout for client connections' socket operations. Default None means wait forever.(Socket超时时间设置,单位是秒)
capitalize_response_headers=True) # Normalize response headers' names to Foo-Bar(是否标准化相应头)

Client端:

1
2
3
4
5
6
7
8
#客户端代码:
import eventlet
c=eventlet.connect(('127.0.0.1', 6000))
while True:
data=raw_input('Enter data:')
c.sendall(data)
rc=c.recv(1024)
print rc

Server端:

1
2
3
4
5
6
7
8
9
10
11
12
#服务端代码:
import eventlet
def handle(client):
while True:
c = client.recv(1024)
print c
client.sendall(c)
server = eventlet.listen(('127.0.0.1', 6000))
pool = eventlet.GreenPool(10000)
while True:
new_sock, address = server.accept()
pool.spawn_n(handle, new_sock)

python-eventlet 的Demo?

官方上引以为傲的“网页爬虫”,用到了绿色线程池和imap()函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
urls = [
"http://www.google.com/intl/en_ALL/images/logo.gif",
"http://python.org/images/python-logo.gif",
"http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif",
]
import eventlet
from eventlet.green import urllib2
def fetch(url):
return urllib2.urlopen(url).read()
pool = eventlet.GreenPool()
for body in pool.imap(fetch, urls):
print("got body", len(body))

源码级别的分析?

eventlet主要依赖另外2个python package:

greenlet
python-epoll (或其他类似的异步IO库, 如poll/select等)

主要做了3个工作:

封装greenlet
封装epoll
改写python标准库中相关的module, 以便支持epoll

什么是epoll?

epoll是linux实现的一个基于事件的异步IO库, 在之前类似的异步IO库poll上改进而来。

下面两个例子会演示如何用epoll将阻塞的IO操作用epoll改写为异步非阻塞:

blocking IO

import socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
try:
while True:
connectiontoclient, address = serversocket.accept()
request = b''
while EOL1 not in request and EOL2 not in request:
request += connectiontoclient.recv(1024)
print('-'*40 + '\n' + request.decode()[:-2])
connectiontoclient.send(response)
connectiontoclient.close()
finally:
serversocket.close()

​ 需要注意的是程序会在connectiontoclient, address = serversocket.accept()这一行block住, 直到获取到新的连接, 程序才会继续往下运行.同时, 这个程序同一个时间内只能处理一个连接, 如果有很多用户同时访问8080端口, 必须要按先后 顺序依次处理这些连接, 前面一个连接成功返回后, 才会处理后面的连接.

non-blocking IO by using epoll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import socket, select
EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
serversocket.setblocking(0)
epoll = select.epoll()
epoll.register(serversocket.fileno(), select.EPOLLIN)
try:
connections = {}; requests = {}; responses = {}
while True:
events = epoll.poll(1)
for fileno, event in events:
if fileno == serversocket.fileno():
connection, address = serversocket.accept()
connection.setblocking(0)
epoll.register(connection.fileno(), select.EPOLLIN)
connections[connection.fileno()] = connection
requests[connection.fileno()] = b''
responses[connection.fileno()] = response
elif event & select.EPOLLIN:
requests[fileno] += connections[fileno].recv(1024)
if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
epoll.modify(fileno, select.EPOLLOUT)
print('-'*40 + '\n' + requests[fileno].decode()[:-2])
elif event & select.EPOLLOUT:
byteswritten = connections[fileno].send(responses[fileno])
responses[fileno] = responses[fileno][byteswritten:]
if len(responses[fileno]) == 0:
epoll.modify(fileno, 0)
connections[fileno].shutdown(socket.SHUT_RDWR)
elif event & select.EPOLLHUP:
epoll.unregister(fileno)
connections[fileno].close()
del connections[fileno]
finally:
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()

可以看到, 例子中首先使用serversocket.setblocking(0)将socket设为异步的模式,然后 用select.epoll()新建了一个epoll, 接着用epoll.register(serversocket.fileno(),select.EPOLLIN)将该socket上的IO输入事件(select.EPOLLIN
)注册到epoll里.这样做了以后, 就可以将 上面例子中会在socket.accept()这步阻塞的MainLoop改写为基于异步IO事件的epoll循环了.events = epoll.poll(1)

​ 简单的说, 如果有很多用户同时连接到8080端口, 这个程序会同时accept()所有的socket连接, 然后通过这行代码将发生IO事件socket放到events中, 并在后面循环中处理. 没有发生IO事件的 socket不会在loop中做处理. 这样使用epoll就实现了一个简单的并发web服务器.

注意, 这里提到的并发, 和我们通常所理解线程/进程的并发并不太一样, 更准确的说, 是 IO多路复用 .

什么是greenlet?

greentlet是python中实现我们所谓的”Coroutine(协程)”的一个基础库.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from greenlet import greenlet
def test1():
print 12
gr2.switch()
print 34
def test2():
print 56
gr1.switch()
print 78
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
#输出
12
56
34

​ 程序先分别为两个函数定义了2个greenlet: gr1和gr2.gr1.switch()显式切换到gr1上执行, gr1中输出”12”后gr2.switch()显式切换到gr2上执行输出56, 又gr1.switch()显式切换到gr1上, 输出34. test1()执行结束,gr1 die. 于是 test2()里的78不会输出.可以发现greenlet仅仅是实现了一个最简单的”coroutine”, 而eventlet中的greenthread是在 greenlet的基础上封装了一些更high-level的功能, 比如greenlet的调度等.

什么是eventlet.green?

从epoll的运行机制可以看出, 要使用异步IO, 必须要将相关IO操作改写成non-blocking的方式. 但是我们用
eventlet.spawn()的函数,并没有针对epoll做任何改写, 那eventlet是怎么实现 异步IO的呢?这也是eventlet这个package最凶残的地方, 它自己重写了python标准库中IO相关的操作, 将它们 改写成支持epoll的模式, 放在eventlet.green中.比如说, socket.accept()被改成了这样

1
2
3
4
5
6
7
8
9
10
11
12
def accept(self):
if self.act_non_blocking:
return self.fd.accept()
fd = self.fd
while True:
res = socket_accept(fd)
if res is not None:
client, addr = res
set_nonblocking(client)
return type(self)(client), addr
trampoline(fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))

​ 然后在eventlet.spawn()的时候, 通过 一些高阶魔法和”huge hack”, 将这些改写过得模块”patch”到spawn出的greenthread上, 从而 实现epoll的IO多路复用, 相当凶残.其中的hub和greenthread分别对应eventlet.hubs.hub和eventlet.greenthread, 本质都是 一个greenlet的实例.hub中封装前面提到的epoll, epoll的事件循环是由hub.run()这个方法里实现.每当用户调用 eventlet.spawn(), 就会在当前python线程的pool里产生一个新的greenthread. 由于greenthread 里的IO相关的python标准库被改写成non-blocking的模式(参考上面的socket.accept()).每当greenthread里做IO相关的操作时, 最终都会返回到hub中的epoll循环, 然后根据epoll中的 IO事件, 调用响应的函数. 具体如下面所示.greenthread.sleep(), 实际上也是将CPU控制权交给hub,然后由hub调度下一个需要运行的 greenthread.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def wait(self, seconds=None):
readers = self.listeners[READ]
writers = self.listeners[WRITE]
if not readers and not writers:
if seconds:
sleep(seconds)
return
try:
presult = self.poll.poll(int(seconds * self.WAIT_MULTIPLIER))
except select.error, e:
if get_errno(e) == errno.EINTR:
return
raise
SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
for fileno, event in presult:
try:
if event & READ_MASK:
readers.get(fileno, noop).cb(fileno)
if event & WRITE_MASK:
writers.get(fileno, noop).cb(fileno)
if event & select.POLLNVAL:
self.remove_descriptor(fileno)
continue
if event & EXC_MASK:
readers.get(fileno, noop).cb(fileno)
writers.get(fileno, noop).cb(fileno)
except SYSTEM_EXCEPTIONS:
raise
except:
self.squelch_exception(fileno, sys.exc_info())
clear_sys_exc_info()

参考:

http://blog.csdn.net/xiangmin2587/article/details/8182775

http://blog.csdn.net/qq910894904/article/details/41699541

http://www.cnblogs.com/wonderKK/p/4062591.html

http://eventlet.net/doc/

http://eventlet.net/doc/modules/wsgi.html

http://www.xuebuyuan.com/1379840.html

Luckylau wechat
如果对您有价值,看官可以打赏的!