国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

100行代碼實現任務隊列

xorpay / 692人閱讀

摘要:最近剛看完多線程,為了加深印象,按照分鐘實現延遲消息功能的思路,實現了一個簡易版的異步隊列。讀取任務時,計算當前和,取出需要執行的任務,使用多線程的形式執行。加鎖的主要作用是防止多線程同時操作文件讀寫,影響數據一致性。

最近剛看完python多線程,為了加深印象,按照1分鐘實現“延遲消息”功能的思路,實現了一個簡易版的異步隊列。

高效延時消息,包含兩個重要的數據結構:

1.環形隊列,例如可以創建一個包含3600個slot的環形隊列(本質是個數組)

2.任務集合,環上每一個slot是一個Set

同時,啟動一個timer,這個timer每隔1s,在上述環形隊列中移動一格,有一個Current Index指針來標識正在檢測的slot。

Task結構中有兩個很重要的屬性:

(1)Cycle-Num:當Current Index第幾圈掃描到這個Slot時,執行任務
(2)Task-Function:需要執行的任務指針

下邊是代碼(代碼不止100行,但是在200行內,也算100行了。)

#! -*- coding: utf-8 -*-

try:
    import cPickle as pickle
except ImportError:
    import pickle
try:
    import simplejson as json
except ImportError:
    import json

import os
import errno
import Queue
import random
import logging
from functools import wraps
from threading import Timer, RLock, Thread
from time import sleep, time
from base64 import b64encode, b64decode

# json 的數據結構
# tasks = {
#     index: {
#         cycle_num: [(func, bargs)]
#     }
# }

logging.basicConfig(level=logging.DEBUG,
                    format="(%(asctime)-15s) %(message)s",)
tasks_file = "tasks.json"
flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY

# 為了防止任務太多需要生成過多的線程,我們使用Queue 來限制生成的線程數量
WORKER_NUMS = 2
q = Queue.Queue(WORKER_NUMS)

lock = RLock()


def check_file():
    try:
        file_handle = os.open(tasks_file, flags)
    except OSError as e:
        if e.errno == errno.EEXIST:  # Failed as the file already exists.
            pass
        else:
            raise
    else:
        with os.fdopen(file_handle, "w") as file_obj:
            file_obj.write("{}")


def set_delay_task(func_name, *args, **kwargs):
    # 使用鎖來保證每次只要一個線程寫入文件,防止數據出錯
    with lock:
        with open(tasks_file, "r+") as json_file:
            count_down = kwargs.pop("count_down", 0)
            tasks = json.load(json_file)
            # 執行時間
            exec_time = int(time()) + count_down
            # 循環索引
            index = str(exec_time % 3600)
            # 圈數
            cycle_num = str(exec_time / 3600 + 1)
            dargs = pickle.dumps((args, kwargs))
            bargs = b64encode(dargs)
            index_data = tasks.get(index, {})
            index_data.setdefault(cycle_num, []).append((func_name, bargs))
            tasks[index] = index_data
            json_file.seek(0)
            json.dump(tasks, json_file)
            logging.debug("Received task: %s" % func_name)


def get_delay_tasks():
    with open(tasks_file, "r+") as json_file:
        tasks = json.load(json_file)
        # 執行時間
        current_time = int(time())
        # 循環索引
        index = str(current_time % 3600)
        # 圈數
        cycle_num = str(current_time / 3600 + 1)
        current_tasks = tasks.get(index, {}).get(cycle_num, [])
    tasks = []
    for func, bargs in current_tasks:
        dargs = b64decode(bargs)
        args, kwargs = pickle.loads(dargs)
        tasks.append((func, (args, kwargs)))
    return tasks


def get_method_by_name(method_name):
    possibles = globals().copy()
    possibles.update(locals())
    method = possibles.get(method_name)
    return method


def create_task(task_class, func, task_name=None, **kwargs):

    def execute(self):
        args, kwargs = self.data or ((), {})
        return func(*args, **kwargs)

    attrs = {
        "execute": execute,
        "func_name": func.__name__,
        "__module__": func.__module__,
        "__doc__": func.__doc__
    }
    attrs.update(kwargs)

    klass = type(
        task_name or func.__name__,
        (task_class,),
        attrs
    )

    return klass


class Hu(object):

    def __init__(self, func_name=None):
        self.func_name = func_name
        check_file()

    def task(self):
        def deco(func):
            self.func_name = func.__name__
            klass = create_task(Hu, func, self.func_name)
            func.delay = klass(func_name=klass.func_name).delay
            @wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)
            return wrapper
        return deco

    def delay(self, *args, **kwargs):
        _args = [self.func_name]
        _args.extend(args)
        Timer(0, set_delay_task, _args, kwargs).start()
        return True


def boss():
    while True:
        current_tasks = get_delay_tasks()
        for func, params in current_tasks:
            # Task accepted: auth.tasks.send_msg
            logging.debug("Task accepted: %s" % func)
            q.put((func, params))
        sleep(1)


def worker():
    while True:
        func, params = q.get()
        print "get task: %s
" % func
        method = get_method_by_name(func)
        args, kwargs = params
        # Task auth.tasks.send_msgsucceeded in
        start_time = time()
        method(*args, **kwargs)
        end_time = time()
        logging.debug("Task %s succeeded in %s" % (str(func), end_time - start_time))
        q.task_done()


def main():
    check_file()
    print("starting at:", time())
    for target in (boss, worker):
        t = Thread(target=target)
        t.start()
    print("all DONE at:", time())

hu = Hu()

# 使用方式如下:

@hu.task()
def test(num):
    sleep(2)
    print "test: %s" % num


if __name__ == "__main__":
    for i in range(10):
        test.delay(i, count_down=random.randint(1, 10))
    main()

# output

(2017-03-21 15:59:20,394) Received task: test
(2017-03-21 15:59:20,396) Received task: test
(2017-03-21 15:59:20,397) Received task: test
(2017-03-21 15:59:20,398) Received task: test
(2017-03-21 15:59:20,400) Received task: test
(2017-03-21 15:59:20,401) Received task: test
(2017-03-21 15:59:20,403) Received task: test
(2017-03-21 15:59:20,404) Received task: test
(2017-03-21 15:59:20,406) Received task: test
(2017-03-21 15:59:20,408) Received task: test
get task: test

(2017-03-21 15:59:21,395) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
test: 2
get task: test

(2017-03-21 15:59:23,399) Task test succeeded in 2.0037419796
(2017-03-21 15:59:24,404) Task accepted: test
test: 1
get task: test

按照1分鐘實現“延遲消息”功能的思路。隊列的數據結構為

{
    index: {
        cycle_num: [(func, bargs)]
    }
}

index的值為 1-3600。每小時一個循環。
cycle_num 則是 由 (時間戳 / 3600 + 1) 計算得到的值,是圈數。

每當有任務加入,我們計算出index和cycle_num 將參數和方法名寫入json文件。
讀取任務時,計算當前 index和cycle_num, 取出需要執行的任務,使用多線程的形式執行。

為了防止任務太多需要生成過多的線程,我們使用Queue 來限制生成的線程數量。

加鎖的主要作用是防止多線程同時操作文件讀寫,影響數據一致性。

當然,也可以使用redis 存儲隊列,因為 redis 是單線程操作,可以防止多線程操作影響數據一致性的問題。
這一部分有需要的可以自己實現。

參考:

python線程筆記

1分鐘實現“延遲消息”功能

>歡迎關注 >請我喝芬達

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/38539.html

相關文章

  • Laravel Telescope:優雅的應用調試工具

    摘要:文章轉自視頻教程優雅的應用調試工具新擴展是由和開源的應用的調試工具。計劃任務列出已運行的計劃任務。該封閉函數會被序列化為一個長字符串,加上他的哈希與簽名如出一轍該功能將記錄所有異常,并可查看具體異常情況。事件顯示所有事件的列表。 文章轉自:https://laravel-china.org/topics/19013視頻教程:047. 優雅的應用調試工具--laravel/telesco...

    MasonEast 評論0 收藏0
  • 時間切片(Time Slicing)

    摘要:所以回來后就想著補一篇文章針對時間切片展開詳細的討論。所以時間切片的目的是不阻塞主線程,而實現目的的技術手段是將一個長任務拆分成很多個不超過的小任務分散在宏任務隊列中執行。上周我在FDConf的分享《讓你的網頁更絲滑》中提到了時間切片,由于時間關系當時并沒有對時間切片展開更細致的討論。所以回來后就想著補一篇文章針對時間切片展開詳細的討論。 從用戶的輸入,再到顯示器在視覺上給用戶的輸出,這一過...

    Freeman 評論0 收藏0
  • 300ABAP代碼實現一個最簡單的區塊鏈原型

    摘要:我的這篇文章沒有任何高大上的術語,就是行代碼,實現一個最簡單的區塊鏈原型。檢查該區塊鏈是否有效。而通過在循環里不斷嘗試最終得到一個合法的哈希值的這一過程,就是區塊鏈圈內俗稱的挖礦。 不知從什么時候起,區塊鏈在網上一下子就火了。 showImg(https://segmentfault.com/img/remote/1460000014744826); 這里Jerry就不班門弄斧了,網上...

    cikenerd 評論0 收藏0
  • 300ABAP代碼實現一個最簡單的區塊鏈原型

    摘要:我的這篇文章沒有任何高大上的術語,就是行代碼,實現一個最簡單的區塊鏈原型。檢查該區塊鏈是否有效。而通過在循環里不斷嘗試最終得到一個合法的哈希值的這一過程,就是區塊鏈圈內俗稱的挖礦。 不知從什么時候起,區塊鏈在網上一下子就火了。 showImg(https://segmentfault.com/img/remote/1460000014744826); 這里Jerry就不班門弄斧了,網上...

    DangoSky 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<