

SocketServer Source Code Analysis

Eric / 2387 reads


SocketServer.py
Creating network servers.
contents

SocketServer.py

contents

file head

BaseServer

BaseServer.serve_forever

BaseServer.shutdown

BaseServer.handle_request

BaseServer._handle_request_noblock

BaseServer Overridden functions

TCPServer

UDPServer

ForkingMixIn

ThreadingMixIn

BaseRequestHandler

StreamRequestHandler

DatagramRequestHandler

Copyright

file head
__version__ = "0.4"


import socket
import select
import sys
import os
import errno
try:
    import threading
except ImportError:
    import dummy_threading as threading

__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
           "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
           "StreamRequestHandler","DatagramRequestHandler",
           "ThreadingMixIn", "ForkingMixIn"]
if hasattr(socket, "AF_UNIX"):
    __all__.extend(["UnixStreamServer","UnixDatagramServer",
                    "ThreadingUnixStreamServer",
                    "ThreadingUnixDatagramServer"])

# Retry the call if it was interrupted by EINTR
def _eintr_retry(func, *args):
    """restart a system call interrupted by EINTR"""
    while True:
        try:
            return func(*args)
        except (OSError, select.error) as e:
            if e.args[0] != errno.EINTR:
                raise
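The retry pattern above can be tried in isolation. The following is a minimal sketch, assuming Python 3 (where `select.error` is an alias of `OSError`, so catching `OSError` is enough); `FlakyCall` is a hypothetical stand-in for a system call that gets interrupted, not part of SocketServer. Since Python 3.5 (PEP 475) most standard library calls already retry on EINTR themselves, so this is mainly useful for understanding the Python 2 code.

```python
import errno


def eintr_retry(func, *args):
    """Call func(*args) again whenever it fails with EINTR."""
    while True:
        try:
            return func(*args)
        except OSError as e:
            # Any error other than EINTR is a real failure: re-raise it.
            if e.errno != errno.EINTR:
                raise


class FlakyCall:
    """Hypothetical stand-in for a system call that is interrupted a few times."""

    def __init__(self, interruptions=2):
        self.remaining = interruptions
        self.attempts = 0

    def __call__(self, value):
        self.attempts += 1
        if self.remaining > 0:
            self.remaining -= 1
            raise OSError(errno.EINTR, "Interrupted system call")
        return value
```

Calling `eintr_retry(FlakyCall(2), "ready")` makes three attempts in total and returns the value from the third one.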
BaseServer

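RequestHandlerClass is how the handler class, and with it the handle function, is registered.
finish_request instantiates that class, which in turn calls the user-defined handle function.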

class BaseServer:
    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        pass
BaseServer.serve_forever

The service loop:

Watch the listening socket

Handle incoming requests

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                # Call select to watch for requests; EINTR is handled by retrying
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                # A request has arrived
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()
BaseServer.shutdown

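Stops the serve_forever loop.
The __is_shut_down event tells the caller that the loop has exited.
Note how threading.Event() is used: it is cleared once when the loop starts and set once when the loop ends, instead of being set and cleared repeatedly.
shutdown must be called from a different thread than the one running serve_forever.
shutdown blocks in wait() on the event; if it were called from the serving thread, that thread would block, serve_forever could never run on to set the event, and the call would deadlock.
serve_forever sets the event only after it has left the loop, which happens the next time it checks __shutdown_request: when the current select returns, either because a request arrived or because poll_interval elapsed.

Note
Reading and writing self.__shutdown_request is a plain attribute access on a boolean, which is atomic in CPython, so the flag can be shared between threads without a lock.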

    def shutdown(self):
        """Stops the serve_forever loop.

        Blocks until the loop has finished. This must be called while
        serve_forever() is running in another thread, or it will
        deadlock.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()
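The threading rule described above can be sketched as follows. This is an illustration only, assuming Python 3, where the module is named `socketserver`; `EchoHandler` and `echo_once` are hypothetical names introduced here. serve_forever runs in a worker thread, and shutdown is called from the main thread.

```python
import socket
import socketserver
import threading


class EchoHandler(socketserver.BaseRequestHandler):
    """Hypothetical handler: send back whatever the client sent."""

    def handle(self):
        data = self.request.recv(1024)
        self.request.sendall(data)


def echo_once(message):
    """Start a server, echo one message through it, then shut it down."""
    # Port 0 asks the OS for a free port; server_address holds the result.
    server = socketserver.TCPServer(("127.0.0.1", 0), EchoHandler)
    worker = threading.Thread(target=server.serve_forever,
                              kwargs={"poll_interval": 0.05})
    worker.start()
    try:
        with socket.create_connection(server.server_address) as client:
            client.sendall(message)
            reply = client.recv(1024)
    finally:
        # Called from the main thread, never from the serving thread.
        server.shutdown()      # blocks until serve_forever has returned
        server.server_close()  # release the listening socket
        worker.join()
    return reply
```

Calling `server.shutdown()` inside `EchoHandler.handle` instead would block the serving thread on its own event and never return.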
BaseServer.handle_request

A sibling of serve_forever.
If you do not call serve_forever, call handle_request in a loop of your own.

    # The distinction between handling, getting, processing and
    # finishing a request is fairly arbitrary.  Remember:
    #
    # - handle_request() is the top-level call.  It calls
    #   select, get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process
    #   or create a new thread to finish the request
    # - finish_request() instantiates the request handler class;
    #   this constructor will handle the request all by itself

    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.

        # If the user also set a timeout with socket.settimeout(), use the smaller of the two
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        # select watches the listening socket and blocks until it is readable or the timeout expires
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        # Handle the request
        self._handle_request_noblock()
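Driving the server by hand with handle_request might look like the sketch below. It assumes Python 3's `socketserver`; `CountingServer` and `poll` are hypothetical names. Each call waits at most `timeout` seconds and falls through to handle_timeout when nothing arrives.

```python
import socketserver


class CountingServer(socketserver.TCPServer):
    """Hypothetical server that counts how often handle_request() timed out."""

    timeout = 0.05   # seconds handle_request() waits for a connection
    idle_ticks = 0

    def handle_timeout(self):
        # Called by handle_request() when no request arrived within timeout.
        self.idle_ticks += 1


def poll(server, rounds):
    """Call handle_request() in our own loop instead of serve_forever()."""
    for _ in range(rounds):
        server.handle_request()
    return server.idle_ticks
```

With no client connecting, every round ends in handle_timeout, so the counter equals the number of rounds. A real loop would do its periodic work between the calls.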
BaseServer._handle_request_noblock

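The function that actually handles a request.

get_request: receives the request (accept)

verify_request: validation hook, for example IP filtering

process_request: processes the request. A subclass that overrides it must still make sure finish_request and shutdown_request get called, for example by calling SocketServer.BaseServer.process_request.

BaseServer.process_request calls finish_request, which is where the BaseRequestHandler callback happens: it instantiates the user-defined handler, and the handler's __init__ calls handle().

shutdown_request: closes the connection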

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            # Receive the request
            # get_request is implemented by subclasses; it usually accepts the request and returns the socket and client address
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
        else:
            self.shutdown_request(request)
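As an illustration of the verify_request hook, here is a minimal sketch of IP filtering, assuming Python 3's `socketserver`. `AllowListServer`, `HelloHandler`, `ask`, and the `allowed_hosts` attribute are hypothetical names introduced for this example. A rejected client is never passed to the handler; the server just closes the connection.

```python
import socket
import socketserver
import threading


class AllowListServer(socketserver.TCPServer):
    """Hypothetical server that only serves clients on an allow list."""

    allowed_hosts = frozenset({"127.0.0.1"})

    def verify_request(self, request, client_address):
        # For AF_INET sockets client_address is a (host, port) pair.
        return client_address[0] in self.allowed_hosts


class HelloHandler(socketserver.BaseRequestHandler):
    def handle(self):
        self.request.sendall(b"hello")


def ask(allowed_hosts):
    """Return what a local client receives from a server with this allow list."""
    server = AllowListServer(("127.0.0.1", 0), HelloHandler)
    server.allowed_hosts = frozenset(allowed_hosts)
    # handle_request() serves exactly one connection, then returns.
    worker = threading.Thread(target=server.handle_request)
    worker.start()
    try:
        with socket.create_connection(server.server_address) as client:
            return client.recv(1024)   # b"" means the server closed on us
    finally:
        worker.join()
        server.server_close()
```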
BaseServer Overridden functions
    def handle_timeout(self):
        """Called if no new request arrives within self.timeout.

        Overridden by ForkingMixIn.
        """
        pass

    def verify_request(self, request, client_address):
        """Verify the request.  May be overridden.

        Return True if we should proceed with this request.

        """
        return True

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        pass

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        pass

    def handle_error(self, request, client_address):
        """Handle an error gracefully.  May be overridden.

        The default is to print a traceback and continue.

        """
        print "-"*40
        print "Exception happened during processing of request from",
        print client_address
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print "-"*40
TCPServer

shutdown_request calls socket.shutdown first and socket.close afterwards. The Python socket documentation explains why:

close() releases the resource associated with a connection but does not necessarily close the connection immediately. If you want to close the connection in a timely fashion, call shutdown() before close().

Shut down one or both halves of the connection. If how is SHUT_RD, further receives are disallowed. If how is SHUT_WR, further sends are disallowed. If how is SHUT_RDWR, further sends and receives are disallowed. Depending on the platform, shutting down one half of the connection can also close the opposite half (e.g. on Mac OS X, shutdown(SHUT_WR) does not allow further reads on the other end of the connection).

class TCPServer(BaseServer):

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

    def fileno(self):
        """Return socket file number.

        Interface required by select().

        """
        return self.socket.fileno()

    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept()

    # Call shutdown and then close, so the connection is closed promptly and its resources are released
    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        try:
            #explicitly shutdown.  socket.close() merely releases
            #the socket and waits for GC to perform the actual close.
            request.shutdown(socket.SHUT_WR)
        except socket.error:
            pass #some platforms may raise ENOTCONN here
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()
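The constructor's `bind_and_activate` flag makes it possible to run the bind and listen steps by hand, for instance to change a socket option first. The sketch below assumes Python 3's `socketserver`; `make_server` is a hypothetical helper that mirrors what the constructor does when the flag is left at True.

```python
import socket
import socketserver


def make_server(handler_cls, host="127.0.0.1", port=0):
    """Build a TCPServer in explicit steps instead of inside the constructor."""
    server = socketserver.TCPServer((host, port), handler_cls,
                                    bind_and_activate=False)
    try:
        # Must be set before server_bind(), which reads this attribute.
        server.allow_reuse_address = True
        server.server_bind()       # setsockopt(SO_REUSEADDR), then bind()
        server.server_activate()   # listen(request_queue_size)
    except Exception:
        server.server_close()
        raise
    return server
```

After `server_bind()`, `server.server_address` holds the address actually bound, so passing port 0 yields the port the OS picked.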
UDPServer

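UDPServer.get_request returns a (data, socket) tuple as the request, whereas TCPServer returns the connected socket.
handle has to treat the two cases differently:
msg, sock = self.request
msg has already been received, so no extra recv is needed.

To transfer data you should use the socket's sendto() and recvfrom() methods. The traditional send() and recv() can achieve the same effect, but the former two are more common with UDP.
(from python3-cookbook)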
from SocketServer import BaseRequestHandler, UDPServer
import time

class TimeHandler(BaseRequestHandler):
    def handle(self):
        print("Got connection from", self.client_address)
        # Get message and client socket
        msg, sock = self.request
        resp = time.ctime()
        sock.sendto(resp.encode("ascii"), self.client_address)

if __name__ == "__main__":
    serv = UDPServer(("", 20000), TimeHandler)
    serv.serve_forever()

#-----------------------------
>>> from socket import socket, AF_INET, SOCK_DGRAM
>>> s = socket(AF_INET, SOCK_DGRAM)
>>> s.sendto(b"", ("localhost", 20000))
0
>>> s.recvfrom(8192)
("Thu Dec 20 10:01:01 2018", ("127.0.0.1", 20000))
class UDPServer(TCPServer):

    """UDP server class."""

    allow_reuse_address = False

    socket_type = socket.SOCK_DGRAM

    max_packet_size = 8192

    def get_request(self):
        data, client_addr = self.socket.recvfrom(self.max_packet_size)
        return (data, self.socket), client_addr

    def server_activate(self):
        # No need to call listen() for UDP.
        pass

    def shutdown_request(self, request):
        # No need to shutdown anything.
        self.close_request(request)

    def close_request(self, request):
        # No need to close anything.
        pass
ForkingMixIn

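A typical use of fork; this mixin shows the usual multi-process pattern:

Cap the number of child processes so that system resources are not exhausted

The parent waits on defunct (zombie) children

After fork, the parent returns immediately

The child handles the request and then calls os._exit()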

class ForkingMixIn:

    """Mix-in class to handle each request in a new process."""

    timeout = 300
    active_children = None
    max_children = 40

    def collect_children(self):
        """Internal routine to wait for children that have exited."""
        if self.active_children is None:
            return

        while len(self.active_children) >= self.max_children:
            try:
                pid, _ = os.waitpid(-1, 0)
                self.active_children.discard(pid)
            except OSError as e:
                if e.errno == errno.ECHILD:
                    # we don't have any children, we're done
                    self.active_children.clear()
                elif e.errno != errno.EINTR:
                    break

        # Now reap all defunct children.
        for pid in self.active_children.copy():
            try:
                pid, _ = os.waitpid(pid, os.WNOHANG)
                # if the child hasn't exited yet, pid will be 0 and ignored by
                # discard() below
                self.active_children.discard(pid)
            except OSError as e:
                if e.errno == errno.ECHILD:
                    # someone else reaped it
                    self.active_children.discard(pid)

    def handle_timeout(self):
        """Wait for zombies after self.timeout seconds of inactivity.

        May be extended, do not override.
        """
        self.collect_children()

    def process_request(self, request, client_address):
        """Fork a new subprocess to process the request."""
        self.collect_children()
        pid = os.fork()
        if pid:
            # Parent process
            if self.active_children is None:
                self.active_children = set()
            self.active_children.add(pid)
            self.close_request(request) #close handle in parent process
            return
        else:
            # Child process.
            # This must never return, hence os._exit()!
            try:
                self.finish_request(request, client_address)
                self.shutdown_request(request)
                os._exit(0)
            except:
                try:
                    self.handle_error(request, client_address)
                    self.shutdown_request(request)
                finally:
                    os._exit(1)
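The fork, os._exit, and waitpid steps can be reduced to the sketch below. It assumes a POSIX platform (os.fork is not available on Windows) and Python 3; `run_in_child` is a hypothetical helper. Unlike ForkingMixIn, the parent here waits for the child right away instead of reaping it later, which keeps the example short.

```python
import os


def run_in_child(task):
    """Run task() in a forked child process and return the child's exit code."""
    pid = os.fork()
    if pid == 0:
        # Child process: must never return into the caller's code,
        # so it always leaves through os._exit().
        try:
            task()
        except BaseException:
            os._exit(1)
        os._exit(0)
    # Parent process: wait for this child so it does not stay a zombie.
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status)
```

`run_in_child(lambda: None)` yields 0, and a task that raises yields 1, matching the exit codes used in process_request above.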
ThreadingMixIn

ThreadingMixIn overrides the process_request function:

Create a thread

Handle the request in that thread

Start the thread

class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()
class ForkingUDPServer(ForkingMixIn, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass

class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

if hasattr(socket, "AF_UNIX"):

    class UnixStreamServer(TCPServer):
        address_family = socket.AF_UNIX

    class UnixDatagramServer(UDPServer):
        address_family = socket.AF_UNIX

    class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass

    class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass
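To see that a threading server handles requests outside its accept loop, the sketch below has each handler report the name of the thread it runs on. It assumes Python 3's `socketserver`; `ThreadNameHandler`, `Server`, and `serving_threads` are hypothetical names. The mixin is listed first so that its process_request overrides the one from TCPServer.

```python
import socket
import socketserver
import threading


class ThreadNameHandler(socketserver.BaseRequestHandler):
    """Hypothetical handler: report which thread is serving the request."""

    def handle(self):
        name = threading.current_thread().name
        self.request.sendall(name.encode("ascii"))


class Server(socketserver.ThreadingMixIn, socketserver.TCPServer):
    daemon_threads = True   # worker threads do not block interpreter exit


def serving_threads(requests=2):
    """Return the thread names that handled each of the requests."""
    server = Server(("127.0.0.1", 0), ThreadNameHandler)
    loop = threading.Thread(target=server.serve_forever,
                            kwargs={"poll_interval": 0.05},
                            name="accept-loop")
    loop.start()
    names = []
    try:
        for _ in range(requests):
            with socket.create_connection(server.server_address) as client:
                names.append(client.recv(1024).decode("ascii"))
    finally:
        server.shutdown()
        server.server_close()
        loop.join()
    return names
```

None of the returned names is "accept-loop": the accept loop only starts the worker threads.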
BaseRequestHandler

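The base request handler class. It exposes three hooks:

setup()

handle()

finish()

To use it, subclass it and register the subclass with BaseServer.
BaseServer.finish_request instantiates the BaseRequestHandler subclass, and the __init__ call invokes the subclass's overridden handle().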

class BaseRequestHandler:

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass
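The call order inside `__init__` can be observed without any network at all by instantiating a handler directly, the way finish_request does. The sketch below assumes Python 3's `socketserver`; `TracingHandler` and `trace` are hypothetical, and a plain list is passed where the server object normally goes, purely so the hooks have somewhere to log.

```python
import socketserver


class TracingHandler(socketserver.BaseRequestHandler):
    """Hypothetical handler that logs each hook into self.server (a list here)."""

    def setup(self):
        self.server.append("setup")

    def handle(self):
        self.server.append("handle")
        if self.request == "boom":
            raise ValueError("handler failed")

    def finish(self):
        self.server.append("finish")


def trace(request):
    """Instantiate the handler directly and return the order of hook calls."""
    log = []  # stands in for the server object in this sketch
    try:
        TracingHandler(request, ("127.0.0.1", 0), log)
    except ValueError:
        log.append("error")
    return log
```

Because handle() is wrapped in try/finally, finish() still runs when handle() raises, and the exception then propagates out of the constructor.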
StreamRequestHandler

Provides a file-like interface (rfile and wfile).

class StreamRequestHandler(BaseRequestHandler):

    """Define self.rfile and self.wfile for stream sockets."""

    # Default buffer sizes for rfile, wfile.
    # We default rfile to buffered because otherwise it could be
    # really slow for large data (a getc() call per byte); we make
    # wfile unbuffered because (a) often after a write() we want to
    # read and we need to flush the line; (b) big writes to unbuffered
    # files are typically optimized by stdio even when big reads
    # aren't.
    rbufsize = -1
    wbufsize = 0

    # A timeout to apply to the request socket, if not None.
    timeout = None

    # Disable nagle algorithm for this socket, if True.
    # Use only when wbufsize != 0, to avoid small packets.
    disable_nagle_algorithm = False

    def setup(self):
        self.connection = self.request
        if self.timeout is not None:
            self.connection.settimeout(self.timeout)
        if self.disable_nagle_algorithm:
            self.connection.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, True)
        self.rfile = self.connection.makefile("rb", self.rbufsize)
        self.wfile = self.connection.makefile("wb", self.wbufsize)

    def finish(self):
        if not self.wfile.closed:
            try:
                self.wfile.flush()
            except socket.error:
                # A final socket error may have occurred here, such as
                # the local error ECONNABORTED.
                pass
        self.wfile.close()
        self.rfile.close()
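A handler built on the file interface might look like the sketch below, which reads one line from rfile and writes the upper-cased line to wfile. It assumes Python 3's `socketserver`; `UpperHandler` and `shout` are hypothetical names.

```python
import socket
import socketserver
import threading


class UpperHandler(socketserver.StreamRequestHandler):
    """Hypothetical handler: read one line, answer with it upper-cased."""

    timeout = 2.0   # setup() applies this to the connection

    def handle(self):
        line = self.rfile.readline()     # buffered read up to b"\n"
        self.wfile.write(line.upper())   # wfile is unbuffered by default


def shout(text):
    """Send one line to a one-shot server and return the reply line."""
    server = socketserver.TCPServer(("127.0.0.1", 0), UpperHandler)
    worker = threading.Thread(target=server.handle_request)
    worker.start()
    try:
        with socket.create_connection(server.server_address) as client:
            client.sendall(text.encode("ascii") + b"\n")
            with client.makefile("rb") as reply:
                return reply.readline().decode("ascii").rstrip("\n")
    finally:
        worker.join()
        server.server_close()
```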
DatagramRequestHandler
class DatagramRequestHandler(BaseRequestHandler):

    """Define self.rfile and self.wfile for datagram sockets."""

    def setup(self):
        try:
            from cStringIO import StringIO
        except ImportError:
            from StringIO import StringIO
        self.packet, self.socket = self.request
        self.rfile = StringIO(self.packet)
        self.wfile = StringIO()

    def finish(self):
        self.socket.sendto(self.wfile.getvalue(), self.client_address)
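With DatagramRequestHandler the handler only touches rfile and wfile; finish() sends whatever was written to wfile back to the client with sendto(). The sketch below assumes Python 3's `socketserver`, where the two files are in-memory bytes buffers rather than StringIO objects; `ReverseHandler` and `reverse` are hypothetical names.

```python
import socket
import socketserver
import threading


class ReverseHandler(socketserver.DatagramRequestHandler):
    """Hypothetical handler: reply with the datagram's bytes reversed."""

    def handle(self):
        payload = self.rfile.read()       # the packet that was received
        self.wfile.write(payload[::-1])   # sent back later by finish()


def reverse(payload):
    """Send one datagram to a one-shot UDP server and return the reply."""
    server = socketserver.UDPServer(("127.0.0.1", 0), ReverseHandler)
    server.timeout = 2.0   # so handle_request() cannot wait forever
    worker = threading.Thread(target=server.handle_request)
    worker.start()
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
            client.settimeout(2.0)
            client.sendto(payload, server.server_address)
            data, _ = client.recvfrom(8192)
            return data
    finally:
        worker.join()
        server.server_close()
```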
Copyright

Author: bigfish
License: Creative Commons Attribution-NonCommercial 4.0 International License
