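Preface

This article walks, in detail and step by step, through a single asynchronous operation to show how tornado implements asynchronous IO. It is really a hands-on review of [the previous article][1]. Since the focus is the asynchronous IO machinery, exception handling in the code is ignored. It is fairly text-heavy, so please bear with it. Only selected parts of the source are quoted below to aid understanding; patient readers are encouraged to open the tornado source and trace through it along with me.

AsyncHTTPClient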
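AsyncHTTPClient's construction rests on a __new__ trick that is easier to see in isolation. The following is a minimal sketch under simplified assumptions, not tornado's Configurable: ConfigurableClient, SimpleClient, configure() and the loop key are hypothetical stand-ins. __new__ picks the implementation class (the default one unless configure() has been called) and caches one instance per event-loop key, so constructing the client twice on the same loop hands back the same object.

```python
class ConfigurableClient:
    """Hypothetical stand-in for AsyncHTTPClient on top of Configurable."""

    _impl_class = None  # set by configure(); None means "use the default"
    _instances = {}     # one cached instance per event-loop key

    @classmethod
    def configurable_default(cls):
        # Plays the role of configurable_default(): the class to build
        # when nothing else has been configured.
        return SimpleClient

    @classmethod
    def configure(cls, impl_class):
        ConfigurableClient._impl_class = impl_class

    def __new__(cls, loop="default-loop", force_instance=False):
        if not force_instance and loop in ConfigurableClient._instances:
            return ConfigurableClient._instances[loop]  # reuse the cached one
        impl = ConfigurableClient._impl_class or cls.configurable_default()
        instance = super().__new__(impl)
        instance.loop = loop
        if not force_instance:
            ConfigurableClient._instances[loop] = instance
        return instance


class SimpleClient(ConfigurableClient):
    """Hypothetical default implementation (SimpleAsyncHTTPClient's role)."""


# Usage: constructing twice on the same loop yields the same object.
first = ConfigurableClient()
second = ConfigurableClient()
print(type(first).__name__, first is second)  # SimpleClient True
```

force_instance=True bypasses the cache, mirroring the escape hatch tornado offers for this pseudo-singleton behavior.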
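AsyncHTTPClient inherits from Configurable, and its __new__ shows that it behaves as a singleton: one shared instance per IOLoop, unless force_instance=True is passed. From Configurable's __new__ together with AsyncHTTPClient's configurable_base and configurable_default, we can tell that, unless another implementation has been configured, the object you get back is a SimpleAsyncHTTPClient instance.

fetch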
```python
def fetch(self, request, callback=None, raise_error=True, **kwargs):
    if self._closed:
        raise RuntimeError("fetch() called on closed AsyncHTTPClient")
    if not isinstance(request, HTTPRequest):
        request = HTTPRequest(url=request, **kwargs)
    # We may modify this (to add Host, Accept-Encoding, etc),
    # so make sure we don't modify the caller's object.  This is also
    # where normal dicts get converted to HTTPHeaders objects.
    request.headers = httputil.HTTPHeaders(request.headers)
    request = _RequestProxy(request, self.defaults)
    future = TracebackFuture()
    if callback is not None:
        callback = stack_context.wrap(callback)

        def handle_future(future):
            exc = future.exception()
            if isinstance(exc, HTTPError) and exc.response is not None:
                response = exc.response
            elif exc is not None:
                response = HTTPResponse(
                    request, 599, error=exc,
                    request_time=time.time() - request.start_time)
            else:
                response = future.result()
            self.io_loop.add_callback(callback, response)
        future.add_done_callback(handle_future)

    # fetch_impl receives handle_response: this is the key part
    def handle_response(response):
        if raise_error and response.error:
            future.set_exception(response.error)
        else:
            future.set_result(response)
    self.fetch_impl(request, handle_response)
    return future
```
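Stripped of headers and error handling, fetch is an adapter that turns the callback-style fetch_impl into a future. A minimal sketch of just that adapter, using the standard library's concurrent.futures.Future in place of tornado's TracebackFuture; FakeClient, its pending list and Response are hypothetical:

```python
from concurrent.futures import Future


class Response:
    """Hypothetical minimal response object."""

    def __init__(self, body, error=None):
        self.body = body
        self.error = error


class FakeClient:
    """Hypothetical client: fetch() adapts a callback-style fetch_impl."""

    def __init__(self):
        self.pending = []  # (request, callback) pairs awaiting "network" I/O

    def fetch_impl(self, request, callback):
        # A real client would start non-blocking I/O here and call
        # callback(response) later from the event loop.
        self.pending.append((request, callback))

    def fetch(self, request, raise_error=True):
        future = Future()

        def handle_response(response):
            # Same shape as handle_response in the source above.
            if raise_error and response.error:
                future.set_exception(response.error)
            else:
                future.set_result(response)

        self.fetch_impl(request, handle_response)
        return future  # returned immediately, not yet resolved


# Usage: the future resolves only when the stored callback fires.
client = FakeClient()
fut = client.fetch("http://example.com/")
print(fut.done())             # False
_, callback = client.pending.pop()
callback(Response(b"hello"))  # what the event loop would eventually do
print(fut.result().body)      # b'hello'
```

Note that fetch returns before any I/O happens; the future is only resolved later, when something calls the stored callback.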
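fetch calls fetch_impl, passing handle_response as its callback argument, and handle_response is what calls future.set_result (or set_exception). So once that callback is invoked, the future is resolved and whoever is waiting on it with yield is woken up: the ioloop runs the callback, and execution returns to the yield in the calling coroutine. Meanwhile fetch itself returns this request's future immediately, so that callers can attach further callbacks to it from outside.

fetch_impl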
```python
def _connection_class(self):
    return _HTTPConnection

def _handle_request(self, request, release_callback, final_callback):
    self._connection_class()(
        self.io_loop, self, request, release_callback,
        final_callback, self.max_buffer_size, self.tcp_client,
        self.max_header_size, self.max_body_size)
```
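The queueing that fetch_impl performs before it reaches _handle_request can be modeled as follows. This is a simplified sketch: the method names mirror tornado 4.x's simple_httpclient, but the bodies are our own, and _handle_request merely records the request instead of building an _HTTPConnection.

```python
import collections
import functools


class QueueingClient:
    """Simplified model of SimpleAsyncHTTPClient's request queue."""

    def __init__(self, max_clients=2):
        self.max_clients = max_clients
        self.queue = collections.deque()  # waiting (key, request, callback)
        self.active = {}                  # key -> (request, callback)
        self.started = []                 # (request, release_callback), in start order

    def fetch_impl(self, request, callback):
        key = object()  # unique token identifying this request
        self.queue.append((key, request, callback))
        self._process_queue()

    def _process_queue(self):
        # Start queued requests only while under the concurrency limit.
        while self.queue and len(self.active) < self.max_clients:
            key, request, callback = self.queue.popleft()
            self.active[key] = (request, callback)
            release_callback = functools.partial(self._release_fetch, key)
            self._handle_request(request, release_callback, callback)

    def _handle_request(self, request, release_callback, final_callback):
        # Stand-in for constructing _HTTPConnection(...).
        self.started.append((request, release_callback))

    def _release_fetch(self, key):
        # A finished request frees a slot, letting the next queued one start.
        del self.active[key]
        self._process_queue()


# Usage: with max_clients=2, the third request waits for a free slot.
client = QueueingClient(max_clients=2)
for url in ["/a", "/b", "/c"]:
    client.fetch_impl(url, callback=lambda response: None)
print([req for req, _ in client.started])  # ['/a', '/b']
client.started[0][1]()                     # "/a" finishes and releases its slot
print([req for req, _ in client.started])  # ['/a', '/b', '/c']
```

One shared client instance can thus serve any number of fetch calls: the queue absorbs the excess and each release callback pulls in the next request.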
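Before fetch returns, let's see what fetch_impl does internally. A reasonable guess is that it carries on with the network request: it must hand the request over to the epoll part of the ioloop and register a handler, and only later call future.set_result. So let's analyze how fetch_impl sets up the request. Looking at the implementation, a tcp_client object is created when the client is initialized; that is clearly a key piece. From the earlier analysis, SimpleAsyncHTTPClient is a singleton, so how does one instance handle many HTTP requests? The code shows that it stores each request together with its callback in self.queue, and each fetch_impl call pops queued entries off and processes them (as long as fewer than max_clients requests are active), so any number of requests can be handled. Tracing step by step down to _handle_request, we end up at the instantiation of _HTTPConnection. Its constructor arguments include the callback that holds the future, which is what guarantees the yield can later return to where it started. Now let's keep going.

_HTTPConnection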
```python
class _HTTPConnection(httputil.HTTPMessageDelegate):
    _SUPPORTED_METHODS = set(["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])

    def __init__(self, io_loop, client, request, release_callback,
                 final_callback, max_buffer_size, tcp_client,
                 max_header_size, max_body_size):
        self.start_time = io_loop.time()
        self.io_loop = io_loop
        self.client = client
        self.request = request
        self.release_callback = release_callback
        self.final_callback = final_callback
        self.max_buffer_size = max_buffer_size
        self.tcp_client = tcp_client
        self.max_header_size = max_header_size
        self.max_body_size = max_body_size
        self.code = None
        self.headers = None
        self.chunks = []
        self._decompressor = None
        # Timeout handle returned by IOLoop.add_timeout
        self._timeout = None
        self._sockaddr = None
        with stack_context.ExceptionStackContext(self._handle_exception):
            self.parsed = urlparse.urlsplit(_unicode(self.request.url))
            if self.parsed.scheme not in ("http", "https"):
                raise ValueError("Unsupported url scheme: %s" %
                                 self.request.url)
            # urlsplit results have hostname and port results, but they
            # didn't support ipv6 literals until python 2.7.
            netloc = self.parsed.netloc
            if "@" in netloc:
                userpass, _, netloc = netloc.rpartition("@")
            host, port = httputil.split_host_and_port(netloc)
            if port is None:
                port = 443 if self.parsed.scheme == "https" else 80
            if re.match(r"^\[.*\]$", host):
                # raw ipv6 addresses in urls are enclosed in brackets
                host = host[1:-1]
            self.parsed_hostname = host  # save final host for _on_connect

            if request.allow_ipv6 is False:
                af = socket.AF_INET
            else:
                af = socket.AF_UNSPEC

            ssl_options = self._get_ssl_options(self.parsed.scheme)

            timeout = min(self.request.connect_timeout, self.request.request_timeout)
            if timeout:
                self._timeout = self.io_loop.add_timeout(
                    self.start_time + timeout,
                    stack_context.wrap(self._on_timeout))
            self.tcp_client.connect(host, port, af=af,
                                    ssl_options=ssl_options,
                                    max_buffer_size=self.max_buffer_size,
                                    callback=self._on_connect)
```
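The host and port handling in the constructor above is plain string work and can be tried on its own. A sketch using urllib.parse.urlsplit; parse_host_port is hypothetical, and its split_host_and_port helper is our own re-implementation rather than an import of tornado.httputil.split_host_and_port:

```python
import re
from urllib.parse import urlsplit


def split_host_and_port(netloc):
    # "example.com:8080" or "[::1]:8080" -> (host, 8080); no port -> (host, None)
    match = re.match(r"^(.+):(\d+)$", netloc)
    if match:
        return match.group(1), int(match.group(2))
    return netloc, None


def parse_host_port(url):
    """Hypothetical helper following the steps in _HTTPConnection.__init__."""
    parsed = urlsplit(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError("Unsupported url scheme: %s" % url)
    netloc = parsed.netloc
    if "@" in netloc:
        # Drop "user:password@" credentials.
        _, _, netloc = netloc.rpartition("@")
    host, port = split_host_and_port(netloc)
    if port is None:
        # http and https differ only in the default port here.
        port = 443 if parsed.scheme == "https" else 80
    if re.match(r"^\[.*\]$", host):
        # Raw IPv6 addresses in URLs are enclosed in brackets.
        host = host[1:-1]
    return host, port


print(parse_host_port("https://user:pw@[::1]:8443/path"))  # ('::1', 8443)
```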
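The _HTTPConnection constructor sets a pile of member variables, which is a bit dizzying. Ignore most of them and focus on our callback and on tcp_client. Reading line by line, next comes the initialization of host and port; http and https differ (for example in the default port), so this needs handling. At the very end comes tcp_client.connect, and its arguments include callback=self._on_connect, which must be an important method. Looking inside _on_connect and skipping the string handling, we find self.connection.write_headers(start_line, self.request.headers). That sends the HTTP request headers, which is network IO, so _on_connect is the step that sends the HTTP headers once the connection to the URL's host has been established. Still, let's go back and look at how connect itself works, because that is the key to the asynchrony; once you understand it, the rest follows the same pattern.

TCPClient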
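Let's turn to the tcpclient module and look at TCPClient's constructor and its connect method; there is still a long way to go. The constructor is short and creates a resolver object, which we can ignore for now.

connect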
```python
@gen.coroutine
def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
            max_buffer_size=None):
    """Connect to the given host and port.

    Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
    ``ssl_options`` is not None).
    """
    addrinfo = yield self.resolver.resolve(host, port, af)
    connector = _Connector(
        addrinfo, self.io_loop,
        functools.partial(self._create_stream, max_buffer_size))
    af, addr, stream = yield connector.start()
    # TODO: For better performance we could cache the (af, addr)
    # information here and re-use it on subsequent connections to
    # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
    if ssl_options is not None:
        stream = yield stream.start_tls(False, ssl_options=ssl_options,
                                        server_hostname=host)
    raise gen.Return(stream)
```
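To see why raise gen.Return(stream) ends up as the coroutine's result, and how a callback= argument receives it, here is a minimal coroutine driver. It is a sketch under stated assumptions, not tornado's gen module: Return and coroutine are hypothetical names, yielded objects must be concurrent.futures.Future instances, and the generator is resumed directly from the future's done-callback rather than via the IOLoop.

```python
import functools
from concurrent.futures import Future


class Return(Exception):
    """Hypothetical analogue of gen.Return: carries a coroutine's result."""

    def __init__(self, value=None):
        super().__init__()
        self.value = value


def coroutine(func):
    """Minimal sketch of a gen.coroutine-style decorator."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        callback = kwargs.pop("callback", None)
        result = Future()
        if callback is not None:
            # A callback= argument receives the final result, like
            # callback=self._on_connect receiving the stream.
            result.add_done_callback(lambda f: callback(f.result()))
        gen = func(*args, **kwargs)

        def step(value=None, exc=None):
            try:
                yielded = gen.throw(exc) if exc is not None else gen.send(value)
            except Return as r:          # raise Return(x) -> set_result(x)
                result.set_result(r.value)
            except StopIteration as s:   # a plain return also finishes it
                result.set_result(s.value)
            except Exception as e:
                result.set_exception(e)
            else:
                def resume(f):
                    # Send the yielded future's outcome back into the generator.
                    if f.exception() is not None:
                        step(exc=f.exception())
                    else:
                        step(f.result())
                yielded.add_done_callback(resume)

        step()
        return result
    return wrapper


# Usage: the coroutine suspends at yield until the inner future resolves.
resolved = Future()
received = []


@coroutine
def connect():
    addr = yield resolved                 # suspended here
    raise Return("stream to %s" % addr)   # becomes the outer future's result


outer = connect(callback=received.append)
print(outer.done())                       # False
resolved.set_result("127.0.0.1")          # resolving the inner future resumes it
print(outer.result(), received)           # stream to 127.0.0.1 ['stream to 127.0.0.1']
```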
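Inside connect we find the gen.coroutine decorator, and the caller passed callback=self._on_connect, so when this coroutine's future is resolved, self._on_connect is called with its result. You can also see that _on_connect takes a stream parameter, which is exactly the value passed along by gen.Return(stream). This works because in gen.coroutine's implementation, after a value is sent into the generator, the code keeps running until it reaches raise gen.Return(...); gen.Return is actually an exception, and catching it leads to set_result inside gen.coroutine.

The first yield has self.resolver.resolve on its right-hand side and addrinfo on its left: this asynchronous operation resolves the host name into address information. Tornado uses a blocking implementation here by default (BlockingResolver), so this yield actually completes synchronously. We will skip it for now and cover it in a later article, mainly the run_on_executor decorator. Let's go straight to the next yield.

_Connector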
```python
def __init__(self, addrinfo, io_loop, connect):
    self.io_loop = io_loop
    self.connect = connect

    self.future = Future()
    self.timeout = None
    self.last_error = None
    self.remaining = len(addrinfo)
    self.primary_addrs, self.secondary_addrs = self.split(addrinfo)
```
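_Connector is instantiated with a callback argument, functools.partial(self._create_stream, max_buffer_size), where _create_stream is a method of TCPClient (not of _Connector), and it stores that callback as self.connect. So self.connect is effectively TCPClient._create_stream. Among its members is a future instance; throughout this walk we need to keep a close eye on futures and callbacks. After instantiation, start is called; start calls try_connect and set_timout, which, as their names suggest, perform the connect and set up a timeout. Finally start returns the future created in the constructor.

try_connect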
```python
def start(self, timeout=_INITIAL_CONNECT_TIMEOUT):
    self.try_connect(iter(self.primary_addrs))
    self.set_timout(timeout)
    return self.future

def try_connect(self, addrs):
    try:
        af, addr = next(addrs)
    except StopIteration:
        # We've reached the end of our queue, but the other queue
        # might still be working.  Send a final error on the future
        # only when both queues are finished.
        if self.remaining == 0 and not self.future.done():
            self.future.set_exception(self.last_error or
                                      IOError("connection failed"))
        return
    future = self.connect(af, addr)
    future.add_done_callback(functools.partial(self.on_connect_done,
                                               addrs, af, addr))
```
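The address-fallback logic of start, try_connect and on_connect_done can be reduced to the sketch below. It assumes a simplified _Connector with only the primary address queue and no timeout, and fake_connect is a hypothetical stand-in for TCPClient._create_stream that hands back futures we resolve by hand:

```python
import functools
import socket
from concurrent.futures import Future


class Connector:
    """Simplified sketch of _Connector: primary addresses only, no timeout."""

    def __init__(self, addrinfo, connect):
        self.addrinfo = addrinfo
        self.connect = connect   # plays the role of TCPClient._create_stream
        self.future = Future()   # what `yield connector.start()` waits on
        self.last_error = None

    def start(self):
        self.try_connect(iter(self.addrinfo))
        return self.future

    def try_connect(self, addrs):
        try:
            af, addr = next(addrs)
        except StopIteration:
            # Every address failed: report the last error.
            if not self.future.done():
                self.future.set_exception(
                    self.last_error or IOError("connection failed"))
            return
        future = self.connect(af, addr)
        future.add_done_callback(
            functools.partial(self.on_connect_done, addrs, af, addr))

    def on_connect_done(self, addrs, af, addr, future):
        if future.exception() is not None:
            # This address failed: remember why and try the next one.
            self.last_error = future.exception()
            self.try_connect(addrs)
            return
        # Success: resolve the outer future with (af, addr, stream).
        self.future.set_result((af, addr, future.result()))


# Usage with a fake connect whose futures we resolve by hand.
attempts = {}


def fake_connect(af, addr):
    attempts[addr] = Future()
    return attempts[addr]


addrs = [(socket.AF_INET, ("10.0.0.1", 80)), (socket.AF_INET, ("10.0.0.2", 80))]
outer = Connector(addrs, fake_connect).start()
attempts[("10.0.0.1", 80)].set_exception(ConnectionRefusedError())
print(outer.done())                # False: now trying the second address
attempts[("10.0.0.2", 80)].set_result("stream-2")
print(outer.result()[1:])          # (('10.0.0.2', 80), 'stream-2')
```

Two futures are in play, just as in tornado: the per-attempt future returned by connect, and the connector's own future that only the successful attempt resolves.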
future = self.connect(af, addr) runs TCPClient._create_stream, which returns a future, and on_connect_done is registered as that future's callback.

_create_stream
```python
def _create_stream(self, max_buffer_size, af, addr):
    # Always connect in plaintext; we'll convert to ssl if necessary
    # after one connection has completed.
    stream = IOStream(socket.socket(af),
                      io_loop=self.io_loop,
                      max_buffer_size=max_buffer_size)
    return stream.connect(addr)
```
_create_stream instantiates an IOStream and returns the result of stream.connect. The future returned by stream.connect is the very future seen in try_connect, so let's look inside stream.connect to see how it "resolves" this future.

IOStream.connect
```python
def connect(self, address, callback=None, server_hostname=None):
    self._connecting = True
    if callback is not None:
        self._connect_callback = stack_context.wrap(callback)
        future = None
    else:
        future = self._connect_future = TracebackFuture()
    try:
        self.socket.connect(address)
    except socket.error as e:
        if (errno_from_exception(e) not in _ERRNO_INPROGRESS and
                errno_from_exception(e) not in _ERRNO_WOULDBLOCK):
            if future is None:
                gen_log.warning("Connect error on fd %s: %s",
                                self.socket.fileno(), e)
            self.close(exc_info=True)
            return future
    self._add_io_state(self.io_loop.WRITE)
    return future
```
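At the socket level, IOStream.connect (and later _handle_connect) relies on the classic non-blocking connect pattern. A standalone sketch using select.select in place of tornado's epoll-based IOLoop; nonblocking_connect is a hypothetical helper that blocks in select only for demonstration, whereas tornado returns to the event loop instead:

```python
import errno
import select
import socket


def nonblocking_connect(address, timeout=5.0):
    """Hypothetical helper: non-blocking connect, then wait for completion."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    err = sock.connect_ex(address)  # returns an errno instead of raising
    if err not in (0, errno.EINPROGRESS, errno.EWOULDBLOCK):
        sock.close()
        raise OSError(err, "connect failed immediately")
    # Writable means the attempt has finished; Windows reports a failed
    # connect through the exceptional set, so watch that too.
    _, writable, failed = select.select([], [sock], [sock], timeout)
    if not (writable or failed):
        sock.close()
        raise TimeoutError("connect timed out")
    # Same check as IOStream._handle_connect: read the pending socket error.
    err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err != 0:
        sock.close()
        raise OSError(err, "connect failed")
    return sock


# Usage against a local listening socket (the kernel completes the
# handshake even before accept() is called).
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
client = nonblocking_connect(server.getsockname())
print(client.getpeername() == server.getsockname())  # True
client.close()
server.close()
```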
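self._connecting = True marks this instance as in the middle of connecting; it is set back to False once the connection completes. If no callback is passed in, a future is created, and the future we just returned is stored on the instance as self._connect_future. Then the socket's connect is called. Because the socket is non-blocking, this returns immediately instead of blocking (the resulting EINPROGRESS/EWOULDBLOCK error is caught and ignored). When the connection succeeds the socket becomes writable; when it fails the socket becomes both readable and writable. This is standard non-blocking socket behavior; look it up if you want the details. Finally self._add_io_state is called and the future is returned.

_add_io_state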
```python
def _add_io_state(self, state):
    if self.closed():
        # connection has been closed, so there can be no future events
        return
    if self._state is None:
        self._state = ioloop.IOLoop.ERROR | state
        with stack_context.NullContext():
            self.io_loop.add_handler(
                self.fileno(), self._handle_events, self._state)
    elif not self._state & state:
        self._state = self._state | state
        self.io_loop.update_handler(self.fileno(), self._state)
```
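The register-once, modify-afterwards logic of _add_io_state maps directly onto Python's selectors module. In this hypothetical MiniStream, selector.register plays the role of io_loop.add_handler and selector.modify plays update_handler; tornado's ERROR flag is left out because selectors has no separate error event:

```python
import selectors
import socket

READ = selectors.EVENT_READ
WRITE = selectors.EVENT_WRITE


class MiniStream:
    """Hypothetical model of IOStream._add_io_state on top of selectors."""

    def __init__(self, sock, selector):
        self.sock = sock
        self.selector = selector
        self._state = None  # None until the fd is registered for the first time

    def _handle_events(self, fileobj, events):
        pass  # the real IOStream dispatches connect/read/write handling here

    def _add_io_state(self, state):
        if self._state is None:
            # First registration, like io_loop.add_handler(fd, handler, state).
            self._state = state
            self.selector.register(self.sock, self._state, self._handle_events)
        elif not self._state & state:
            # Widen the existing interest mask, like io_loop.update_handler().
            self._state |= state
            self.selector.modify(self.sock, self._state, self._handle_events)


# Usage: the first call registers, a later call with a new flag modifies.
sel = selectors.DefaultSelector()
a, b = socket.socketpair()
stream = MiniStream(a, sel)
stream._add_io_state(WRITE)
stream._add_io_state(READ)
print(sel.get_key(a).events == READ | WRITE)  # True
```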
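We've finally reached the step where epoll is used! From the constructor we know that self._state is None at this point, so we take the self.io_loop.add_handler branch. As described in my earlier [article][2], this registers the current fd, this instance's _handle_events, and the write and error events with epoll. And with that, we have finally walked through this yield!

Quick summary: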
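Make sure you understand how the futures are passed along and what operation each future's callback performs. In _HTTPConnection, tcp_client.connect returns a future whose callback is self._on_connect; it is scheduled on the ioloop when raise gen.Return(stream) executes. Inside tcp_client.connect, connector.start (via try_connect) obtains the future returned by stream.connect, whose callback is on_connect_done; it runs once poll detects the write event and that future is resolved.

ioloop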
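Before reading IOLoop.start, note that the two futures in the summary are hooked up to the loop differently. As far as we can tell from tornado 4.x, a callback attached with future.add_done_callback (such as on_connect_done) runs as soon as the result is set, whereas IOLoop.add_future, used by the coroutine Runner and by gen.coroutine's callback= argument, only queues the callback for the loop to run. A sketch of the difference, with a hypothetical CallbackQueue standing in for the loop's callback list; the log labels are illustrative only:

```python
from concurrent.futures import Future


class CallbackQueue:
    """Hypothetical stand-in for the IOLoop's pending-callback list."""

    def __init__(self):
        self.callbacks = []

    def add_callback(self, callback, *args):
        self.callbacks.append((callback, args))

    def add_future(self, future, callback):
        # Like IOLoop.add_future: once the future resolves, queue the
        # callback for the next loop iteration instead of running it now.
        future.add_done_callback(lambda f: self.add_callback(callback, f))

    def run_once(self):
        # One loop iteration's worth of callbacks.
        pending, self.callbacks = self.callbacks, []
        for callback, args in pending:
            callback(*args)


loop = CallbackQueue()
log = []
stream_future = Future()
stream_future.add_done_callback(lambda f: log.append("direct"))    # e.g. on_connect_done
loop.add_future(stream_future, lambda f: log.append("scheduled"))  # e.g. Runner.run
stream_future.set_result("stream")
print(log)      # ['direct']
loop.run_once()
print(log)      # ['direct', 'scheduled']
```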
```python
def start(self):
    if self._running:
        raise RuntimeError("IOLoop is already running")
    self._setup_logging()
    if self._stopped:
        self._stopped = False
        return
    old_current = getattr(IOLoop._current, "instance", None)
    IOLoop._current.instance = self
    self._thread_ident = thread.get_ident()
    self._running = True

    old_wakeup_fd = None
    if hasattr(signal, "set_wakeup_fd") and os.name == "posix":
        try:
            old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
            if old_wakeup_fd != -1:
                signal.set_wakeup_fd(old_wakeup_fd)
                old_wakeup_fd = None
        except ValueError:
            old_wakeup_fd = None

    try:
        while True:
            with self._callback_lock:
                callbacks = self._callbacks
                self._callbacks = []

            due_timeouts = []
            if self._timeouts:
                now = self.time()
                while self._timeouts:
                    if self._timeouts[0].callback is None:
                        heapq.heappop(self._timeouts)
                        self._cancellations -= 1
                    elif self._timeouts[0].deadline <= now:
                        due_timeouts.append(heapq.heappop(self._timeouts))
                    else:
                        break
                if (self._cancellations > 512 and
                        self._cancellations > (len(self._timeouts) >> 1)):
                    self._cancellations = 0
                    self._timeouts = [x for x in self._timeouts
                                      if x.callback is not None]
                    heapq.heapify(self._timeouts)

            for callback in callbacks:
                self._run_callback(callback)
            for timeout in due_timeouts:
                if timeout.callback is not None:
                    self._run_callback(timeout.callback)
            callbacks = callback = due_timeouts = timeout = None

            if self._callbacks:
                poll_timeout = 0.0
            elif self._timeouts:
                poll_timeout = self._timeouts[0].deadline - self.time()
                poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
            else:
                poll_timeout = _POLL_TIMEOUT

            if not self._running:
                break

            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0)

            try:
                event_pairs = self._impl.poll(poll_timeout)
            except Exception as e:
                if errno_from_exception(e) == errno.EINTR:
                    continue
                else:
                    raise

            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL,
                                 self._blocking_signal_threshold, 0)

            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                try:
                    fd_obj, handler_func = self._handlers[fd]
                    handler_func(fd_obj, events)
                except (OSError, IOError) as e:
                    if errno_from_exception(e) == errno.EPIPE:
                        # Happens when the client closes the connection
                        pass
                    else:
                        self.handle_callback_exception(self._handlers.get(fd))
                except Exception:
                    self.handle_callback_exception(self._handlers.get(fd))
                fd_obj = handler_func = None

    finally:
        # reset the stopped flag so another start/stop pair can be issued
        self._stopped = False
        if self._blocking_signal_threshold is not None:
            signal.setitimer(signal.ITIMER_REAL, 0, 0)
        IOLoop._current.instance = old_current
        if old_wakeup_fd is not None:
            signal.set_wakeup_fd(old_wakeup_fd)
```
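Stripped of signal handling, logging and timeout-cancellation bookkeeping, the loop above has a simple shape: run queued callbacks, run due timeouts, compute a poll timeout, poll, dispatch handlers. A minimal sketch of that shape on top of the selectors module; MiniIOLoop is hypothetical and is not tornado's IOLoop:

```python
import heapq
import itertools
import selectors
import socket
import time


class MiniIOLoop:
    """Hypothetical minimal event loop with the same structure as IOLoop.start."""

    def __init__(self):
        self._selector = selectors.DefaultSelector()
        self._callbacks = []
        self._timeouts = []               # heap of (deadline, seq, callback)
        self._seq = itertools.count()     # tie-breaker for equal deadlines
        self._running = False

    def add_callback(self, callback, *args):
        self._callbacks.append(lambda: callback(*args))

    def add_timeout(self, deadline, callback):
        heapq.heappush(self._timeouts, (deadline, next(self._seq), callback))

    def add_handler(self, fileobj, handler, events):
        self._selector.register(fileobj, events, handler)

    def stop(self):
        self._running = False

    def start(self, max_poll=3600.0):
        self._running = True
        while True:
            # 1. Take this iteration's callbacks and the timeouts that are due.
            callbacks, self._callbacks = self._callbacks, []
            now = time.monotonic()
            due = []
            while self._timeouts and self._timeouts[0][0] <= now:
                due.append(heapq.heappop(self._timeouts)[2])
            for callback in callbacks + due:
                callback()
            if not self._running:
                break
            # 2. Don't sleep if work is queued; otherwise wake for the next timeout.
            if self._callbacks:
                poll_timeout = 0.0
            elif self._timeouts:
                poll_timeout = max(0.0, min(self._timeouts[0][0] - time.monotonic(), max_poll))
            else:
                poll_timeout = max_poll
            # 3. Poll, then dispatch each ready fd to its handler(fd_obj, events).
            for key, events in self._selector.select(poll_timeout):
                key.data(key.fileobj, events)


# Usage: a callback writes to one end of a socket pair; the read handler
# on the other end fires on the next poll and stops the loop.
loop = MiniIOLoop()
reader, writer = socket.socketpair()
received = []


def on_readable(fileobj, events):
    received.append(fileobj.recv(1024))
    loop.stop()


loop.add_handler(reader, on_readable, selectors.EVENT_READ)
loop.add_callback(writer.send, b"ping")
loop.start()
print(received)  # [b'ping']
```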
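And now tornado is finally back in the ioloop code (tears of joy)! When the connection succeeds, the fd becomes writable and epoll reports a write event for it, so we enter the event-handling part of the loop. From there we come back to the _handle_events registered earlier! Note that this _handle_events belongs to the IOStream instance, which holds all the state we have set up so far. Next, let's read _handle_events to see how it resolves the future.

IOStream._handle_events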
```python
def _handle_events(self, fd, events):
    if self.closed():
        gen_log.warning("Got events for closed stream %s", fd)
        return
    try:
        if self._connecting:
            # Most IOLoops will report a write failed connect
            # with the WRITE event, but SelectIOLoop reports a
            # READ as well so we must check for connecting before
            # either.
            self._handle_connect()
        if self.closed():
            return
        if events & self.io_loop.READ:
            self._handle_read()
        if self.closed():
            return
        if events & self.io_loop.WRITE:
            self._handle_write()
        if self.closed():
            return
        if events & self.io_loop.ERROR:
            self.error = self.get_fd_error()
            # We may have queued up a user callback in _handle_read or
            # _handle_write, so don't close the IOStream until those
            # callbacks have had a chance to run.
            self.io_loop.add_callback(self.close)
            return
        state = self.io_loop.ERROR
        if self.reading():
            state |= self.io_loop.READ
        if self.writing():
            state |= self.io_loop.WRITE
        if state == self.io_loop.ERROR and self._read_buffer_size == 0:
            # If the connection is idle, listen for reads too so
            # we can tell if the connection is closed.  If there is
            # data in the read buffer we won't run the close callback
            # yet anyway, so we don't need to listen in this case.
            state |= self.io_loop.READ
        if state != self._state:
            assert self._state is not None, \
                "shouldn't happen: _handle_events without self._state"
            self._state = state
            self.io_loop.update_handler(self.fileno(), self._state)
    except UnsatisfiableReadError as e:
        gen_log.info("Unsatisfiable read, closing connection: %s" % e)
        self.close(exc_info=True)
    except Exception:
        gen_log.error("Uncaught exception, closing connection.",
                      exc_info=True)
        self.close(exc_info=True)
        raise

def _handle_connect(self):
    err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err != 0:
        self.error = socket.error(err, os.strerror(err))
        # IOLoop implementations may vary: some of them return
        # an error state before the socket becomes writable, so
        # in that case a connection failure would be handled by the
        # error path in _handle_events instead of here.
        if self._connect_future is None:
            gen_log.warning("Connect error on fd %s: %s",
                            self.socket.fileno(), errno.errorcode[err])
        self.close()
        return
    if self._connect_callback is not None:
        callback = self._connect_callback
        self._connect_callback = None
        self._run_callback(callback)
    if self._connect_future is not None:
        future = self._connect_future
        self._connect_future = None
        future.set_result(self)
    self._connecting = False
```
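First it checks whether we are connecting. Of course we are, as I stressed earlier, so we enter _handle_connect, which first checks (via SO_ERROR) whether the connect succeeded. If it did, it sets the result of _connect_future with set_result(self), passing in self, the IOStream itself! Setting the result fires the callbacks attached to _connect_future. Tracing back step by step, this is the future that try_connect got from self.connect(af, addr), so the callback _Connector.try_connect registered on it, on_connect_done, now runs. on_connect_done in turn resolves the _Connector's own future with (af, addr, stream), and that is the future on the right-hand side of yield connector.start(). As explained in the coroutine source walkthrough of the previous [article][3], that future also carries the Runner's callback, which the ioloop runs; Runner.run then sends the value into the generator. At long last, the program is back at that yield! This is exactly how tornado implements asynchronous IO.

Walking through the entire operation end to end isn't really practical, so please trace the rest yourselves; it follows the same pattern. The right-hand side of a yield always returns a future (older versions apparently used YieldPoint; I haven't read the old code, so I'm not sure). Before the future is returned, the fd handler is registered and any other preparatory work is done; then the code waits for epoll to detect the IO event it cares about, and the IO handler resolves the future, which brings execution back to the yield.

The core consists of three parts, the ioloop, futures, and gen.coroutine, which cooperate to carry out asynchronous operations. Trace through it a few times to digest it, and you will be able to write tornado extensions. Good luck, everyone!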
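The whole chain traced in this article can be condensed into one small, self-contained program: yield a future, register an fd handler, wait for the write event, resolve the future in the handler, and resume the generator. This is a sketch built on selectors and concurrent.futures rather than tornado, and stream_connect, run_coroutine and fetch_peer are hypothetical names:

```python
import errno
import selectors
import socket
from concurrent.futures import Future


def stream_connect(sel, address):
    """Like IOStream.connect: start a non-blocking connect, return a Future."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    future = Future()
    err = sock.connect_ex(address)
    if err not in (0, errno.EINPROGRESS, errno.EWOULDBLOCK):
        sock.close()
        future.set_exception(OSError(err, "connect failed"))
        return future

    def handle_events(fileobj, events):
        # Like _handle_connect: check SO_ERROR, then resolve the future.
        sel.unregister(fileobj)
        err = fileobj.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            fileobj.close()
            future.set_exception(OSError(err, "connect failed"))
        else:
            future.set_result(fileobj)

    # Like _add_io_state(WRITE): writable means the connect attempt finished.
    sel.register(sock, selectors.EVENT_WRITE, handle_events)
    return future


def run_coroutine(gen_func, *args):
    """Drive a generator that yields Futures (a tiny Runner plus event loop)."""
    sel = selectors.DefaultSelector()
    outcome = Future()
    gen = gen_func(sel, *args)

    def step(value=None, exc=None):
        try:
            fut = gen.throw(exc) if exc is not None else gen.send(value)
        except StopIteration as s:
            outcome.set_result(s.value)
            return
        except Exception as e:
            outcome.set_exception(e)
            return

        def resume(f):
            # Like Runner.run: send the result (or error) back into the generator.
            if f.exception() is not None:
                step(exc=f.exception())
            else:
                step(f.result())

        fut.add_done_callback(resume)

    step()
    while not outcome.done():
        events = sel.select(timeout=5.0)
        if not events:
            raise TimeoutError("no I/O event within 5 seconds")
        for key, mask in events:
            key.data(key.fileobj, mask)  # dispatch, like IOLoop.start
    sel.close()
    return outcome.result()


def fetch_peer(sel, address):
    stream = yield stream_connect(sel, address)  # suspends until writable
    try:
        return stream.getpeername()
    finally:
        stream.close()


server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
print(run_coroutine(fetch_peer, server.getsockname()) == server.getsockname())  # True
server.close()
```

The division of labor matches the three parts named above: the selector loop stands in for the ioloop, Future carries the result, and step/resume play the role of gen.coroutine's Runner.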
When reposting, please credit this article's address: http://specialneedsforspecialkids.com/yun/37577.html