摘要:最近剛看完多線程,為了加深印象,按照分鐘實現延遲消息功能的思路,實現了一個簡易版的異步隊列。讀取任務時,計算當前和,取出需要執行的任務,使用多線程的形式執行。加鎖的主要作用是防止多線程同時操作文件讀寫,影響數據一致性。
最近剛看完python多線程,為了加深印象,按照1分鐘實現“延遲消息”功能的思路,實現了一個簡易版的異步隊列。
高效延時消息,包含兩個重要的數據結構:
1.環形隊列,例如可以創建一個包含3600個slot的環形隊列(本質是個數組)
2.任務集合,環上每一個slot是一個Set
同時,啟動一個timer,這個timer每隔1s,在上述環形隊列中移動一格,有一個Current Index指針來標識正在檢測的slot。
Task結構中有兩個很重要的屬性:
(1)Cycle-Num:當Current Index第幾圈掃描到這個Slot時,執行任務
(2)Task-Function:需要執行的任務指針
下邊是代碼(代碼不止100行,但是在200行內,也算100行了。)
#! -*- coding: utf-8 -*- try: import cPickle as pickle except ImportError: import pickle try: import simplejson as json except ImportError: import json import os import errno import Queue import random import logging from functools import wraps from threading import Timer, RLock, Thread from time import sleep, time from base64 import b64encode, b64decode # json 的數據結構 # tasks = { # index: { # cycle_num: [(func, bargs)] # } # } logging.basicConfig(level=logging.DEBUG, format="(%(asctime)-15s) %(message)s",) tasks_file = "tasks.json" flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY # 為了防止任務太多需要生成過多的線程,我們使用Queue 來限制生成的線程數量 WORKER_NUMS = 2 q = Queue.Queue(WORKER_NUMS) lock = RLock() def check_file(): try: file_handle = os.open(tasks_file, flags) except OSError as e: if e.errno == errno.EEXIST: # Failed as the file already exists. pass else: raise else: with os.fdopen(file_handle, "w") as file_obj: file_obj.write("{}") def set_delay_task(func_name, *args, **kwargs): # 使用鎖來保證每次只要一個線程寫入文件,防止數據出錯 with lock: with open(tasks_file, "r+") as json_file: count_down = kwargs.pop("count_down", 0) tasks = json.load(json_file) # 執行時間 exec_time = int(time()) + count_down # 循環索引 index = str(exec_time % 3600) # 圈數 cycle_num = str(exec_time / 3600 + 1) dargs = pickle.dumps((args, kwargs)) bargs = b64encode(dargs) index_data = tasks.get(index, {}) index_data.setdefault(cycle_num, []).append((func_name, bargs)) tasks[index] = index_data json_file.seek(0) json.dump(tasks, json_file) logging.debug("Received task: %s" % func_name) def get_delay_tasks(): with open(tasks_file, "r+") as json_file: tasks = json.load(json_file) # 執行時間 current_time = int(time()) # 循環索引 index = str(current_time % 3600) # 圈數 cycle_num = str(current_time / 3600 + 1) current_tasks = tasks.get(index, {}).get(cycle_num, []) tasks = [] for func, bargs in current_tasks: dargs = b64decode(bargs) args, kwargs = pickle.loads(dargs) tasks.append((func, (args, kwargs))) return tasks def get_method_by_name(method_name): possibles = globals().copy() possibles.update(locals()) method = possibles.get(method_name) return method def create_task(task_class, func, task_name=None, **kwargs): def execute(self): args, kwargs = self.data or ((), {}) return func(*args, **kwargs) attrs = { "execute": execute, "func_name": func.__name__, "__module__": func.__module__, "__doc__": func.__doc__ } attrs.update(kwargs) klass = type( task_name or func.__name__, (task_class,), attrs ) return klass class Hu(object): def __init__(self, func_name=None): self.func_name = func_name check_file() def task(self): def deco(func): self.func_name = func.__name__ klass = create_task(Hu, func, self.func_name) func.delay = klass(func_name=klass.func_name).delay @wraps(func) def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper return deco def delay(self, *args, **kwargs): _args = [self.func_name] _args.extend(args) Timer(0, set_delay_task, _args, kwargs).start() return True def boss(): while True: current_tasks = get_delay_tasks() for func, params in current_tasks: # Task accepted: auth.tasks.send_msg logging.debug("Task accepted: %s" % func) q.put((func, params)) sleep(1) def worker(): while True: func, params = q.get() print "get task: %s " % func method = get_method_by_name(func) args, kwargs = params # Task auth.tasks.send_msgsucceeded in start_time = time() method(*args, **kwargs) end_time = time() logging.debug("Task %s succeeded in %s" % (str(func), end_time - start_time)) q.task_done() def main(): check_file() print("starting at:", time()) for target in (boss, worker): t = Thread(target=target) t.start() print("all DONE at:", time()) hu = Hu() # 使用方式如下: @hu.task() def test(num): sleep(2) print "test: %s" % num if __name__ == "__main__": for i in range(10): test.delay(i, count_down=random.randint(1, 10)) main() # output (2017-03-21 15:59:20,394) Received task: test (2017-03-21 15:59:20,396) Received task: test (2017-03-21 15:59:20,397) Received task: test (2017-03-21 15:59:20,398) Received task: test (2017-03-21 15:59:20,400) Received task: test (2017-03-21 15:59:20,401) Received task: test (2017-03-21 15:59:20,403) Received task: test (2017-03-21 15:59:20,404) Received task: test (2017-03-21 15:59:20,406) Received task: test (2017-03-21 15:59:20,408) Received task: test get task: test (2017-03-21 15:59:21,395) Task accepted: test (2017-03-21 15:59:22,397) Task accepted: test (2017-03-21 15:59:22,397) Task accepted: test (2017-03-21 15:59:22,397) Task accepted: test test: 2 get task: test (2017-03-21 15:59:23,399) Task test succeeded in 2.0037419796 (2017-03-21 15:59:24,404) Task accepted: test test: 1 get task: test
按照1分鐘實現“延遲消息”功能的思路。隊列的數據結構為
{ index: { cycle_num: [(func, bargs)] } }
index的值為 1-3600。每小時一個循環。
cycle_num 則是 由 (時間戳 / 3600 + 1) 計算得到的值,是圈數。
每當有任務加入,我們計算出index和cycle_num 將參數和方法名寫入json文件。
讀取任務時,計算當前 index和cycle_num, 取出需要執行的任務,使用多線程的形式執行。
為了防止任務太多需要生成過多的線程,我們使用Queue 來限制生成的線程數量。
加鎖的主要作用是防止多線程同時操作文件讀寫,影響數據一致性。
當然,也可以使用redis 存儲隊列,因為 redis 是單線程操作,可以防止多線程操作影響數據一致性的問題。
這一部分有需要的可以自己實現。
參考:
python線程筆記
1分鐘實現“延遲消息”功能
>歡迎關注 | >請我喝芬達 |
---|---|
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/38539.html
摘要:文章轉自視頻教程優雅的應用調試工具新擴展是由和開源的應用的調試工具。計劃任務列出已運行的計劃任務。該封閉函數會被序列化為一個長字符串,加上他的哈希與簽名如出一轍該功能將記錄所有異常,并可查看具體異常情況。事件顯示所有事件的列表。 文章轉自:https://laravel-china.org/topics/19013視頻教程:047. 優雅的應用調試工具--laravel/telesco...
摘要:所以回來后就想著補一篇文章針對時間切片展開詳細的討論。所以時間切片的目的是不阻塞主線程,而實現目的的技術手段是將一個長任務拆分成很多個不超過的小任務分散在宏任務隊列中執行。上周我在FDConf的分享《讓你的網頁更絲滑》中提到了時間切片,由于時間關系當時并沒有對時間切片展開更細致的討論。所以回來后就想著補一篇文章針對時間切片展開詳細的討論。 從用戶的輸入,再到顯示器在視覺上給用戶的輸出,這一過...
摘要:我的這篇文章沒有任何高大上的術語,就是行代碼,實現一個最簡單的區塊鏈原型。檢查該區塊鏈是否有效。而通過在循環里不斷嘗試最終得到一個合法的哈希值的這一過程,就是區塊鏈圈內俗稱的挖礦。 不知從什么時候起,區塊鏈在網上一下子就火了。 showImg(https://segmentfault.com/img/remote/1460000014744826); 這里Jerry就不班門弄斧了,網上...
摘要:我的這篇文章沒有任何高大上的術語,就是行代碼,實現一個最簡單的區塊鏈原型。檢查該區塊鏈是否有效。而通過在循環里不斷嘗試最終得到一個合法的哈希值的這一過程,就是區塊鏈圈內俗稱的挖礦。 不知從什么時候起,區塊鏈在網上一下子就火了。 showImg(https://segmentfault.com/img/remote/1460000014744826); 這里Jerry就不班門弄斧了,網上...
閱讀 705·2021-11-18 10:02
閱讀 2241·2021-11-15 18:13
閱讀 3158·2021-11-15 11:38
閱讀 2947·2021-09-22 15:55
閱讀 3674·2021-08-09 13:43
閱讀 2447·2021-07-25 14:19
閱讀 2456·2019-08-30 14:15
閱讀 3448·2019-08-30 14:15