摘要:用于注冊信號,以及連接路由等。驗證配置文件是個好主意,充分的檢查可以在產品部署時避免許多愚蠢的錯誤。由于框架使用了基于的,這是基于協程的異步模型。幸運的是為數據庫提供了異步的驅動。
typora-copy-images-to: ipic
[TOC]
配置環境首先檢查你的python版本:
$ python3 -V Python 3.6.3
安裝aiohttp:
$ pip3 install aiohttp
查看aiohttp版本號:
$ python3 -c "import aiohttp; print(aiohttp.__version__)" 3.0.7
項目結構與其他基于python的web項目非常相似:
. ├── README.rst └── polls ├── Makefile ├── README.rst ├── aiohttpdemo_polls │ ├── __init__.py │ ├── __main__.py │ ├── db.py │ ├── main.py │ ├── routes.py │ ├── templates │ ├── utils.py │ └── views.py ├── config │ └── polls.yaml ├── images │ └── example.png ├── setup.py ├── sql │ ├── create_tables.sql │ ├── install.sh │ └── sample_data.sql └── static └── style.css開始第一個aiohttp應用
這個教程基于Django的投票應用教程。
應用所有的aiohttp服務器都圍繞aiohttp.web.Application實例來構建。用于注冊startup/cleanup信號,以及連接路由等。
創建一個項目:
vote ├── config │?? └── __init__.py ├── models │?? └── __init__.py ├── static ├── template └── application └── __init__.py
目錄vote下面分別創建了config、models、application、static、template。
這里我使用pycharm開發,圖示如下:
創建一個應用:
from aiohttp import web app = web.Application() web.run_app(app, host="0.0.0.0", port=9000)
保存于vote/main.py并啟動服務器:
$ python3 /Users/junxi/program/vote/main.py
這里的vote是項目的根目錄。
你將在命令行中看到如下輸出:
======== Running on http://0.0.0.0:9000 ======== (Press CTRL+C to quit)
在瀏覽器中打開http://localhost:9000/或者使用命令
$ curl -X GET http://localhost:9000
不過,對于全部請求現在只會返回404: Not Found,讓我們創建一個路由和視圖來展示一些更有意義的東西。
視圖讓我們從第一個視圖開始。創建application/views.py并加入如下代碼:
from aiohttp import web async def hello(request): return web.Response(text="Hello Aiohttp!")
現在我們應該為這個 index 視圖創建一個路由。 將如下代碼寫入 application/routes.py (分離視圖,路由,模型是種很好的做法。 因為你可能擁有很多這些組件,放在不同的地方可以方便地管理代碼):
from .views import hello def setup_routes(app): app.router.add_get("/hello", hello)
此外,我們應該在某個地方調用 setup_routes 函數,最好是在 main.py 中調用它:
from aiohttp import web from application.routes import setup_routes app = web.Application() setup_routes(app) web.run_app(app, host="0.0.0.0", port=9000)
再次啟動服務器. 現在我們打開瀏覽器就可以看見:
$ curl -X GET localhost:9000/hello Hello Aiohttp!
工作目錄應該是像下面這樣:
vote ├── application │?? ├── __init__.py │?? ├── routes.py │?? └── views.py ├── config │?? ├── __init__.py │?? └── settings.py ├── main.py ├── models │?? ├── __init__.py ├── static └── template配置文件
aiohttp 的配置是不可知的。 這意味著這個庫不需要任何配置方法,并且也沒有內置支持任何配置模式。
但是請考慮下面這些事實:
99% 的服務器都有配置文件.
每個產品(除了像 Django 和 Flask 等基于 Python 的解決方案外)都不將配置文件寫入源代碼。
比如 Nginx 默認將自己的配置文件存儲在 /etc/nginx 文件夾下。
Mongo 將配置文件存為 /etc/mongodb.conf。
驗證配置文件是個好主意,充分的檢查可以在產品部署時避免許多愚蠢的錯誤。
因此,我們 建議 使用以下方法:
將配置存為 yaml 文件(json 或 ini 格式也不錯,但是 yaml 格式是最好的).
從預定位置加載 yaml 配置。例如 ./config/app_cfg.yaml, /etc/app_cfg.yaml。
保持可以通過命令行參數覆蓋配置文件的能力。例如 ./run_app --config=/opt/config/app_cfg.yaml。
對于加載的字典應用嚴格的檢查。 trafaret, colander or JSON schema 是這類型工作的好候選。
加載配置并在應用中讀取:
# load config from yaml file in current dir conf = load_config(str(pathlib.Path(".") / "config" / "settings.yaml")) app["config"] = conf
或者使用py文件當作配置文件:
├── config │?? ├── __init__.py │?? └── settings.py構建數據庫
操作MySQL數據庫的工具,之前django項目一直使用本身自帶的orm,tornado項目使用的torndb.py。其他項目則使用的pymysql庫,pymysql庫的用法在這里。
本文使用MySQL數據庫和aiomysql這個異步操作MySQL的庫。
安裝aiomysql
需要依賴pymysql
$ pip3 install pymysql $ pip3 install aiomysql
我們使用 aiomysql 來描述數據庫模式。
aiomysql官網連接示例
import asyncio from aiomysql import create_pool loop = asyncio.get_event_loop() async def go(): async with create_pool(host="127.0.0.1", port=3306, user="root", password="", db="mysql", loop=loop) as pool: async with pool.get() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 42;") value = await cur.fetchone() print(value) loop.run_until_complete(go())
aiomysql官網連接池示例
import asyncio import aiomysql async def test_example(loop): pool = await aiomysql.create_pool(host="127.0.0.1", port=3306, user="root", password="", db="mysql", loop=loop) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 42;") print(cur.description) (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed() loop = asyncio.get_event_loop() loop.run_until_complete(test_example(loop))
SQLAlchemy可選集成的示例
這里不使用sqlalchemy這個orm,原因:遷移功能不怎么好使,用慣了django的orm,感覺別的不咋好用。寫原生sql練習自己的原生sql編寫能力。
import asyncio import sqlalchemy as sa from aiomysql.sa import create_engine metadata = sa.MetaData() tbl = sa.Table("tbl", metadata, sa.Column("id", sa.Integer, primary_key=True), sa.Column("val", sa.String(255))) async def go(loop): engine = await create_engine(user="root", db="test_pymysql", host="127.0.0.1", password="", loop=loop) async with engine.acquire() as conn: await conn.execute(tbl.insert().values(val="abc")) await conn.execute(tbl.insert().values(val="xyz")) async for row in conn.execute(tbl.select()): print(row.id, row.val) engine.close() await engine.wait_closed() loop = asyncio.get_event_loop() loop.run_until_complete(go(loop))創建數據庫表
查看mysql版本
$ mysql --version /usr/local/mysql/bin/mysql Ver 14.14 Distrib 5.7.20, for macos10.12 (x86_64) using EditLine wrapper
創建一個數據庫vote,并增加授權用戶
$ mysql -uroot -p123456 mysql> CREATE DATABASE IF NOT EXISTS vote CHARACTER SET utf8 COLLATE utf8_general_ci; mysql> grant all on vote.* to vote identified by "123456";
創建表user
CREATE TABLE IF NOT EXISTS `user`( `id` INT(11) NOT NULL AUTO_INCREMENT COMMENT "用戶ID", `delete_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT "刪除標志", `name` VARCHAR(40) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT "昵稱", `phone` VARCHAR(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT "電話", `email` VARCHAR(40) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT "郵箱", `password` VARCHAR(16) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT "密碼", `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT "創建時間", PRIMARY KEY ( `id` ), INDEX `email` (`email`) USING BTREE, INDEX `phone` (`phone`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci ROW_FORMAT=DYNAMIC ;
查看user表結構
+-------------+-------------+------+-----+-------------------+----------------+ | Field | Type | Null | Key | Default | Extra | +-------------+-------------+------+-----+-------------------+----------------+ | id | int(11) | NO | PRI | NULL | auto_increment | | delete_flag | tinyint(1) | NO | | 0 | | | name | varchar(40) | NO | | NULL | | | phone | varchar(11) | NO | MUL | NULL | | | email | varchar(40) | NO | MUL | NULL | | | password | varchar(16) | NO | | NULL | | | create_time | datetime | NO | | CURRENT_TIMESTAMP | | +-------------+-------------+------+-----+-------------------+----------------+
創建表question
CREATE TABLE IF NOT EXISTS `question`( `id` INT(11) NOT NULL AUTO_INCREMENT COMMENT "問題ID", `delete_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT "刪除標志", `user_id` INT(11) NOT NULL COMMENT "用戶ID", `question_text` VARCHAR(200) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT "問題內容", `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT "創建時間", PRIMARY KEY ( `id` ), FOREIGN KEY (`user_id`) REFERENCES `user` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT, INDEX `user_id` (`user_id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci ROW_FORMAT=DYNAMIC ;
查看question表結構
+---------------+--------------+------+-----+-------------------+----------------+ | Field | Type | Null | Key | Default | Extra | +---------------+--------------+------+-----+-------------------+----------------+ | id | int(11) | NO | PRI | NULL | auto_increment | | delete_flag | tinyint(1) | NO | | 0 | | | user_id | int(11) | NO | MUL | NULL | | | question_text | varchar(200) | NO | | NULL | | | create_time | datetime | NO | | CURRENT_TIMESTAMP | | +---------------+--------------+------+-----+-------------------+----------------+
創建表choice
CREATE TABLE IF NOT EXISTS `choice`( `id` INT(11) NOT NULL AUTO_INCREMENT COMMENT "選擇ID", `delete_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT "刪除標志", `choice_text` VARCHAR(200) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT "選擇內容", `votes` INT(11) NOT NULL COMMENT "得票數", `question_id` INT(11) NOT NULL COMMENT "問題ID", `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT "創建時間", PRIMARY KEY ( `id` ), FOREIGN KEY (`question_id`) REFERENCES `question` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT, INDEX `question_id` (`question_id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci ROW_FORMAT=DYNAMIC ;
查看choice表結構
+-------------+--------------+------+-----+-------------------+----------------+ | Field | Type | Null | Key | Default | Extra | +-------------+--------------+------+-----+-------------------+----------------+ | id | int(11) | NO | PRI | NULL | auto_increment | | delete_flag | tinyint(1) | NO | | 0 | | | choice_text | varchar(200) | YES | | NULL | | | votes | int(11) | NO | | NULL | | | question_id | int(11) | NO | MUL | NULL | | | create_time | datetime | NO | | CURRENT_TIMESTAMP | | +-------------+--------------+------+-----+-------------------+----------------+創建連接池
我們需要創建一個全局的連接池,每個HTTP請求都可以從連接池中直接獲取數據庫連接。使用連接池的好處是不必頻繁地打開和關閉數據庫連接,而是能復用就盡量復用。
缺省情況下將編碼設置為utf8,自動提交事務:
async def create_pool(loop, **kw): """定義mysql全局連接池""" logging.info("create database connection pool...") global _mysql_pool _mysql_pool = await aiomysql.create_pool(host=DATABASES["host"], port=DATABASES["port"], user=DATABASES["user"], password=DATABASES["password"], db=DATABASES["db"], loop=loop, charset=kw.get("charset", "utf8"), autocommit=kw.get("autocommit", True), maxsize=kw.get("maxsize", 10), minsize=kw.get("minsize", 1)) return _mysql_pool封裝增刪改查
Web App里面有很多地方都要訪問數據庫。訪問數據庫需要創建數據庫連接、游標對象,然后執行SQL語句,最后處理異常,清理資源。這些訪問數據庫的代碼如果分散到各個函數中,勢必無法維護,也不利于代碼復用。
所以,我們要首先把常用的SELECT、INSERT、UPDATE和DELETE操作用函數封裝起來。
由于Web框架使用了基于asyncio的aiohttp,這是基于協程的異步模型。在協程中,不能調用普通的同步IO操作,因為所有用戶都是由一個線程服務的,協程的執行速度必須非常快,才能處理大量用戶的請求。而耗時的IO操作不能在協程中以同步的方式調用,否則,等待一個IO操作時,系統無法響應任何其他用戶。
這就是異步編程的一個原則:一旦決定使用異步,則系統每一層都必須是異步,“開弓沒有回頭箭”。
幸運的是aiomysql為MySQL數據庫提供了異步IO的驅動。
要執行SELECT語句,我們用select函數執行,需要傳入SQL語句和SQL參數:
async def fetchone(sql, args=(), size=None): """封裝select,查詢單個,返回數據為字典""" log(sql, args) async with _mysql_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute(sql, args) rs = await cur.fetchone() return rs async def select(sql, args=(), size=None): """封裝select,查詢多個,返回數據為列表""" log(sql, args) async with _mysql_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute(sql, args) if size: rs = await cur.fetchmany(size) else: rs = await cur.fetchall() logging.info("rows returned: %s" % len(rs)) return rs
注意要始終堅持使用帶參數的SQL,而不是自己拼接SQL字符串,這樣可以防止SQL注入攻擊。
注意到yield from將調用一個子協程(也就是在一個協程中調用另一個協程)并直接獲得子協程的返回結果。
如果傳入size參數,就通過fetchmany()獲取最多指定數量的記錄,否則,通過fetchall()獲取所有記錄。
Insert, Update, Delete
要執行INSERT、UPDATE、DELETE語句,可以定義一個通用的execute()函數,因為這3種SQL的執行都需要相同的參數,以及返回一個整數表示影響的行數:
async def execute(sql, args=()): """封裝insert, delete, update""" log(sql, args) async with _mysql_pool.acquire() as conn: async with conn.cursor() as cur: try: await cur.execute(sql, args) except BaseException: await conn.rollback() return else: affected = cur.rowcount return affected
execute()函數和select()函數所不同的是,cursor對象不返回結果集,而是通過rowcount返回結果數。
這三個函數定義在models文件夾下的db.py中(db.py是新創建的文件):
完整代碼如下:
import logging logging.basicConfig(level=logging.INFO) import aiomysql import aioredis from config.settings import DATABASES, CACHES def log(sql, args=()): logging.info("SQL: %s" % sql, *args) async def create_pool(loop, **kw): """定義mysql全局連接池""" logging.info("create database connection pool...") global _mysql_pool _mysql_pool = await aiomysql.create_pool(host=DATABASES["host"], port=DATABASES["port"], user=DATABASES["user"], password=DATABASES["password"], db=DATABASES["db"], loop=loop, charset=kw.get("charset", "utf8"), autocommit=kw.get("autocommit", True), maxsize=kw.get("maxsize", 10), minsize=kw.get("minsize", 1)) return _mysql_pool async def fetchone(sql, args=(), size=None): """封裝select,查詢單個,返回數據為字典""" log(sql, args) async with _mysql_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute(sql, args) rs = await cur.fetchone() return rs async def select(sql, args=(), size=None): """封裝select,查詢多個,返回數據為列表""" log(sql, args) async with _mysql_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute(sql, args) if size: rs = await cur.fetchmany(size) else: rs = await cur.fetchall() logging.info("rows returned: %s" % len(rs)) return rs async def execute(sql, args=()): """封裝insert, delete, update""" log(sql, args) async with _mysql_pool.acquire() as conn: async with conn.cursor() as cur: try: await cur.execute(sql, args) except BaseException: await conn.rollback() return else: affected = cur.rowcount return affected
把執行SQL的函數導入到models/__init__.py文件中,方便別的模塊引用:
from .db import * __all__ = ["create_pool", "select", "execute", "fetchone"]
把我們創建表的sql語句保存到models/create_table.sql文件中:
CREATE TABLE IF NOT EXISTS `user`( `id` INT(11) NOT NULL AUTO_INCREMENT COMMENT "用戶ID", `delete_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT "刪除標志", `name` VARCHAR(40) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT "昵稱", `phone` VARCHAR(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT "電話", `email` VARCHAR(40) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT "郵箱", `password` VARCHAR(16) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT "密碼", `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT "創建時間", PRIMARY KEY ( `id` ), INDEX `email` (`email`) USING BTREE, INDEX `phone` (`phone`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci ROW_FORMAT=DYNAMIC ; CREATE TABLE IF NOT EXISTS `question`( `id` INT(11) NOT NULL AUTO_INCREMENT COMMENT "問題ID", `delete_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT "刪除標志", `user_id` INT(11) NOT NULL COMMENT "用戶ID", `question_text` VARCHAR(200) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT "問題內容", `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT "創建時間", PRIMARY KEY ( `id` ), FOREIGN KEY (`user_id`) REFERENCES `user` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT, INDEX `user_id` (`user_id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci ROW_FORMAT=DYNAMIC ; CREATE TABLE IF NOT EXISTS `choice`( `id` INT(11) NOT NULL AUTO_INCREMENT COMMENT "選擇ID", `delete_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT "刪除標志", `choice_text` VARCHAR(200) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT "選擇內容", `votes` INT(11) NOT NULL COMMENT "得票數", `question_id` INT(11) NOT NULL COMMENT "問題ID", `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT "創建時間", PRIMARY KEY ( `id` ), FOREIGN KEY (`question_id`) REFERENCES `question` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT, INDEX `question_id` (`question_id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci ROW_FORMAT=DYNAMIC ;
models目錄結構:
models/ ├── __init__.py └── db.py編寫配置文件
之前我們說過的配置文件,我使用py文件當作配置文件,conf/settings.py內容如下:
DATABASES = { "engine": "mysql", "db": "vote", "user": "vote", "password": "123456", "host": "localhost", "port": 3306, }插入模擬數據
INSERT INTO user(name, phone, email, password) VALUES("露西", "16666666661", "luxi@qq.com", "123456"), ("南希", "16666666662", "nanxi@qq.com", "123456"), ("雪靈", "16666666663", "xueling@qq.com", "123456");
INSERT INTO question(question_text, user_id) VALUES("最受歡迎的計算機語言?", 1), ("最受歡迎的水果?", 2), ("男人最喜歡女人什么地方?", 3);
INSERT INTO choice(choice_text, question_id, votes) VALUES("python", 1, 3), ("java", 1, 2), ("go", 1, 1); INSERT INTO choice(choice_text, question_id, votes) VALUES("香蕉", 2, 3), ("蘋果", 2, 2), ("草莓", 2, 1); INSERT INTO choice(choice_text, question_id, votes) VALUES("漂亮臉蛋", 3, 3), ("大胸", 3, 2), ("大長腿", 3, 1);基礎視圖類
aiohttp.web提供django風格的基礎試圖類。
你可以從 View 類中繼承,并自定義http請求的處理方法:
from aiohttp import web from models import select import json import datetime import decimal class RewriteJsonEncoder(json.JSONEncoder): """重寫json類,為了解決datetime類型的數據無法被json格式化""" def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime("%Y-%m-%d %H:%M:%S") elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") elif isinstance(obj, decimal.Decimal): return str(obj) elif hasattr(obj, "isoformat"): # 處理日期類型 return obj.isoformat() else: return json.JSONEncoder.default(self, obj) def json_dumps(obj): return json.dumps(obj, cls=RewriteJsonEncoder) async def hello(request): return web.Response(text="Hello Aiohttp!") class QuestionChoices(web.View): """查看一個問題的可選答案""" async def get(self): question_id = self.request.match_info.get("question_id") result = await select(self.request.app["db"], "select * from choice where question_id = %s", (question_id,)) return web.json_response(data=result, dumps=json_dumps)
定義路由:
from .views import hello, QuestionChoices def setup_routes(app): app.router.add_get("/hello", hello, name="hello") app.router.add_route("*", "/question/{question_id}/choice", QuestionChoices)
打開瀏覽器或輸入下面命令訪問:
$ curl -X GET http://127.0.0.1:9000/question/1/choice [{"id": 1, "delete_flag": 0, "choice_text": "python", "votes": 3, "question_id": 1, "create_time": "2018-04-15 19:47:16"}, {"id": 2, "delete_flag": 0, "choice_text": "java", "votes": 2, "question_id": 1, "create_time": "2018-04-15 19:47:16"}, {"id": 3, "delete_flag": 0, "choice_text": "go", "votes": 1, "question_id": 1, "create_time": "2018-04-15 19:47:16"}]j
之前使用django比較多,個人喜歡使用類視圖。
裝飾器視圖路由裝飾器有點像Flask風格:
routes = web.RouteTableDef() @routes.get("/get") async def handle_get(request): ... @routes.post("/post") async def handle_post(request): ... app.router.add_routes(routes)
首先是要創建一個 aiohttp.web.RouteTableDef 對象。
該對象是一個類列表對象,額外提供aiohttp.web.RouteTableDef.get(),aiohttp.web.RouteTableDef.post()這些裝飾器來注冊路由。
最后調用add_routes()添加到應用的路由里。
靜態文件處理靜態文件( 圖片,JavaScripts, CSS文件等)最好的方法是使用反向代理,像是nginx或CDN服務。
但就開發來說,aiohttp服務器本身可以很方便的處理靜態文件。
只需要通過 UrlDispatcher.add_static()注冊個新的靜態路由即可:
app.router.add_static("/static", path_to_static_folder)
當訪問靜態文件的目錄時,默認服務器會返回 HTTP/403 Forbidden(禁止訪問)。 使用show_index并將其設置為True可以顯示出索引:
app.router.add_static("/static", path_to_static_folder, show_index=True)
當從靜態文件目錄訪問一個符號鏈接(軟鏈接)時,默認服務器會響應 HTTP/404 Not Found(未找到)。使用follow_symlinks并將其設置為True可以讓服務器使用符號鏈接:
app.router.add_static("/static", path_to_static_folder, follow_symlinks=True)
如果你想允許緩存清除,使用append_version并設為True。
緩存清除會對資源文件像JavaScript 和 CSS文件等的文件名上添加一個hash后的版本。這樣的好處是我們可以讓瀏覽器無限期緩存這些文件而不用擔心這些文件是否發布了新版本。
app.router.add_static("/static", path_to_static_folder, append_version=True)
這里我們添加一個靜態文件的路由
首先在配置文件conf/settings.py中指定項目、靜態文件、模版HTML路徑:
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # 項目路徑 STATIC_DIR = os.path.join(BASE_DIR, "static") # 靜態文件路徑 TEMPLATE_DIR = os.path.join(BASE_DIR, "template") # 模版HTML路徑
接下里在application/routes.py文件中添加一個靜態文件路由:
def setup_static_routes(app): app.router.add_static("/static/", path=STATIC_DIR, name="static")
下載uikit的靜態文件到static目錄下:
static ├── css │?? ├── uikit-rtl.css │?? ├── uikit-rtl.min.css │?? ├── uikit.css │?? └── uikit.min.css └── js ├── uikit-icons.js ├── uikit-icons.min.js ├── uikit.js └── uikit.min.js
把添加靜態路由的函數添加到application/main.py文件的init函數中:
async def init(loop): mysql_pool = await create_pool(loop) app = web.Application(loop=loop) app["db"] = mysql_pool setup_routes(app) setup_static_routes(app) return app
重啟服務器訪問http://127.0.0.1:9000/static/js/bootstrap.js
$ curl -X GET http://127.0.0.1:9000/static/js/bootstrap.js /*! * Bootstrap v4.0.0 (https://getbootstrap.com) * Copyright 2011-2018 The Bootstrap Authors (https://github.com/twbs/bootstrap/graphs/contributors) * Licensed under MIT (https://github.com/twbs/bootstrap/blob/master/LICENSE) */ 。。。。。 。。。。。
可以正常訪問,靜態路由已經添加成功了。
模版aiohttp.web并不直接提供模板讀取,不過可以使用第三方庫 aiohttp_jinja2,該庫是由aiohttp作者維護的。
使用起來也很簡單。首先我們用aiohttp_jinja2.setup()來設置下jinja2環境。
安裝aiohttp_jinja2:
$ pip3 install aiohttp_jinja2
在application/routes.py文件中添加一個模版文件路由:
from config.settings import STATIC_DIR, TEMPLATE_DIR def setup_template_routes(app): aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(TEMPLATE_DIR))
把添加模版路由的函數添加到vote/main.py文件的init函數中:
from application.routes import setup_routes, setup_static_routes, setup_template_routes async def init(loop): mysql_pool = await create_pool(loop) app = web.Application(loop=loop) app["db"] = mysql_pool setup_routes(app) setup_static_routes(app) setup_template_routes(app) return app
增加pycharm普通項目對jinja2模版的支持,編輯.idea/vote.iml,在component標簽的同級添加如下內容:
新建一個模版HTML文件保存到template/index.html中,內容如下:
{% block title %}首頁 {% endblock %} {% block content %} {% endblock %}
新建注冊頁面保存到template/register.html中,內容如下:
{% extends "index.html" %} {% block title %}注冊 {% endblock %} {% block content %}{% endblock %}
頁面用到了jinja2模版的語法。
創建視圖函數用來訪問這個模版文件:
@aiohttp_jinja2.template("index.html") async def index(request): return @aiohttp_jinja2.template("register.html") async def register(request): return
創建與之對應的路由:
def setup_routes(app): app.router.add_get("/hello", hello, name="hello") app.router.add_get("/", index, name="index") app.router.add_get("/register", register, name="register") app.router.add_route("*", "/question/{question_id}/choice", QuestionChoices, name="QuestionChoices")
重啟服務器,瀏覽器訪問http://127.0.0.1:9000
瀏覽器訪問http://127.0.0.1:9000/register
調試工具箱開發aiohttp.web應用項目時,aiohttp_debugtoolbar是非常好用的一個調試工具。
可使用pip進行安裝:
$ pip3 install aiohttp_debugtoolbar
之后將aiohttp_debugtoolbar中間件添加到aiohttp.web.Applicaiton中并調用aiohttp_debugtoolbar.setup()來部署:
import aiohttp_debugtoolbar from aiohttp_debugtoolbar import toolbar_middleware_factory app = web.Application(middlewares=[toolbar_middleware_factory]) aiohttp_debugtoolbar.setup(app)
這里是我們的配置:
import asyncio import aiohttp_debugtoolbar from aiohttp import web from application.routes import setup_routes, setup_static_routes, setup_template_routes from models import create_pool from aiohttp_debugtoolbar import toolbar_middleware_factory async def init(loop): mysql_pool = await create_pool(loop) app = web.Application(loop=loop, middlewares=[toolbar_middleware_factory]) app["db"] = mysql_pool aiohttp_debugtoolbar.setup(app) setup_routes(app) setup_static_routes(app) setup_template_routes(app) return app
瀏覽器輸入地址http://127.0.0.1:9000/_debugtoolbar可以看到如下頁面:
開發工具aiohttp-devtools提供幾個簡化開發的小工具。
可以使用pip安裝:
$ pip3 install aiohttp-devtools * ``runserver`` 提供自動重載,實時重載,靜態文件服務和aiohttp_debugtoolbar_integration。 * ``start`` 是一個幫助做繁雜且必須的創建"aiohttp.web"應用的命令。
這是我們的項目啟動的例子:
$ adev runserver -v main.py --app-factory init -p 9000 --debug-toolbar --host localhost
這個adev著實難用,我們定義的init函數是個協程函數,但是它命令--app-factory要求必須是個普通函數,并且返回一個aiohttp.web.Application。由于我們要使用數據庫連接池,必須使用await協程語法。所以我放棄使用這個東西了。
創建和運行本地應用的文檔和指南請看aiohttp-devtools。
下面準備編寫注冊、登錄的邏輯了,這里先使用session會話機制。以后使用oauth2.0的token認證機制。
處理session會話你經常想要一個可以通過請求存儲用戶數據的倉庫。一般簡稱為會話。
aiohttp.web沒有內置會話,不過你可以使用第三方庫aiohttp_session來提供會話支持。
官網例子:
import asyncio import aioredis import time from aiohttp import web from aiohttp_session import setup, get_session from aiohttp_session.redis_storage import RedisStorage async def handler(request): session = await get_session(request) last_visit = session["last_visit"] if "last_visit" in session else None session["last_visit"] = time.time() text = "Last visited: {}".format(last_visit) return web.Response(text=text) async def make_redis_pool(): redis_address = ("127.0.0.1", "6379") return await aioredis.create_redis_pool(redis_address, timeout=1) def make_app(): loop = asyncio.get_event_loop() redis_pool = loop.run_until_complete(make_redis_pool()) storage = RedisStorage(redis_pool) async def dispose_redis_pool(app): redis_pool.close() await redis_pool.wait_closed() app = web.Application() setup(app, storage) app.on_cleanup.append(dispose_redis_pool) app.router.add_get("/", handler) return app web.run_app(make_app())
安裝aiohttp_session:
$ pip3 install aiohttp_session
session存儲使用redis,這里使用aioredis連接redis。
安裝aioredis:
$ pip3 install aioredis
創建redis全局連接池與redis命令簡單封裝,編輯models/db.py:
import aioredis from config.settings import DATABASES, CACHES async def create_redis_pool(loop): """定義redis全局連接池""" logging.info("create redis connection pool...") global _reids_pool _reids_pool = await aioredis.create_pool(address=CACHES["address"], db=CACHES["db"], password=CACHES["password"], minsize=CACHES["minsize"], maxsize=CACHES["maxsize"], loop=loop) return _reids_pool async def cache_set(*args, **kwargs): """redis set 命令封裝""" with await aioredis.commands.Redis(_reids_pool) as redis: await redis.set(*args, **kwargs) async def cache_get(*args, **kwargs): """redis get 命令封裝""" with await aioredis.commands.Redis(_reids_pool) as redis: return await redis.get(*args, **kwargs) async def cache_del(*args, **kwargs): """redis del 命令封裝""" with await aioredis.commands.Redis(_reids_pool) as redis: return await redis.delete(*args, **kwargs)
CACHES在我們config/settings.py里面定義:
CACHES = { "engine": "redis", "address": ("localhost", 6379), "password": None, "db": None, "minsize": 1, "maxsize": 10 }
把執行redis命令的函數導入到models/__init__.py文件中,方便別的模塊引用:
from .db import * __all__ = ["create_pool", "select", "execute", "fetchone", "create_redis_pool", "cache_set", "cache_get", "cache_del"]
注冊頁面:
{% extends "index.html" %} {% block title %}注冊 {% endblock %} {% block head_js %} {% endblock %} {% block content %}{% endblock %}
注冊視圖函數:
class Register(web.View): """a view handler for register page""" @aiohttp_jinja2.template("register.html") async def get(self): return async def post(self): data = await self.request.post() user = await fetchone("select id from user where email = %s or phone = %s", (data.get("email"), data.get("phone"))) # print(await self.request.multipart()) if user: msg = {"error_code": 20001, "error_msg": "The email or phone has been registered"} else: params = (data.get("name"), data.get("email"), data.get("phone"), data.get("password")) result = await fetchone("INSERT INTO user(name, email, phone, password) VALUES(%s, %s, %s, %s)", params) if result: msg = {"error_code": 0, "error_msg": "ok"} else: msg = {"error_code": 20002, "error_msg": "Please try again if registration fails"} # return web.json_response(data=msg, dumps=json_dumps) return web.json_response(data=msg, dumps=json_dumps)
登錄頁面:
{% extends "index.html" %} {% block title %}登錄 {% endblock %} {% block head_js %} {% endblock %} {% block content %}{% endblock %} {% block bottom_js %} {% endblock %} {% if msg %}{{ msg.error_msg }}
{% endif %}
登錄視圖函數:
class Login(web.View): """a view handler for login page""" async def get(self): return aiohttp_jinja2.render_template("login.html", self.request, locals()) async def post(self): data = await self.request.post() account = data.get("account") password = data.get("password") columns = "id, name, email, phone, password" if len(account) == 11 and re.match(r"^1[35678]d{9}", account): user = await fetchone("select {} from user where phone = %s".format(columns), (account,)) elif re.match(r"^[w-]+(.[w-]+)*@[w-]+(.[w-]+)+$", account): user = await fetchone("select {} from user where email = %s".format(columns), (account,)) else: msg = {"error_code": 20003, "error_msg": "User does not exists"} return aiohttp_jinja2.render_template("login.html", self.request, locals()) if password != user.get("password"): msg = {"error_code": 20004, "error_msg": "Password mismatch"} return aiohttp_jinja2.render_template("login.html", self.request, locals()) session = await get_session(self.request) session["uid"] = user.get("id") # sessionid = session.identity return web.Response(status=302, headers={"location": "/"})
給首頁視圖函數增加個驗證登錄到裝飾器:
from aiohttp_session import get_session from functools import wraps def login_required(func): # 用戶登錄狀態校驗 """This function applies only to class views.""" @wraps(func) async def inner(cls, *args, **kwargs): session = await get_session(cls.request) uid = session.get("uid") if uid: user = await fetchone("select id, name, email, phone from user where id = %s", (uid,)) cls.request.app.userdata = user return await func(cls, *args, **kwargs) else: return web.Response(status=302, headers={"location": "/login"}) return inner class Index(web.View): """a view handler for home page""" @login_required async def get(self): # response.headers["Content-Language"] = "utf-8" return aiohttp_jinja2.render_template("index.html", self.request, locals())
這里我把視圖處理函數全部改為類視圖方式編寫了。
增加路由:
#!/usr/bin/env python # _*_ coding:utf-8 _*_ __author__ = "junxi" import aiohttp_jinja2 import jinja2 import uuid from application.views import Hello, Index, Register, Login, QuestionChoices, Questions, hash_sha256 from config.settings import STATIC_DIR, TEMPLATE_DIR from aiohttp_session import setup from aiohttp_session.redis_storage import RedisStorage def setup_session(app, redis_pool): storage = RedisStorage(redis_pool=redis_pool, cookie_name="sessionid", key_factory=lambda: hash_sha256(uuid.uuid4().hex)) setup(app, storage) def setup_routes(app): app.router.add_view("/hello", Hello, name="Hello") app.router.add_view("", Index, name="Index") app.router.add_view("/register", Register, name="Register") app.router.add_view("/login", Login, name="Login") app.router.add_view("/questions/{question_id}/choice", QuestionChoices, name="QuestionChoices"
main.py增加session處理:
async def init(loop): mysql_pool = await create_pool(loop) redis_pool = await create_redis_pool(loop) # app = web.Application(loop=loop, middlewares=[toolbar_middleware_factory]) # aiohttp_debugtoolbar.setup(app) async def dispose_mysql_pool(): mysql_pool.close() await mysql_pool.wait_closed() async def dispose_redis_pool(): redis_pool.close() await redis_pool.wait_closed() async def dispose_pool(app): await dispose_mysql_pool() await dispose_redis_pool() app = web.Application(loop=loop) setup_session(app, redis_pool) setup_routes(app) setup_static_routes(app) setup_template_routes(app) app.on_cleanup.append(dispose_pool) return app
重新啟動服務器,輸入地址http://127.0.0.1:9000/ , 會跳轉到登錄頁面:
輸入賬號密碼登錄:
跳轉到首頁,可以看到右上角顯示昵稱,已經登錄成功了。
增加問答頁面:
{% extends "index.html" %} {% block title %}問答 {% endblock %} {% block head_js %} {% endblock %} {% block content %}{% endblock %} {% block bottom_js %} {% endblock %}{% for question in questions %}{% endfor %}{{ question.question_text }}
{% for i in question.question_choice|choice_split %} {% endfor %}
增加問答視圖函數:
class Questions(web.View): """a view handler for look at all questions""" @login_required async def get(self): questions = await select("select q.id as qid, q.question_text, (select group_concat(concat_ws("|", c.id, c.choice_text)) from choice c where c.question_id = q.id) as question_choice from question q;") return aiohttp_jinja2.render_template("questions.html", self.request, locals())
增加路由以及我們自定義的jinja2模版上下文處理函數:
import aiohttp_jinja2 import jinja2 import uuid from application.views import Hello, Index, Register, Login, QuestionChoices, Questions, hash_sha256 from config.settings import STATIC_DIR, TEMPLATE_DIR from aiohttp_session import setup from aiohttp_session.redis_storage import RedisStorage def setup_session(app, redis_pool): storage = RedisStorage(redis_pool=redis_pool, cookie_name="sessionid", key_factory=lambda: hash_sha256(uuid.uuid4().hex)) setup(app, storage) def setup_routes(app): app.router.add_view("/hello", Hello, name="Hello") app.router.add_view("", Index, name="Index") app.router.add_view("/register", Register, name="Register") app.router.add_view("/login", Login, name="Login") app.router.add_view("/questions/{question_id}/choice", QuestionChoices, name="QuestionChoices") app.router.add_view("/questions", Questions, name="Questions") def setup_static_routes(app): app.router.add_static("/static/", path=STATIC_DIR, name="static") def setup_template_routes(app): aiohttp_jinja2.setup(app, filters={"choice_split": choice_split}, loader=jinja2.FileSystemLoader(TEMPLATE_DIR)) def choice_split(choices): for i in choices.split(","): single = i.split("|") yield single
重啟服務后查看問答頁面http://127.0.0.1:9000/questions
項目展示這是完整代碼:https://github.com/junxi3166/python_study/tree/vote/python_practice
supervisor部署項目安裝supervisor:
mkdir ~/supervisor cd ~/supervisor/ wget https://files.pythonhosted.org/packages/44/60/698e54b4a4a9b956b2d709b4b7b676119c833d811d53ee2500f1b5e96dc3/supervisor-3.3.4.tar.gz tar zxf supervisor-3.3.4.tar.gz cd supervisor-3.3.4 sudo python setup.py install supervisord -v
生成配置文件:
$ echo_supervisord_conf > supervisord.conf
啟動:
$ supervisord -c supervisord.conf
查看 supervisord 是否在運行:
$ ps aux|grep supervisord junxi 5064 0.0 0.0 4267768 900 s000 S+ 10:37上午 0:00.00 grep --color supervisord junxi 5059 0.0 0.0 4344312 2196 ?? Ss 10:37上午 0:00.01 /usr/bin/python /usr/local/bin/supervisord -c supervisord.conf
打開配置文件:
vim supervisord.conf
創建aio目錄:
mkdir aio
在配置文件底部,配置include
[include] files = aio/*.conf
其他參數配置:
# grep -Ev "^;|^$" supervisord.conf [unix_http_server] file=/var/log/supervisor/supervisor.sock ; the path to the socket file [inet_http_server] ; inet (TCP) server disabled by default port=127.0.0.1:9001 ; ip_address:port specifier, *:port for all iface username=user ; default is no username (open server) password=123 ; default is no password (open server) [supervisord] logfile=/var/log/supervisor/supervisord.log ; main log file; default $CWD/supervisord.log logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB logfile_backups=10 ; # of main logfile backups; 0 means none, default 10 loglevel=info ; log level; default info; others: debug,warn,trace pidfile=/var/log/supervisor/supervisord.pid ; supervisord pidfile; default supervisord.pid nodaemon=false ; start in foreground if true; default false minfds=1024 ; min. avail startup file descriptors; default 1024 minprocs=200 ; min. avail process descriptors;default 200 childlogdir=/var/log/supervisor ; "AUTO" child log dir, default $TEMP [include] files = /Users/junxi/supervisor/aio/*.conf
在aio文件夾下新建vote.conf文件用于啟動我們的vote項目,內容如下:
# vim aio/vote.conf [program:vote] numprocs = 4 numprocs_start = 1 process_name = vote_910%(process_num)s command=python3 /Users/junxi/program/vote/main.py --port=910%(process_num)s directory=/Users/junxi/program/vote autostart=true autorestart=true redirect_stderr=true stdout_logfile=/var/log/vote/access.log loglevel=info
創建存放日志的文件夾:
$ sudo mkdir /var/log/supervisor $ sudo chown -R junxi:admin /var/log/supervisor $ sudo mkdir /var/log/vote/ $ sudo chown -R junxi:admin /var/log/vote/
重啟supervisor:
$ kill -Hup `ps -ef|grep supervisord|awk "NR==1{print $2}"`
或者手動找到pid重啟。
使用客戶端supervisorctl管理進程的啟動
連接到服務端:
$ supervisorctl -c supervisord.conf
輸入默認的賬戶user,密碼123進入命令行。
查看狀態:
supervisor> help default commands (type help): ===================================== add exit open reload restart start tail avail fg pid remove shutdown status update clear maintail quit reread signal stop version supervisor> status vote:vote_9101 STOPPED Apr 17 11:00 PM vote:vote_9102 STOPPED Apr 17 11:00 PM vote:vote_9103 STOPPED Apr 17 11:00 PM vote:vote_9104
啟動vote:
supervisor> start all vote:vote_9101: started vote:vote_9102: started vote:vote_9103: started vote:vote_9104: started
瀏覽器輸入 http://127.0.0.1:9001/ 打開web頁面查看supervisor狀態,就是我們配置文件中的inet_http_server。
瀏覽器輸入4個端口(分別為9101、9102、9103、9104)分別進行訪問測試:
然后再使用nginx做個負載均衡:
proxy_next_upstream error; upstream votes { server 127.0.0.1:9101; server 127.0.0.1:9102; server 127.0.0.1:9103; server 127.0.0.1:9104; } server { listen 8008; server_name localhost; access_log /var/log/nginx/vote/access.log; error_log /var/log/nginx/vote/error.log; proxy_read_timeout 200; location /static/ { alias /Users/junxi/program/vote/static/; } location / { proxy_pass_header Server; proxy_set_header Host $http_host; proxy_redirect off; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Scheme $scheme; proxy_pass http://votes; } }
別忘了設置Nginx的worker_rlimit_nofile、worker_connections、worker_processes。
訪問http://localhost:8008/hello
Nice。
先寫到這里了。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/41608.html
摘要:的異步代碼分析是的一個框架,基于,所以叫。不可避免的,可讀性會比較差。想找教程的話,請移步官方教程,寫得還是挺不錯的。建議不要直接使用,而只把它當成的一個樣例。 Python 的異步 IO:Aiohttp Client 代碼分析 Aiohttp 是 Python 的一個 HTTP 框架,基于 asyncio,所以叫 Aiohttp。 我主要是看源碼,想理解它的設計,所以附上了類圖與時序...
摘要:快速開始在安裝之前在支持異步的過程中,都經歷了哪些比較重大的更新。踏出第一步我們將正式使用來構建一個項目,讓我們踏出第一步,利用來編寫一個返回字符串的服務程序。本次示例的源代碼全部在上,見。 快速開始 在安裝Sanic之前,讓我們一起來看看Python在支持異步的過程中,都經歷了哪些比較重大的更新。 首先是Python3.4版本引入了asyncio,這讓Python有了支持異步IO的標...
摘要:在中,官方的異步協程庫正式成為標準。本項目就是以為基礎搭建的微服務框架。使用做單元測試,并且使用來避免訪問其他微服務。跟蹤每一個請求,記錄請求所經過的每一個微服務,以鏈條的方式串聯起來,對分析微服務的性能瓶頸至關重要。 介紹 使用python做web開發面臨的一個最大的問題就是性能,在解決C10K問題上顯的有點吃力。有些異步框架Tornado、Twisted、Gevent 等就是為了解...
摘要:事件循環是異步編程的底層基石。對事件集合進行輪詢,調用回調函數等一輪事件循環結束,循環往復。協程直接利用代碼的執行位置來表示狀態,而回調則是維護了一堆數據結構來處理狀態。時代的協程技術主要是,另一個比較小眾。 Coding Crush Python開發工程師 主要負責豈安科技業務風險情報系統redq。 引言 1.1. 存儲器山 存儲器山是 Randal Bryant 在《深入...
摘要:上一篇文章網絡爬蟲實戰請求庫安裝下一篇文章網絡爬蟲實戰解析庫的安裝的安裝在上一節我們了解了的配置方法,配置完成之后我們便可以用來驅動瀏覽器來做相應網頁的抓取。上一篇文章網絡爬蟲實戰請求庫安裝下一篇文章網絡爬蟲實戰解析庫的安裝 上一篇文章:Python3網絡爬蟲實戰---1、請求庫安裝:Requests、Selenium、ChromeDriver下一篇文章:Python3網絡爬蟲實戰--...
閱讀 2495·2021-11-24 10:29
閱讀 2634·2021-09-24 09:48
閱讀 5737·2021-09-22 15:56
閱讀 3151·2021-09-06 15:00
閱讀 2667·2019-08-30 15:54
閱讀 740·2019-08-30 13:48
閱讀 2893·2019-08-30 11:17
閱讀 3417·2019-08-29 11:20