摘要:上一篇我們介紹了包,以及如何使用異步編程管理網絡應用中的高并發。倒排索引保存在本地一個名為的文件中。運行示例如下這個模塊沒有使用并發,主要作用是為使用包編寫的服務器提供支持。
async/await語法asyncio 上一篇我們介紹了 asyncio 包,以及如何使用異步編程管理網絡應用中的高并發。在這一篇,我們主要介紹使用 asyncio 包編程的兩個例子。
我們先介紹下 async/await 語法,要不然看完這篇可能會困惑,為什么之前使用 asyncio.coroutine 裝飾器 和 yield from,這里都是 用的 async 和 await?
python并發2:使用asyncio處理并發
async/await 是Python3.5 的新語法,語法如下:
async def read_data(db): pass
async 是明確將函數聲明為協程的關鍵字,即使沒有await表達式,函數執行也會返回一個協程對象。
在協程函數內部,可以在某個表達式之前使用 await 關鍵字來暫停協程的執行,以等待某協程完成:
async def read_data(db): data = await db.fetch("SELECT ...")
這個代碼如果使用 asyncio.coroutine 裝飾器語法為:
@asyncio.coroutine def read_data(db): data = yield from db.fetch("SELECT ...")
這兩段代碼執行的結果是一樣的,也就是說 可以把 asyncio.coroutine 替換為 async, yield from 替換為 await。
使用新的語法有什么好處呢:
使生成器和協程的概念更容易理解,因為語法不同
可以消除由于重構時不小心移出協程中yield 聲明而導致的不明確錯誤,這回導致協程變成普通的生成器。
使用 asyncio 包編寫服務器這個例子主要是使用 asyncio 包 和 unicodedata 模塊,實現通過規范名稱查找Unicode 字符。
我們先來看一下代碼:
# charfinder.py import sys import re import unicodedata import pickle import warnings import itertools import functools from collections import namedtuple RE_WORD = re.compile("w+") RE_UNICODE_NAME = re.compile("^[A-Z0-9 -]+$") RE_CODEPOINT = re.compile("U+[0-9A-F]{4, 6}") INDEX_NAME = "charfinder_index.pickle" MINIMUM_SAVE_LEN = 10000 CJK_UNI_PREFIX = "CJK UNIFIED IDEOGRAPH" CJK_CMP_PREFIX = "CJK COMPATIBILITY IDEOGRAPH" sample_chars = [ "$", # DOLLAR SIGN "A", # LATIN CAPITAL LETTER A "a", # LATIN SMALL LETTER A "u20a0", # EURO-CURRENCY SIGN "u20ac", # EURO SIGN ] CharDescription = namedtuple("CharDescription", "code_str char name") QueryResult = namedtuple("QueryResult", "count items") def tokenize(text): """ :param text: :return: return iterable of uppercased words """ for match in RE_WORD.finditer(text): yield match.group().upper() def query_type(text): text_upper = text.upper() if "U+" in text_upper: return "CODEPOINT" elif RE_UNICODE_NAME.match(text_upper): return "NAME" else: return "CHARACTERS" class UnicodeNameIndex: # unicode name 索引類 def __init__(self, chars=None): self.load(chars) def load(self, chars=None): # 加載 unicode name self.index = None if chars is None: try: with open(INDEX_NAME, "rb") as fp: self.index = pickle.load(fp) except OSError: pass if self.index is None: self.build_index(chars) if len(self.index) > MINIMUM_SAVE_LEN: try: self.save() except OSError as exc: warnings.warn("Could not save {!r}: {}" .format(INDEX_NAME, exc)) def save(self): with open(INDEX_NAME, "wb") as fp: pickle.dump(self.index, fp) def build_index(self, chars=None): if chars is None: chars = (chr(i) for i in range(32, sys.maxunicode)) index = {} for char in chars: try: name = unicodedata.name(char) except ValueError: continue if name.startswith(CJK_UNI_PREFIX): name = CJK_UNI_PREFIX elif name.startswith(CJK_CMP_PREFIX): name = CJK_CMP_PREFIX for word in tokenize(name): index.setdefault(word, set()).add(char) self.index = index def word_rank(self, top=None): # (len(self.index[key], key) 是一個生成器,需要用list 轉成列表,要不然下邊排序會報錯 res = [list((len(self.index[key], key)) for key in self.index)] res.sort(key=lambda item: (-item[0], item[1])) if top is not None: res = res[:top] return res def word_report(self, top=None): for postings, key in self.word_rank(top): print("{:5} {}".format(postings, key)) def find_chars(self, query, start=0, stop=None): stop = sys.maxsize if stop is None else stop result_sets = [] for word in tokenize(query): # tokenize 是query 的生成器 a b 會是 ["a", "b"] 的生成器 chars = self.index.get(word) if chars is None: result_sets = [] break result_sets.append(chars) if not result_sets: return QueryResult(0, ()) result = functools.reduce(set.intersection, result_sets) result = sorted(result) # must sort to support start, stop result_iter = itertools.islice(result, start, stop) return QueryResult(len(result), (char for char in result_iter)) def describe(self, char): code_str = "U+{:04X}".format(ord(char)) name = unicodedata.name(char) return CharDescription(code_str, char, name) def find_descriptions(self, query, start=0, stop=None): for char in self.find_chars(query, start, stop).items: yield self.describe(char) def get_descriptions(self, chars): for char in chars: yield self.describe(char) def describe_str(self, char): return "{:7} {} {}".format(*self.describe(char)) def find_description_strs(self, query, start=0, stop=None): for char in self.find_chars(query, start, stop).items: yield self.describe_str(char) @staticmethod # not an instance method due to concurrency def status(query, counter): if counter == 0: msg = "No match" elif counter == 1: msg = "1 match" else: msg = "{} matches".format(counter) return "{} for {!r}".format(msg, query) def main(*args): index = UnicodeNameIndex() query = " ".join(args) n = 0 for n, line in enumerate(index.find_description_strs(query), 1): print(line) print("({})".format(index.status(query, n))) if __name__ == "__main__": if len(sys.argv) > 1: main(*sys.argv[1:]) else: print("Usage: {} word1 [word2]...".format(sys.argv[0]))
這個模塊讀取Python內建的Unicode數據庫,為每個字符名稱中的每個單詞建立索引,然后倒排索引,存入一個字典。
例如,在倒排索引中,"SUN" 鍵對應的條目是一個集合,里面是名稱中包含"SUN" 這個詞的10個Unicode字符。倒排索引保存在本地一個名為charfinder_index.pickle 的文件中。如果查詢多個單詞,會計算從索引中所得集合的交集。
運行示例如下:
>>> main("rook") # doctest: +NORMALIZE_WHITESPACE U+2656 ? WHITE CHESS ROOK U+265C ? BLACK CHESS ROOK (2 matches for "rook") >>> main("rook", "black") # doctest: +NORMALIZE_WHITESPACE U+265C ? BLACK CHESS ROOK (1 match for "rook black") >>> main("white bishop") # doctest: +NORMALIZE_WHITESPACE U+2657 ? WHITE CHESS BISHOP (1 match for "white bishop") >>> main("jabberwocky"s vest") (No match for "jabberwocky"s vest")
這個模塊沒有使用并發,主要作用是為使用 asyncio 包編寫的服務器提供支持。
下面我們來看下 tcp_charfinder.py 腳本:
# tcp_charfinder.py import sys import asyncio # 用于構建索引,提供查詢方法 from charfinder import UnicodeNameIndex CRLF = b" " PROMPT = b"?> " # 實例化UnicodeNameIndex 類,它會使用charfinder_index.pickle 文件 index = UnicodeNameIndex() async def handle_queries(reader, writer): # 這個協程要傳給asyncio.start_server 函數,接收的兩個參數是asyncio.StreamReader 對象和 asyncio.StreamWriter 對象 while True: # 這個循環處理會話,直到從客戶端收到控制字符后退出 writer.write(PROMPT) # can"t await! # 這個方法不是協程,只是普通函數;這一行發送 ?> 提示符 await writer.drain() # must await! # 這個方法刷新writer 緩沖;因為它是協程,所以要用 await data = await reader.readline() # 這個方法也是協程,返回一個bytes對象,也要用await try: query = data.decode().strip() except UnicodeDecodeError: # Telenet 客戶端發送控制字符時,可能會拋出UnicodeDecodeError異常 # 我們這里默認發送空字符 query = "x00" client = writer.get_extra_info("peername") # 返回套接字連接的遠程地址 print("Received from {}: {!r}".format(client, query)) # 在控制臺打印查詢記錄 if query: if ord(query[:1]) < 32: # 如果收到控制字符或者空字符,退出循環 break # 返回一個生成器,產出包含Unicode 碼位、真正的字符和字符名稱的字符串 lines = list(index.find_description_strs(query)) if lines: # 使用默認的UTF-8 編碼把lines 轉換成bytes 對象,并在每一行末添加回車符合換行符 # 參數列表是一個生成器 writer.writelines(line.encode() + CRLF for line in lines) writer.write(index.status(query, len(lines)).encode() + CRLF) # 輸出狀態 await writer.drain() # 刷新輸出緩沖 print("Sent {} results".format(len(lines))) # 在服務器控制臺記錄響應 print("Close the client socket") # 在控制臺記錄會話結束 writer.close() # 關閉StreamWriter流 def main(address="127.0.0.1", port=2323): # 添加默認地址和端口,所以調用默認可以不加參數 port = int(port) loop = asyncio.get_event_loop() # asyncio.start_server 協程運行結束后, # 返回的協程對象返回一個asyncio.Server 實例,即一個TCP套接字服務器 server_coro = asyncio.start_server(handle_queries, address, port, loop=loop) server = loop.run_until_complete(server_coro) # 驅動server_coro 協程,啟動服務器 host = server.sockets[0].getsockname() # 獲得這個服務器的第一個套接字的地址和端口 print("Serving on {}. Hit CTRL-C to stop.".format(host)) # 在控制臺中顯示地址和端口 try: loop.run_forever() # 運行事件循環 main 函數在這里阻塞,直到服務器的控制臺中按CTRL-C 鍵 except KeyboardInterrupt: # CTRL+C pressed pass print("Server shutting down.") server.close() # server.wait_closed返回一個 future # 調用loop.run_until_complete 方法,運行 future loop.run_until_complete(server.wait_closed()) loop.close() # 終止事件循環 if __name__ == "__main__": main(*sys.argv[1:])
運行 tcp_charfinders.py
python tcp_charfinders.py
打開終端,使用 telnet 命令請求服務,運行結果如下所示:
main 函數幾乎會立即顯示 Serving on... 消息,然后在調用loop.run_forever() 方法時阻塞。這時,控制權流動到事件循環中,而且一直等待,偶爾會回到handle_queries 協程,這個協程需要等待網絡發送或接收數據時,控制權又交給事件循環。
handle_queries 協程可以處理多個客戶端發來的多次請求。只要有新客戶端連接服務器,就會啟動一個handle_queries 協程實例。
handle_queries 的I/O操作都是使用bytes格式。我們從網絡得到的數據要解碼,發出去的數據也要編碼
asyncio包提供了高層的流API,提供了現成的服務器,我們只需要實現一個處理程序。詳細信息可以查看文檔:https://docs.python.org/3/library/asyncio-stream.html
雖然,asyncio包提供了服務器,但是功能相對來說還是比較簡陋的,現在我們使用一下 基于asyncio包的 web 框架 sanci,用它來實現一個http版的簡易服務器
使用 sanic 包編寫web 服務器sanic 的簡單入門在上一篇文章有介紹,python web 框架 Sanci 快速入門
Sanic 是一個和類Flask 的基于Python3.5+的web框架,提供了比較高階的API,比如路由、request參數,response等,我們只需要實現處理邏輯即可。
下邊是使用 sanic 實現的簡易的 字符查詢http web 服務:
from sanic import Sanic from sanic import response from charfinder import UnicodeNameIndex app = Sanic() index = UnicodeNameIndex() html_temp = "{char}
" @app.route("/charfinder") # app.route 函數的第一個參數是url path,我們這里指定路徑是charfinder async def charfinder(request): # request.args 可以取到url 的查詢參數 # ?key1=value1&key2=value2 的結果是 {"key1": ["value1"], "key2": ["value2"]} # 我們這里支持傳入多個查詢參數,所以這里使用 request.args.getlist("char") # 如果我們 使用 request.args.get("char") 只能取到第一個參數 query = request.args.getlist("char") query = " ".join(query) lines = list(index.find_description_strs(query)) # 將得到的結果生成html html = " ".join([html_temp.format(char=line) for line in lines]) return response.html(html) if __name__ == "__main__": app.run(host="0.0.0.0", port=8000) # 設置服務器運行地址和端口號
對比兩段代碼可以發現,使用 sanic 非常簡單。
運行服務:
python http_charsfinder.py
我們在瀏覽器輸入地址 http://0.0.0.0:8000/charfinde... 結果示例如下
現在對比下兩段代碼在TCP 的示例中,服務器通過main函數下的這兩行代碼創建并排定運行時間:
server_coro = asyncio.start_server(handle_queries, address, port, loop=loop) server = loop.run_until_complete(server_coro)
而在sanic的HTTP示例中,使用,創建服務器:
app.run(host="0.0.0.0", port=8000)
這兩個看起來運行方式完全不同,但如果我們翻開sanic的源碼會看到 app.run() 內部是調用 的 server_coroutine = loop.create_server()創建服務器,
server_coroutine 是通過 loop.run_until_complete()驅動的。
所以說,為了啟動服務器,這兩個都是由 loop.run_until_complete 驅動,完成運行的。只不過 sanic 封裝了run 方法,使得使用更加方便。
這里可以得到一個基本事實:只有驅動協程,協程才能做事,而驅動 asyncio.coroutine 裝飾的協程有兩種方式,使用 yield from 或者傳給asyncio 包中某個參數為協程或future的函數,例如 run_until_complete
現在如果你搜索 cjk,會得到7萬多條數據3M 的一個html文件,耗時大約2s,這如果是生產服務的一個請求,耗時2s是不能接收的,我們可以使用分頁,這樣我們可以每次只取200條數據,當用戶想看更多數據時再使用 ajax 或者 websockets發送下一批數據。
這一篇我們使用 asyncio 包實現了TCP服務器,使用sanic(基于asyncio sanic 默認使用 uvloop替代asyncio)實現了HTTP服務器,用于按名稱搜索Unicode 字符。但是并沒有涉及服務器并發部分,這部分可以以后再討論。
參考鏈接這一篇還是 《流暢的python》asyncio 一章的讀書筆記,下一篇將是python并發的第三篇,《使用線程處理并發》。
Python 3.5將支持Async/Await異步編程:http://www.infoq.com/cn/news/2015/05/python-async-await
python web 框架 Sanci 快速入門
python并發2:使用asyncio處理并發
最后,感謝女朋友支持。
>歡迎關注 | >請我喝芬達 |
---|---|
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/38648.html
摘要:并發用于制定方案,用來解決可能但未必并行的問題。在協程中使用需要注意兩點使用鏈接的多個協程最終必須由不是協程的調用方驅動,調用方顯式或隱式在最外層委派生成器上調用函數或方法。對象可以取消取消后會在協程當前暫停的處拋出異常。 導語:本文章記錄了本人在學習Python基礎之控制流程篇的重點知識及個人心得,打算入門Python的朋友們可以來一起學習并交流。 本文重點: 1、了解asyncio...
摘要:具有以下基本同步原語子進程提供了通過創建和管理子進程的。雖然隊列不是線程安全的,但它們被設計為專門用于代碼。表示異步操作的最終結果。 Python的asyncio是使用 async/await 語法編寫并發代碼的標準庫。通過上一節的講解,我們了解了它不斷變化的發展歷史。到了Python最新穩定版 3.7 這個版本,asyncio又做了比較大的調整,把這個庫的API分為了 高層級API和...
摘要:是之后引入的標準庫的,這個包使用事件循環驅動的協程實現并發。沒有能從外部終止線程,因為線程隨時可能被中斷。上一篇并發使用處理并發我們介紹過的,在中,只是調度執行某物的結果。 asyncio asyncio 是Python3.4 之后引入的標準庫的,這個包使用事件循環驅動的協程實現并發。asyncio 包在引入標準庫之前代號 Tulip(郁金香),所以在網上搜索資料時,會經常看到這種花的...
摘要:我們以請求網絡服務為例,來實際測試一下加入多線程之后的效果。所以,執行密集型操作時,多線程是有用的,對于密集型操作,則每次只能使用一個線程。說到這里,對于密集型,可以使用多線程或者多進程來提高效率。 為了提高系統密集型運算的效率,我們常常會使用到多個進程或者是多個線程,python中的Threading包實現了線程,multiprocessing 包則實現了多進程。而在3.2版本的py...
摘要:創建第一個協程推薦使用語法來聲明協程,來編寫異步應用程序。協程兩個緊密相關的概念是協程函數通過定義的函數協程對象調用協程函數返回的對象。它是一個低層級的可等待對象,表示一個異步操作的最終結果。 我們講以Python 3.7 上的asyncio為例講解如何使用Python的異步IO。 showImg(https://segmentfault.com/img/remote/14600000...
閱讀 2986·2020-01-08 12:17
閱讀 1991·2019-08-30 15:54
閱讀 1152·2019-08-30 15:52
閱讀 2033·2019-08-29 17:18
閱讀 1042·2019-08-29 15:34
閱讀 2460·2019-08-27 10:58
閱讀 1861·2019-08-26 12:24
閱讀 368·2019-08-23 18:23