国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

tornado 源碼閱讀-初步認識

2450184176 / 3245人閱讀

摘要:序言最近閑暇無事閱讀了一下的源碼對整體的結構有了初步認識與大家分享不知道為什么右邊的目錄一直出不來非常不舒服不如移步到吧是的核心模塊也是個調度模塊各種異步事件都是由他調度的所以必須弄清他的執行邏輯源碼分析而的核心部分則是這個循環內部的邏輯貼

序言
最近閑暇無事,閱讀了一下tornado的源碼,對整體的結構有了初步認識,與大家分享
不知道為什么右邊的目錄一直出不來,非常不舒服.
不如移步到oschina吧....[http://my.oschina.net/abc2001x/blog/476349][1]
ioloop
`ioloop`是`tornado`的核心模塊,也是個調度模塊,各種異步事件都是由他調度的,所以必須弄清他的執行邏輯
源碼分析
而`ioloop`的核心部分則是 `while True`這個循環內部的邏輯,貼上他的代碼如下
   def start(self):
        if self._running:
            raise RuntimeError("IOLoop is already running")
        self._setup_logging()
        if self._stopped:
            self._stopped = False
            return
        old_current = getattr(IOLoop._current, "instance", None)
        IOLoop._current.instance = self
        self._thread_ident = thread.get_ident()
        self._running = True

        old_wakeup_fd = None
        if hasattr(signal, "set_wakeup_fd") and os.name == "posix":

            try:
                old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
                if old_wakeup_fd != -1:

                    signal.set_wakeup_fd(old_wakeup_fd)
                    old_wakeup_fd = None
            except ValueError:

                old_wakeup_fd = None

        try:
            while True:

                with self._callback_lock:
                    callbacks = self._callbacks
                    self._callbacks = []

                due_timeouts = []

                if self._timeouts:
                    now = self.time()
                    while self._timeouts:
                        if self._timeouts[0].callback is None:

                            heapq.heappop(self._timeouts)
                            self._cancellations -= 1
                        elif self._timeouts[0].deadline <= now:
                            due_timeouts.append(heapq.heappop(self._timeouts))
                        else:
                            break
                    if (self._cancellations > 512
                            and self._cancellations > (len(self._timeouts) >> 1)):
                        self._cancellations = 0
                        self._timeouts = [x for x in self._timeouts
                                          if x.callback is not None]
                        heapq.heapify(self._timeouts)

                for callback in callbacks:
                    self._run_callback(callback)
                for timeout in due_timeouts:
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)

                callbacks = callback = due_timeouts = timeout = None

                if self._callbacks:

                    poll_timeout = 0.0
                elif self._timeouts:

                    poll_timeout = self._timeouts[0].deadline - self.time()
                    poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
                else:

                    poll_timeout = _POLL_TIMEOUT

                if not self._running:
                    break

                if self._blocking_signal_threshold is not None:

                    signal.setitimer(signal.ITIMER_REAL, 0, 0)

                try:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:

                    if errno_from_exception(e) == errno.EINTR:
                        continue
                    else:
                        raise

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)

                self._events.update(event_pairs)
                while self._events:
                    fd, events = self._events.popitem()
                    try:
                        fd_obj, handler_func = self._handlers[fd]
                        handler_func(fd_obj, events)
                    except (OSError, IOError) as e:
                        if errno_from_exception(e) == errno.EPIPE:

                            pass
                        else:
                            self.handle_callback_exception(self._handlers.get(fd))
                    except Exception:
                        self.handle_callback_exception(self._handlers.get(fd))
                fd_obj = handler_func = None

        finally:

            self._stopped = False
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
            IOLoop._current.instance = old_current
            if old_wakeup_fd is not None:
                signal.set_wakeup_fd(old_wakeup_fd)
除去注釋,代碼其實沒多少行. 由while 內部代碼可以看出ioloop主要由三部分組成:
1.回調 callbacks

他是ioloop回調的基礎部分,通過IOLoop.instance().add_callback()添加到self._callbacks
他們將在每一次loop中被運行.

主要用途是將邏輯分塊,在適合時機將包裝好的callback添加到self._callbacks讓其執行.

例如ioloop中的add_future

def add_future(self, future, callback):
        """Schedules a callback on the ``IOLoop`` when the given
        `.Future` is finished.

        The callback is invoked with one argument, the
        `.Future`.
        """
        assert is_future(future)
        callback = stack_context.wrap(callback)
        future.add_done_callback(
            lambda future: self.add_callback(callback, future))

future對象得到result的時候會調用future.add_done_callback添加的callback,再將其轉至ioloop執行

2.定時器 due_timeouts

這是定時器,在指定的事件執行callback.
跟1中的callback類似,通過IOLoop.instance().add_callback

在每一次循環,會計算timeouts回調列表里的事件,運行已到期的callback.
當然不是無節操的循環.

因為poll操作會阻塞到有io操作發生,所以只要計算最近的timeout,
然后用這個時間作為self._impl.poll(poll_timeout)poll_timeout ,
就可以達到按時運行了

但是,假設poll_timeout的時間很大時,self._impl.poll一直在堵塞中(沒有io事件,但在處理某一個io事件),
那添加剛才1中的callback不是要等很久才會被運行嗎? 答案當然是不會.
ioloop中有個waker對象,他是由兩個fd組成,一個讀一個寫.
ioloop在初始化的時候把waker綁定到epoll里了,add_callback時會觸發waker的讀寫.
這樣ioloop就會在poll中被喚醒了,接著就可以及時處理timeout callback

用這樣的方式也可以自己封裝一個小的定時器功能玩玩

3.io事件的event loop

處理epoll事件的功能
通過IOLoop.instance().add_handler(fd, handler, events)綁定fd event的處理事件
httpserver.listen的代碼內,
netutil.py中的netutil.pyadd_accept_handler綁定accept handler處理客戶端接入的邏輯

如法炮制,其他的io事件也這樣綁定,業務邏輯的分塊交由ioloopcallbackfuture處理

關于epoll的用法的內容.詳情見我第一篇文章吧,哈哈

總結

ioloop由callback(業務分塊), timeout callback(定時任務) io event(io傳輸和解析) 三塊組成,互相配合完成異步的功能,構建gen,httpclient,iostream等功能

串聯大致的流程是,tornado 綁定io event,處理io傳輸解析,傳輸完成后(結合Future)回調(callback)業務處理的邏輯和一些固定操作 . 定時器則是較為獨立的模塊

Futrue

個人認為Futuretornado僅此ioloop重要的模塊,他貫穿全文,所有異步操作都有他的身影
顧名思義,他主要是關注日后要做的事,類似jqueryDeferred

一般的用法是通過ioloopadd_future定義futuredone callback,
futureset_result的時候,futuredone callback就會被調用.
從而完成Future的功能.

具體可以參考gen.coroutine的實現,本文后面也會講到

他的組成不復雜,只有幾個重要的方法
最重要的是 add_done_callback , set_result

tornadoFutureioloop,yield實現了gen.coroutine

1. add_done_callback

ioloopcallback類似 , 存儲事件完成后的callbackself._callbacks

def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)
2.set_result

設置事件的結果,并運行之前存儲好的callback

def set_result(self, result):
        self._result = result
        self._set_done()

def _set_done(self):
        self._done = True
        for cb in self._callbacks:
            try:
                cb(self)
            except Exception:
                app_log.exception("Exception in callback %r for %r",
                                  cb, self)
        self._callbacks = None

為了驗證之前所說的,上一段測試代碼

#! /usr/bin/env python
#coding=utf-8

import tornado.web
import tornado.ioloop

from tornado.gen import coroutine
from tornado.concurrent import Future


def test():
    def pp(s):
        print s

    future = Future()
    iol = tornado.ioloop.IOLoop.instance()

    print "init future %s"%future

    iol.add_future(future, lambda f: pp("ioloop callback after future done,future is %s"%f))

    #模擬io延遲操作
    iol.add_timeout(iol.time()+5,lambda:future.set_result("set future is done"))

    print "init complete"
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    test()  

運行結果:

gen.coroutine

接著繼續延伸,看看coroutine的實現
gen.coroutine實現的功能其實是將原來的callback的寫法,用yield的寫法代替. 即以yield為分界,將代碼分成兩部分.
如:

#! /usr/bin/env python
#coding=utf-8

import tornado.ioloop
from tornado.gen import coroutine
from tornado.httpclient import AsyncHTTPClient

@coroutine
def cotest():
    client = AsyncHTTPClient()
    res = yield client.fetch("http://www.segmentfault.com/")
    print res

if __name__ == "__main__":
    f = cotest()    
    print f #這里返回了一個future哦
    tornado.ioloop.IOLoop.instance().start()

運行結果:

源碼分析

接下來分析下coroutine的實現

def _make_coroutine_wrapper(func, replace_callback):

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = TracebackFuture()

        if replace_callback and "callback" in kwargs:
            callback = kwargs.pop("callback")
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))

        try:
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            result = getattr(e, "value", None)
        except Exception:
            future.set_exc_info(sys.exc_info())
            return future
        else:
            if isinstance(result, types.GeneratorType):
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = TracebackFuture()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                "stack_context inconsistency (probably caused "
                                "by yield within a "with StackContext" block)"))
                except (StopIteration, Return) as e:
                    future.set_result(getattr(e, "value", None))
                except Exception:
                    future.set_exc_info(sys.exc_info())
                else:
                    Runner(result, future, yielded)
                try:
                    return future
                finally:
                    future = None
        future.set_result(result)
        return future
    return wrapper

如源碼所示,func運行的結果是GeneratorType ,yielded = next(result),
運行至原函數的yield位置,返回的是原函數func內部 yield 右邊返回的對象(必須是FutureFuturelist)給yielded.
經過Runner(result, future, yielded) 對yielded進行處理.
在此就 貼出Runner的代碼了.
Runner初始化過程,調用handle_yield, 查看yielded是否已done了,否則add_future運行Runnerrun方法,
run方法中如果yielded對象已完成,用對它的gen調用send,發送完成的結果.
所以yielded在什么地方被set_result非常重要,
當被set_result的時候,才會send結果給原func,完成整個異步操作

詳情可以查看tornado 中重要的對象 iostream,源碼中iostream的 _handle_connect,如此設置了連接的result.

def _handle_connect(self):
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            self.error = socket.error(err, os.strerror(err))
            if self._connect_future is None:
                gen_log.warning("Connect error on fd %s: %s",
                                self.socket.fileno(), errno.errorcode[err])
            self.close()
            return
        if self._connect_callback is not None:
            callback = self._connect_callback
            self._connect_callback = None
            self._run_callback(callback)
        if self._connect_future is not None:
            future = self._connect_future
            self._connect_future = None
            future.set_result(self)
        self._connecting = False

最后貼上一個簡單的測試代碼,演示coroutine,future的用法

import tornado.ioloop
from tornado.gen import coroutine
from tornado.concurrent import Future

@coroutine
def asyn_sum(a, b):
    print("begin calculate:sum %d+%d"%(a,b))
    future = Future()
    future2 = Future()
    iol = tornado.ioloop.IOLoop.instance()

    print future

    def callback(a, b):
        print("calculating the sum of %d+%d:"%(a,b))
        future.set_result(a+b)

        iol.add_timeout(iol.time()+3,lambda f:f.set_result(None),future2)
    iol.add_timeout(iol.time()+3,callback, a, b)

    result = yield future

    print("after yielded")
    print("the %d+%d=%d"%(a, b, result))

    yield future2

    print "after future2"

def main():
    f =  asyn_sum(2,3)

    print ""
    print f
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main()

運行結果:

為什么代碼中個yield都起作用了? 因為Runner.run里,最后繼續用handle_yield處理了send后返回的yielded對象,意思是func里可以有n干個yield操作

if not self.handle_yield(yielded):
                    return
總結

至此,已完成tornado中重要的幾個模塊的流程,其他模塊也是由此而來.寫了這么多,越寫越卡,就到此為止先吧,

最后的最后的最后

啊~~~~~~好想有份工作女朋友啊~~~~~

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/37553.html

相關文章

  • SegmentFault 技術周刊 Vol.30 - 學習 Python 來做一些神奇好玩的事情吧

    摘要:學習筆記七數學形態學關注的是圖像中的形狀,它提供了一些方法用于檢測形狀和改變形狀。學習筆記十一尺度不變特征變換,簡稱是圖像局部特征提取的現代方法基于區域圖像塊的分析。本文的目的是簡明扼要地說明的編碼機制,并給出一些建議。 showImg(https://segmentfault.com/img/bVRJbz?w=900&h=385); 前言 開始之前,我們先來看這樣一個提問: pyth...

    lifesimple 評論0 收藏0
  • [零基礎學python]python開發框架

    摘要:軟件開發者通常依據特定的框架實現更為復雜的商業運用和業務邏輯。所有,做開發,要用一個框架。的性能是相當優異的,因為它師徒解決一個被稱之為問題,就是處理大于或等于一萬的并發。 One does not live by bread alone,but by every word that comes from the mouth of God --(MATTHEW4:4) 不...

    lucas 評論0 收藏0
  • 記一次tornado QPS 優化

    摘要:初步分析提升可從兩方面入手,一個是增加并發數,其二是減少平均響應時間。大部分的時間花在系統與數據庫的交互上,到這,便有了一個優化的主題思路最大限度的降低平均響應時間。不要輕易否定一項公認的技術真理,要拿數據說話。 本文最早發表于個人博客:PylixmWiki 應項目的需求,我們使用tornado開發了一個api系統,系統開發完后,在8核16G的虛機上經過壓測qps只有200+。與我們當...

    Doyle 評論0 收藏0
  • [零基礎學python]探析get和post方法

    摘要:特別提醒,看官不要自宮,因為本教程不是辟邪劍譜,也不是葵花寶典,撰寫本課程的人更是生理健全者。直到目前,科學上尚未有證實或證偽自宮和寫程序之間是否存在某種因果關系。和是中用的最多的方法啦。 Do not store up for yourselves treasures on earth, where moth and rust consume and where thieves...

    AaronYuan 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<