承接之前的文章:深入理解 tornado 之 底层 ioloop 实现(二)
ioloop 最核心的部分:
def start(self):
if self._running: # 判断是否已经运行
raise RuntimeError("IOLoop is already running")
self._setup_logging()
if self._stopped:
self._stopped = False # 设置停止为假
return
old_current = getattr(IOLoop._current, "instance", None)
IOLoop._current.instance = self
self._thread_ident = thread.get_ident() # 获得当前线程标识符
self._running = True # 设置运行
old_wakeup_fd = None
if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
try:
old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
if old_wakeup_fd != -1:
signal.set_wakeup_fd(old_wakeup_fd)
old_wakeup_fd = None
except ValueError:
old_wakeup_fd = None
try:
while True: # 服务器进程正式开始,类似于其他服务器的 serve_forever
with self._callback_lock: # 加锁,_callbacks 做为临界区不加锁进行读写会产生脏数据
callbacks = self._callbacks # 读取 _callbacks
self._callbacks = []. # 清空 _callbacks
due_timeouts = [] # 用于存放这个周期内已过期( 已超时 )的任务
if self._timeouts: # 判断 _timeouts 里是否有数据
now = self.time() # 获取当前时间,用来判断 _timeouts 里的任务有没有超时
while self._timeouts: # _timeouts 有数据时一直循环, _timeouts 是个最小堆,第一个数据永远是最小的, 这里第一个数据永远是最接近超时或已超时的
if self._timeouts[0].callback is None: # 超时任务无回调
heapq.heappop(self._timeouts) # 直接弹出
self._cancellations -= 1 # 超时计数器 - 1
elif self._timeouts[0].deadline <= now: # 判断最小的数据是否超时
due_timeouts.append(heapq.heappop(self._timeouts)) # 超时就加到已超时列表里。
else:
break # 因为最小堆,如果没超时就直接退出循环( 后面的数据必定未超时 )
if (self._cancellations > 512
and self._cancellations > (len(self._timeouts) >> 1)): # 当超时计数器大于 512 并且 大于 _timeouts 长度一半( >> 为右移运算, 相当于十进制数据被除 2 )时,清零计数器,并剔除 _timeouts 中无 callbacks 的任务
self._cancellations = 0
self._timeouts = [x for x in self._timeouts
if x.callback is not None]
heapq.heapify(self._timeouts) # 进行 _timeouts 最小堆化
for callback in callbacks:
self._run_callback(callback) # 运行 callbacks 里所有的 calllback
for timeout in due_timeouts:
if timeout.callback is not None:
self._run_callback(timeout.callback) # 运行所有已过期任务的 callback
callbacks = callback = due_timeouts = timeout = None # 释放内存
if self._callbacks: # _callbacks 里有数据时
poll_timeout = 0.0 # 设置 epoll_wait 时间为 0 ( 立即返回 )
elif self._timeouts: # _timeouts 里有数据时
poll_timeout = self._timeouts[0].deadline - self.time()
# 取最小过期时间当 epoll_wait 等待时间,这样当第一个任务过期时立即返回
poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
# 如果最小过期时间大于默认等待时间 _POLL_TIMEOUT = 3600 ,则用 3600 ,如果最小过期时间小于 0 就设置为 0 立即返回。
else:
poll_timeout = _POLL_TIMEOUT # 默认 3600 s 等待时间
if not self._running: # 检查是否有系统信号中断运行,有则中断,无则继续
break
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL, 0, 0) # 开始 epoll_wait 之前确保 signal alarm 都被清空( 这样在 epoll_wait 过程中不会被 signal alarm 打断 )
try:
event_pairs = self._impl.poll(poll_timeout) # 获取返回的活跃事件队
except Exception as e:
if errno_from_exception(e) == errno.EINTR:
continue
else:
raise
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL,
self._blocking_signal_threshold, 0) # epoll_wait 结束, 再设置 signal alarm
self._events.update(event_pairs) # 将活跃事件加入 _events
while self._events:
fd, events = self._events.popitem() # 循环弹出事件
try:
fd_obj, handler_func = self._handlers[fd] # 处理事件
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None
finally:
self._stopped = False # 确保发生异常也继续运行
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL, 0, 0) # 清空 signal alarm
IOLoop._current.instance = old_current
if old_wakeup_fd is not None:
signal.set_wakeup_fd(old_wakeup_fd) # 和 start 开头部分对应,但是不是很清楚作用,求老司机带带路
最后来看 stop
:
def stop(self):
self._running = False
self._stopped = True
self._waker.wake()
这个很简单,设置判断条件,然后调用 self._waker.wake()
向 pipe 写入随意字符释放 pipe 。 over !
噗,写了这么长,终于写完了。 经过分析,我们可以看到, ioloop 实际上是对 epoll 的封装,并加入了一些对上层事件的处理和 server 相关的底层处理。
最后,感谢大家不辞辛苦看到这,文中理解有误的地方还请多多指教!:pray:
作者:rapospectre
1
micyng 2016-06-07 20:49:34 +08:00
最后一点写错了,不是释放 pipe ,而是利用 pipe 的 fd 唤醒 ioloop 事件循环
|
2
rapospectre OP @micyng 谢谢指正!已经修改
|