国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

tornado 源碼之 iostream.py

JeOam / 2903人閱讀

摘要:對進行包裝,采用注冊回調方式實現非阻塞。通過接口注冊各個事件回調中事件發生后,調用方法,對事件進行分發。

iostream.py
A utility class to write to and read from a non-blocking socket.

IOStream 對 socket 進行包裝,采用注冊回調方式實現非阻塞。
通過接口注冊各個事件回調

_read_callback

_write_callback

_close_callback

_connect_callback

ioloop 中 socket 事件發生后,調用 IOStream._handle_events 方法,對事件進行分發。
對應的事件處理過程中,如果滿足注冊的回調條件,則調用回調函數
回調函數在 IOStream._handle_events 中被調用

contents

iostream.py

contents

example

head

IOStream.__init__

IOStream.connect

IOStream.read_until

IOStream.read_bytes

IOStream.write

IOStream.close

IOStream._handle_events

IOStream._run_callback

IOStream._run_callback

IOStream._read_from_socket

IOStream._read_to_buffer

IOStream._read_from_buffer

IOStream._handle_connect

IOStream._handle_write

IOStream._consume

IOStream._add_io_state

IOStream._read_buffer_size

copyright

example

一個簡單的 IOStream 客戶端示例
由此可見, IOStream 是一個異步回調鏈

創建 socket

創建 IOStream 對象

連接到主機,傳入連接成功后回調函數 send_request

socket 輸出數據請求頁面,讀取 head,傳入讀取 head 成功后回調函數 on_headers

繼續讀取 body,傳入讀取 body 成功后回調函數 on_body

關閉 stream,關閉 ioloop

from tornado import ioloop
from tornado import iostream
import socket


def send_request():
    stream.write("GET / HTTP/1.0
Host: baidu.com

")
    stream.read_until("

", on_headers)


def on_headers(data):
    headers = {}
    for line in data.split("
"):
        parts = line.split(":")
        if len(parts) == 2:
            headers[parts[0].strip()] = parts[1].strip()
    stream.read_bytes(int(headers["Content-Length"]), on_body)


def on_body(data):
    print data
    stream.close()
    ioloop.IOLoop.instance().stop()


s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
stream = iostream.IOStream(s)
stream.connect(("baidu.com", 80), send_request)
ioloop.IOLoop.instance().start()


# html>
# 
# 
head
from __future__ import with_statement

import collections
import errno
import logging
import socket
import sys

from tornado import ioloop
from tornado import stack_context

try:
    import ssl # Python 2.6+
except ImportError:
    ssl = None
IOStream.__init__

包裝 socket 類
關鍵語句 self.io_loop.add_handler( self.socket.fileno(), self._handle_events, self._state) 將自身的_handle_events 加入到全局 ioloop poll 事件回調
此時只注冊了 ERROR 類型事件

_read_buffer: 讀緩沖

class IOStream(object):

    def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
                 read_chunk_size=4096):
        self.socket = socket
        self.socket.setblocking(False)
        self.io_loop = io_loop or ioloop.IOLoop.instance()
        self.max_buffer_size = max_buffer_size
        self.read_chunk_size = read_chunk_size
        self._read_buffer = collections.deque()
        self._write_buffer = collections.deque()
        self._write_buffer_frozen = False
        self._read_delimiter = None
        self._read_bytes = None
        self._read_callback = None
        self._write_callback = None
        self._close_callback = None
        self._connect_callback = None
        self._connecting = False
        self._state = self.io_loop.ERROR
        with stack_context.NullContext():
            self.io_loop.add_handler(
                self.socket.fileno(), self._handle_events, self._state)
IOStream.connect

連接 socket 到遠程地址,非阻塞模式

連接 socket

注冊連接完成回調

poll 增加 socket 寫事件

    def connect(self, address, callback=None):
        """Connects the socket to a remote address without blocking.

        May only be called if the socket passed to the constructor was
        not previously connected.  The address parameter is in the
        same format as for socket.connect, i.e. a (host, port) tuple.
        If callback is specified, it will be called when the
        connection is completed.

        Note that it is safe to call IOStream.write while the
        connection is pending, in which case the data will be written
        as soon as the connection is ready.  Calling IOStream read
        methods before the socket is connected works on some platforms
        but is non-portable.
        """
        self._connecting = True
        try:
            self.socket.connect(address)
        except socket.error, e:
            # In non-blocking mode connect() always raises an exception
            if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
                raise
        self._connect_callback = stack_context.wrap(callback)
        self._add_io_state(self.io_loop.WRITE)
IOStream.read_until

注冊讀完成回調

嘗試從緩沖中讀

從 socket 中讀到緩沖區

重復 2,3,沒有數據則退出

將 socket 讀事件加入 poll

如果緩存中數據滿足條件,則直接執行 callback 并返回,
否則,保存 callback 函數下次 read 事件發生時,_handle_events 處理讀事件時,再進行檢測及調用

    def read_until(self, delimiter, callback):
        """Call callback when we read the given delimiter."""
        assert not self._read_callback, "Already reading"
        self._read_delimiter = delimiter
        self._read_callback = stack_context.wrap(callback)
        while True:
            # See if we"ve already got the data from a previous read
            if self._read_from_buffer():
                return
            self._check_closed()
            if self._read_to_buffer() == 0:
                break
        self._add_io_state(self.io_loop.READ)
IOStream.read_bytes

參考 read_until,讀限定字節

    def read_bytes(self, num_bytes, callback):
        """Call callback when we read the given number of bytes."""
        assert not self._read_callback, "Already reading"
        if num_bytes == 0:
            callback("")
            return
        self._read_bytes = num_bytes
        self._read_callback = stack_context.wrap(callback)
        while True:
            if self._read_from_buffer():
                return
            self._check_closed()
            if self._read_to_buffer() == 0:
                break
        self._add_io_state(self.io_loop.READ)
IOStream.write
    def write(self, data, callback=None):
        """Write the given data to this stream.

        If callback is given, we call it when all of the buffered write
        data has been successfully written to the stream. If there was
        previously buffered write data and an old write callback, that
        callback is simply overwritten with this new callback.
        """
        self._check_closed()
        self._write_buffer.append(data)
        self._add_io_state(self.io_loop.WRITE)
        self._write_callback = stack_context.wrap(callback)

    def set_close_callback(self, callback):
        """Call the given callback when the stream is closed."""
        self._close_callback = stack_context.wrap(callback)
IOStream.close

從 ioloop 移除 socket 事件

關閉 socket

調用關閉回調

    def close(self):
        """Close this stream."""
        if self.socket is not None:
            self.io_loop.remove_handler(self.socket.fileno())
            self.socket.close()
            self.socket = None
            if self._close_callback:
                self._run_callback(self._close_callback)

    def reading(self):
        """Returns true if we are currently reading from the stream."""
        return self._read_callback is not None

    def writing(self):
        """Returns true if we are currently writing to the stream."""
        return bool(self._write_buffer)

    def closed(self):
        return self.socket is None
IOStream._handle_events

核心回調
任何類型的 socket 事件觸發 ioloop 回調_handle_events,然后在_handle_events 再進行分發
值得注意的是,IOStream 不處理連接請求的 read 事件
注意
作為服務端,默認代理的是已經建立連接的 socket

# HTTPServer.\_handle_events
# connection 為已經accept的連接
stream = iostream.IOStream(connection, io_loop=self.io_loop)

作為客戶端,需要手動調用 IOStream.connect,連接成功后,成功回調在 write 事件中處理

這個實現比較別扭

    def _handle_events(self, fd, events):
        if not self.socket:
            logging.warning("Got events for closed stream %d", fd)
            return
        try:
            # 處理讀事件,調用已注冊回調
            if events & self.io_loop.READ:
                self._handle_read()
            if not self.socket:
                return
            # 處理寫事件,如果是剛建立連接,調用連接建立回調
            if events & self.io_loop.WRITE:
                if self._connecting:
                    self._handle_connect()
                self._handle_write()
            if not self.socket:
                return
            # 錯誤事件,關閉 socket
            if events & self.io_loop.ERROR:
                self.close()
                return
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state != self._state:
                self._state = state
                self.io_loop.update_handler(self.socket.fileno(), self._state)
        except:
            logging.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close()
            raise
IOStream._run_callback

執行回調

    def _run_callback(self, callback, *args, **kwargs):
        try:
            # Use a NullContext to ensure that all StackContexts are run
            # inside our blanket exception handler rather than outside.
            with stack_context.NullContext():
                callback(*args, **kwargs)
        except:
            logging.error("Uncaught exception, closing connection.",
                          exc_info=True)
            # Close the socket on an uncaught exception from a user callback
            # (It would eventually get closed when the socket object is
            # gc"d, but we don"t want to rely on gc happening before we
            # run out of file descriptors)
            self.close()
            # Re-raise the exception so that IOLoop.handle_callback_exception
            # can see it and log the error
            raise
IOStream._run_callback

讀回調

從 socket 讀取數據到緩存

無數據,socket 關閉

檢測是否滿足 read_until read_bytes

滿足則執行對應回調

    def _handle_read(self):
        while True:
            try:
                # Read from the socket until we get EWOULDBLOCK or equivalent.
                # SSL sockets do some internal buffering, and if the data is
                # sitting in the SSL object"s buffer select() and friends
                # can"t see it; the only way to find out if it"s there is to
                # try to read it.
                result = self._read_to_buffer()
            except Exception:
                self.close()
                return
            if result == 0:
                break
            else:
                if self._read_from_buffer():
                    return
IOStream._read_from_socket

從 socket 讀取數據

    def _read_from_socket(self):
        """Attempts to read from the socket.

        Returns the data read or None if there is nothing to read.
        May be overridden in subclasses.
        """
        try:
            chunk = self.socket.recv(self.read_chunk_size)
        except socket.error, e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return None
            else:
                raise
        if not chunk:
            self.close()
            return None
        return chunk
IOStream._read_to_buffer

從 socket 讀取數據存入緩存

    def _read_to_buffer(self):
        """Reads from the socket and appends the result to the read buffer.

        Returns the number of bytes read.  Returns 0 if there is nothing
        to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
        error closes the socket and raises an exception.
        """
        try:
            chunk = self._read_from_socket()
        except socket.error, e:
            # ssl.SSLError is a subclass of socket.error
            logging.warning("Read error on %d: %s",
                            self.socket.fileno(), e)
            self.close()
            raise
        if chunk is None:
            return 0
        self._read_buffer.append(chunk)
        if self._read_buffer_size() >= self.max_buffer_size:
            logging.error("Reached maximum read buffer size")
            self.close()
            raise IOError("Reached maximum read buffer size")
        return len(chunk)
IOStream._read_from_buffer

從緩沖中過濾數據
檢測是否滿足結束條件(read_until/read_bytes),滿足則調用之前注冊的回調
采用的是查詢方式

    def _read_from_buffer(self):
        """Attempts to complete the currently-pending read from the buffer.

        Returns True if the read was completed.
        """
        if self._read_bytes:
            if self._read_buffer_size() >= self._read_bytes:
                num_bytes = self._read_bytes
                callback = self._read_callback
                self._read_callback = None
                self._read_bytes = None
                self._run_callback(callback, self._consume(num_bytes))
                return True
        elif self._read_delimiter:
            _merge_prefix(self._read_buffer, sys.maxint)
            loc = self._read_buffer[0].find(self._read_delimiter)
            if loc != -1:
                callback = self._read_callback
                delimiter_len = len(self._read_delimiter)
                self._read_callback = None
                self._read_delimiter = None
                self._run_callback(callback,
                                   self._consume(loc + delimiter_len))
                return True
        return False
IOStream._handle_connect

調用連接建立回調,并清除連接中標志

    def _handle_connect(self):
        if self._connect_callback is not None:
            callback = self._connect_callback
            self._connect_callback = None
            self._run_callback(callback)
        self._connecting = False
IOStream._handle_write

寫事件

從緩沖區獲取限定范圍內數據

調用 socket.send 輸出數據

如果數據發送我且已注冊回調,調用發送完成回調

    def _handle_write(self):
        while self._write_buffer:
            try:
                if not self._write_buffer_frozen:
                    # On windows, socket.send blows up if given a
                    # write buffer that"s too large, instead of just
                    # returning the number of bytes it was able to
                    # process.  Therefore we must not call socket.send
                    # with more than 128KB at a time.
                    _merge_prefix(self._write_buffer, 128 * 1024)
                num_bytes = self.socket.send(self._write_buffer[0])
                self._write_buffer_frozen = False
                _merge_prefix(self._write_buffer, num_bytes)
                self._write_buffer.popleft()
            except socket.error, e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    # With OpenSSL, after send returns EWOULDBLOCK,
                    # the very same string object must be used on the
                    # next call to send.  Therefore we suppress
                    # merging the write buffer after an EWOULDBLOCK.
                    # A cleaner solution would be to set
                    # SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER, but this is
                    # not yet accessible from python
                    # (http://bugs.python.org/issue8240)
                    self._write_buffer_frozen = True
                    break
                else:
                    logging.warning("Write error on %d: %s",
                                    self.socket.fileno(), e)
                    self.close()
                    return
        if not self._write_buffer and self._write_callback:
            callback = self._write_callback
            self._write_callback = None
            self._run_callback(callback)
IOStream._consume

從讀緩存消費 loc 長度的數據

    def _consume(self, loc):
        _merge_prefix(self._read_buffer, loc)
        return self._read_buffer.popleft()

    def _check_closed(self):
        if not self.socket:
            raise IOError("Stream is closed")
IOStream._add_io_state

增加 socket 事件狀態

    def _add_io_state(self, state):
        if self.socket is None:
            # connection has been closed, so there can be no future events
            return
        if not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.socket.fileno(), self._state)
IOStream._read_buffer_size

獲取讀緩存中已有數據長度

    def _read_buffer_size(self):
        return sum(len(chunk) for chunk in self._read_buffer)


class SSLIOStream(IOStream):
    """A utility class to write to and read from a non-blocking socket.

    If the socket passed to the constructor is already connected,
    it should be wrapped with
        ssl.wrap_socket(sock, do_handshake_on_connect=False, **kwargs)
    before constructing the SSLIOStream.  Unconnected sockets will be
    wrapped when IOStream.connect is finished.
    """
    def __init__(self, *args, **kwargs):
        """Creates an SSLIOStream.

        If a dictionary is provided as keyword argument ssl_options,
        it will be used as additional keyword arguments to ssl.wrap_socket.
        """
        self._ssl_options = kwargs.pop("ssl_options", {})
        super(SSLIOStream, self).__init__(*args, **kwargs)
        self._ssl_accepting = True
        self._handshake_reading = False
        self._handshake_writing = False

    def reading(self):
        return self._handshake_reading or super(SSLIOStream, self).reading()

    def writing(self):
        return self._handshake_writing or super(SSLIOStream, self).writing()

    def _do_ssl_handshake(self):
        # Based on code from test_ssl.py in the python stdlib
        try:
            self._handshake_reading = False
            self._handshake_writing = False
            self.socket.do_handshake()
        except ssl.SSLError, err:
            if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                self._handshake_reading = True
                return
            elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                self._handshake_writing = True
                return
            elif err.args[0] in (ssl.SSL_ERROR_EOF,
                                 ssl.SSL_ERROR_ZERO_RETURN):
                return self.close()
            elif err.args[0] == ssl.SSL_ERROR_SSL:
                logging.warning("SSL Error on %d: %s", self.socket.fileno(), err)
                return self.close()
            raise
        except socket.error, err:
            if err.args[0] == errno.ECONNABORTED:
                return self.close()
        else:
            self._ssl_accepting = False
            super(SSLIOStream, self)._handle_connect()

    def _handle_read(self):
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super(SSLIOStream, self)._handle_read()

    def _handle_write(self):
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super(SSLIOStream, self)._handle_write()

    def _handle_connect(self):
        self.socket = ssl.wrap_socket(self.socket,
                                      do_handshake_on_connect=False,
                                      **self._ssl_options)
        # Don"t call the superclass"s _handle_connect (which is responsible
        # for telling the application that the connection is complete)
        # until we"ve completed the SSL handshake (so certificates are
        # available, etc).


    def _read_from_socket(self):
        try:
            # SSLSocket objects have both a read() and recv() method,
            # while regular sockets only have recv().
            # The recv() method blocks (at least in python 2.6) if it is
            # called when there is nothing to read, so we have to use
            # read() instead.
            chunk = self.socket.read(self.read_chunk_size)
        except ssl.SSLError, e:
            # SSLError is a subclass of socket.error, so this except
            # block must come first.
            if e.args[0] == ssl.SSL_ERROR_WANT_READ:
                return None
            else:
                raise
        except socket.error, e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return None
            else:
                raise
        if not chunk:
            self.close()
            return None
        return chunk

def _merge_prefix(deque, size):
    """Replace the first entries in a deque of strings with a single
    string of up to size bytes.

    >>> d = collections.deque(["abc", "de", "fghi", "j"])
    >>> _merge_prefix(d, 5); print d
    deque(["abcde", "fghi", "j"])

    Strings will be split as necessary to reach the desired size.
    >>> _merge_prefix(d, 7); print d
    deque(["abcdefg", "hi", "j"])

    >>> _merge_prefix(d, 3); print d
    deque(["abc", "defg", "hi", "j"])

    >>> _merge_prefix(d, 100); print d
    deque(["abcdefghij"])
    """
    prefix = []
    remaining = size
    while deque and remaining > 0:
        chunk = deque.popleft()
        if len(chunk) > remaining:
            deque.appendleft(chunk[remaining:])
            chunk = chunk[:remaining]
        prefix.append(chunk)
        remaining -= len(chunk)
    deque.appendleft("".join(prefix))

def doctests():
    import doctest
    return doctest.DocTestSuite()
copyright

author:bigfish
copyright: 許可協議 知識共享署名-非商業性使用 4.0 國際許可協議

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/42832.html

相關文章

  • tornado源碼解析IOLoop

    摘要:最大的特點就是其支持異步,所以它有著優異的性能。的代碼結構可以在其官網了解,本文著重分析的實現。事件驅動模型的大致思路的方法用于啟動事件循環。行文比較草率,如有錯誤和不足之處,敬請指正。 0. 簡介 tornado是一個用Python語言寫成的Web服務器兼Web應用框架,由FriendFeed公司在自己的網站FriendFeed中使用,被Facebook收購以后框架以開源軟件形式開放...

    Lsnsh 評論0 收藏0
  • Tornado Demo chatdemo 不完全解讀

    摘要:清楚了以上流程,我們直接來看函數主要用作初始化應用監聽端口以及啟動。其中就是保存聊天室所有聊天消息的結構。關于的解讀我會放到閱讀源碼時講。然后把消息加到緩存里,如果緩存大于限制則取最新的條消息。 tornado 源碼自帶了豐富的 demo ,這篇文章主要分析 demo 中的聊天室應用: chatdemo 首先看 chatdemo 的目錄結構: ├── chatdemo.py ├── ...

    TesterHome 評論0 收藏0
  • [零基礎學python]python開發框架

    摘要:軟件開發者通常依據特定的框架實現更為復雜的商業運用和業務邏輯。所有,做開發,要用一個框架。的性能是相當優異的,因為它師徒解決一個被稱之為問題,就是處理大于或等于一萬的并發。 One does not live by bread alone,but by every word that comes from the mouth of God --(MATTHEW4:4) 不...

    lucas 評論0 收藏0
  • tornado 源碼 coroutine 分析

    摘要:源碼之分析的協程原理分析版本為支持異步,實現了一個協程庫。提供了回調函數注冊當異步事件完成后,調用注冊的回調中間結果保存結束結果返回等功能注冊回調函數,當被解決時,改回調函數被調用。相當于喚醒已經處于狀態的父協程,通過回調函數,再執行。 tornado 源碼之 coroutine 分析 tornado 的協程原理分析 版本:4.3.0 為支持異步,tornado 實現了一個協程庫。 ...

    NicolasHe 評論0 收藏0
  • Tornado 簡單入門教程(一)——Demo1

    摘要:也就是說用于設定與處理類間的映射關系。在中,默認使用和函數分別處理兩種請求。因為表單仍提交到當前頁面,所以還是由處理。載入時間相關的的一個類,獲取當前時間戳。獲取數據庫中的名為的。 前面的話 Demo1是一個簡單的博客系統(=。=什么網站都叫系統)。我們從這個簡單的系統入手,去了解P+T+M網站的內部邏輯,并記住一些規則,方便我們進一步自己開發。 規則這個詞特意打上了雙引號,目的是...

    solocoder 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<