摘要:算法序和年的論文提出了一種定時輪的方式來管理和維護大量的調度算法內核中的定時器采用的就是這個方案。使用實例每一次的時間間隔每一次就會到達下一個槽位輪中的數源碼解讀之時間輪算法實現定時輪算法細說延時任務的處理定時器的實現
HashedWheelTimer算法
序George Varghese 和 Tony Lauck 1996 年的論文:Hashed and Hierarchical Timing Wheels: data structures to efficiently implement a timer facility提出了一種定時輪的方式來管理和維護大量的Timer調度算法.Linux 內核中的定時器采用的就是這個方案。
原理一個Hash Wheel Timer是一個環形結構,可以想象成時鐘,分為很多格子,一個格子代表一段時間(越短Timer精度越高),并用一個List保存在該格子上到期的所有任務,同時一個指針隨著時間流逝一格一格轉動,并執行對應List中所有到期的任務。任務通過取模決定應該放入哪個格子。
環形結構可以根據超時時間的 hash 值(這個 hash 值實際上就是ticks & mask)將 task 分布到不同的槽位中, 當 tick 到那個槽位時, 只需要遍歷那個槽位的 task 即可知道哪些任務會超時(而使用線性結構, 你每次 tick 都需要遍歷所有 task), 所以, 我們任務量大的時候, 相應的增加 wheel 的 ticksPerWheel 值, 可以減少 tick 時遍歷任務的個數.
結構圖以上圖為例,假設一個格子是1秒,則整個wheel能表示的時間段為8s,假如當前指針指向2,此時需要調度一個3s后執行的任務,顯然應該加入到(2+3=5)的方格中,指針再走3次就可以執行了;如果任務要在10s后執行,應該等指針走完一個round零2格再執行,因此應放入4,同時將round(1)保存到任務中。檢查到期任務時應當只執行round為0的,格子上其他任務的round應減1。
效率添加任務:O(1)
刪除/取消任務:O(1)
過期/執行任務:最差情況為O(n)->也就是當HashMap里面的元素全部hash沖突,退化為一條鏈表的情況。平均O(1)
槽位越多,每個槽位上的鏈表就越短,這里需要權衡時間與空間。
netty3.10的實現 相關參數tickDuration: 每 tick 一次的時間間隔, 每 tick 一次就會到達下一個槽位
ticksPerWheel: 輪中的 slot 數,hash算法計算目標槽位
/** * Creates a new timer with the default thread factory * ({@link Executors#defaultThreadFactory()}). * * @param tickDuration the duration between tick * @param unit the time unit of the {@code tickDuration} * @param ticksPerWheel the size of the wheel */ public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) { this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel); }HashedWheelBucket定義
/** * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy * removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no * extra object creation is needed. */ private static final class HashedWheelBucket { // Used for the linked-list datastructure private HashedWheelTimeout head; private HashedWheelTimeout tail; /** * Add {@link HashedWheelTimeout} to this bucket. */ public void addTimeout(HashedWheelTimeout timeout) { assert timeout.bucket == null; timeout.bucket = this; if (head == null) { head = tail = timeout; } else { tail.next = timeout; timeout.prev = tail; tail = timeout; } } /** * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}. */ public void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; // process all timeouts while (timeout != null) { boolean remove = false; if (timeout.remainingRounds <= 0) { if (timeout.deadline <= deadline) { timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } remove = true; } else if (timeout.isCancelled()) { remove = true; } else { timeout.remainingRounds --; } // store reference to next as we may null out timeout.next in the remove block. HashedWheelTimeout next = timeout.next; if (remove) { remove(timeout); } timeout = next; } } public void remove(HashedWheelTimeout timeout) { HashedWheelTimeout next = timeout.next; // remove timeout that was either processed or cancelled by updating the linked-list if (timeout.prev != null) { timeout.prev.next = next; } if (timeout.next != null) { timeout.next.prev = timeout.prev; } if (timeout == head) { // if timeout is also the tail we need to adjust the entry too if (timeout == tail) { tail = null; head = null; } else { head = next; } } else if (timeout == tail) { // if the timeout is the tail modify the tail to be the prev node. tail = timeout.prev; } // null out prev, next and bucket to allow for GC. timeout.prev = null; timeout.next = null; timeout.bucket = null; } /** * Clear this bucket and return all not expired / cancelled {@link Timeout}s. */ public void clearTimeouts(Setset) { for (;;) { HashedWheelTimeout timeout = pollTimeout(); if (timeout == null) { return; } if (timeout.isExpired() || timeout.isCancelled()) { continue; } set.add(timeout); } } private HashedWheelTimeout pollTimeout() { HashedWheelTimeout head = this.head; if (head == null) { return null; } HashedWheelTimeout next = head.next; if (next == null) { tail = this.head = null; } else { this.head = next; next.prev = null; } // null out prev and next to allow for GC. head.next = null; head.prev = null; return head; } }
HashedWheelTimeout
private static final class HashedWheelTimeout implements Timeout { private static final int ST_INIT = 0; private static final int ST_IN_BUCKET = 1; private static final int ST_CANCELLED = 2; private static final int ST_EXPIRED = 3; private static final AtomicIntegerFieldUpdaterHashedWheelBucket創建STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state"); private final HashedWheelTimer timer; private final TimerTask task; private final long deadline; @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" }) private volatile int state = ST_INIT; // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the // HashedWheelTimeout will be added to the correct HashedWheelBucket. long remainingRounds; // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list. // As only the workerThread will act on it there is no need for synchronization / volatile. HashedWheelTimeout next; HashedWheelTimeout prev; // The bucket to which the timeout was added HashedWheelBucket bucket; HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) { this.timer = timer; this.task = task; this.deadline = deadline; } public Timer getTimer() { return timer; } public TimerTask getTask() { return task; } public void cancel() { int state = state(); if (state >= ST_CANCELLED) { // fail fast if the task was cancelled or expired before. return; } if (state != ST_IN_BUCKET && compareAndSetState(ST_INIT, ST_CANCELLED)) { // Was cancelled before the HashedWheelTimeout was added to its HashedWheelBucket. // In this case we can just return here as it will be discarded by the WorkerThread when handling // the adding of HashedWheelTimeout to the HashedWheelBuckets. return; } // only update the state it will be removed from HashedWheelBucket on next tick. if (!compareAndSetState(ST_IN_BUCKET, ST_CANCELLED)) { return; } // Add the HashedWheelTimeout back to the timeouts queue so it will be picked up on the next tick // and remove this HashedTimeTask from the HashedWheelBucket. After this is done it is ready to get // GC"ed once the user has no reference to it anymore. timer.timeouts.add(this); } public void remove() { if (bucket != null) { bucket.remove(this); } } public boolean compareAndSetState(int expected, int state) { return STATE_UPDATER.compareAndSet(this, expected, state); } public int state() { return state; } public boolean isCancelled() { return state == ST_CANCELLED; } public boolean isExpired() { return state > ST_IN_BUCKET; } public HashedWheelTimeout value() { return this; } public void expire() { if (!compareAndSetState(ST_IN_BUCKET, ST_EXPIRED)) { assert state() != ST_INIT; return; } try { task.run(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + ".", t); } } } @Override public String toString() { final long currentTime = System.nanoTime(); long remaining = deadline - currentTime + timer.startTime; StringBuilder buf = new StringBuilder(192); buf.append(getClass().getSimpleName()); buf.append("("); buf.append("deadline: "); if (remaining > 0) { buf.append(remaining); buf.append(" ns later"); } else if (remaining < 0) { buf.append(-remaining); buf.append(" ns ago"); } else { buf.append("now"); } if (isCancelled()) { buf.append(", cancelled"); } buf.append(", task: "); buf.append(getTask()); return buf.append(")").toString(); } }
private static HashedWheelBucket[] createWheel(int ticksPerWheel) { if (ticksPerWheel <= 0) { throw new IllegalArgumentException( "ticksPerWheel must be greater than 0: " + ticksPerWheel); } if (ticksPerWheel > 1073741824) { throw new IllegalArgumentException( "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); } ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i ++) { wheel[i] = new HashedWheelBucket(); } return wheel; } private static int normalizeTicksPerWheel(int ticksPerWheel) { int normalizedTicksPerWheel = 1; while (normalizedTicksPerWheel < ticksPerWheel) { normalizedTicksPerWheel <<= 1; } return normalizedTicksPerWheel; }timeouts隊列
private final QueueWorkertimeouts = new ConcurrentLinkedQueue (); public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } start(); // Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; }
HashedWheelTimer的核心,主要處理tick的轉動、過期任務。
private final class Worker implements Runnable { private final SetunprocessedTimeouts = new HashSet (); private long tick; public void run() { // Initialize the startTime. startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it"s not 0 when initialized. startTime = 1; } // Notify the other threads waiting for the initialization at start(). startTimeInitialized.countDown(); do { final long deadline = waitForNextTick(); if (deadline > 0) { transferTimeoutsToBuckets(); HashedWheelBucket bucket = wheel[(int) (tick & mask)]; bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket: wheel) { bucket.clearTimeouts(unprocessedTimeouts); } for (;;) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } unprocessedTimeouts.add(timeout); } } private void transferTimeoutsToBuckets() { // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just // adds new timeouts in a loop. for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED || !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) { // Was cancelled in the meantime. So just remove it and continue with next HashedWheelTimeout // in the queue timeout.remove(); continue; } long calculated = timeout.deadline / tickDuration; long remainingRounds = (calculated - tick) / wheel.length; timeout.remainingRounds = remainingRounds; final long ticks = Math.max(calculated, tick); // Ensure we don"t schedule for past. int stopIndex = (int) (ticks & mask); HashedWheelBucket bucket = wheel[stopIndex]; bucket.addTimeout(timeout); } } /** * calculate goal nanoTime from startTime and current tick number, * then wait until that goal has been reached. * @return Long.MIN_VALUE if received a shutdown request, * current time otherwise (with Long.MIN_VALUE changed by +1) */ private long waitForNextTick() { long deadline = tickDuration * (tick + 1); for (;;) { final long currentTime = System.nanoTime() - startTime; long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } } // Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect // the JVM if it runs on windows. // // See https://github.com/netty/netty/issues/356 if (DetectionUtil.isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; } try { Thread.sleep(sleepTimeMs); } catch (InterruptedException e) { if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } } public Set unprocessedTimeouts() { return Collections.unmodifiableSet(unprocessedTimeouts); } }
定位槽位
HashedWheelBucket[] wheel = createWheel(ticksPerWheel); mask = wheel.length - 1; HashedWheelBucket bucket = wheel[(int) (tick & mask)];
比如有16個槽,則mask為15,假設當前tick=30,則槽位=14
更新該槽位任務的remainingRounds
每走一個tick都要更新該tick對應的槽位下面的任務的remainingRounds或者執行到期的任務
bucket.expireTimeouts(deadline); public void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; // process all timeouts while (timeout != null) { boolean remove = false; if (timeout.remainingRounds <= 0) { if (timeout.deadline <= deadline) { timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } remove = true; } else if (timeout.isCancelled()) { remove = true; } else { timeout.remainingRounds --; } // store reference to next as we may null out timeout.next in the remove block. HashedWheelTimeout next = timeout.next; if (remove) { remove(timeout); } timeout = next; } }
執行到期任務
public void expire() { if (!compareAndSetState(ST_IN_BUCKET, ST_EXPIRED)) { assert state() != ST_INIT; return; } try { task.run(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + ".", t); } } }
注意,這里是同步執行,會阻塞整個timer的,需要異步。
transfer
每走一個tick的時候,要把task從queue中取出來,放到槽位。
long calculated = timeout.deadline / tickDuration; long remainingRounds = (calculated - tick) / wheel.length; timeout.remainingRounds = remainingRounds; final long ticks = Math.max(calculated, tick); // Ensure we don"t schedule for past. int stopIndex = (int) (ticks & mask); HashedWheelBucket bucket = wheel[stopIndex]; bucket.addTimeout(timeout);使用實例
/** * ?tickDuration: 每 tick 一次的時間間隔, 每 tick 一次就會到達下一個槽位 * ticksPerWheel: 輪中的 slot 數 */ @Test public void testHashedWheelTimer() throws InterruptedException { HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(1000/**tickDuration**/, TimeUnit.MILLISECONDS, 16 /**ticksPerWheel**/); System.out.println(LocalTime.now()+" submitted"); Timeout timeout = hashedWheelTimer.newTimeout((t) -> { new Thread(){ @Override public void run() { System.out.println(new Date() + " executed"); System.out.println(hashedWheelTimer); try { TimeUnit.SECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + " FINISH"); } }.start(); }, 5, TimeUnit.SECONDS); hashedWheelTimer.newTimeout((t) -> { new Thread(){ @Override public void run() { System.out.println(new Date() + " TASK2 executed"); System.out.println(hashedWheelTimer); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + " TASK2 FINISH"); } }.start(); }, 15, TimeUnit.SECONDS); TimeUnit.SECONDS.sleep(500); }doc
netty源碼解讀之時間輪算法實現-HashedWheelTimer
Timing Wheel 定時輪算法
細說延時任務的處理
ifesdjeen-hashed-wheel-timer
TimingWheels.ppt
定時器(Timer)的實現
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/70318.html
摘要:目前支持多種注冊中心。本編文章是分析使用作為注冊中心,如何整合進行服務注冊和訂閱服務。 目前dubbo支持多種注冊中心:Zookeeper、Redis、Simple、Multicast、Etcd3。 本編文章是分析使用Zookeeper作為注冊中心,dubbo如何整合Zookeeper進行服務注冊和訂閱服務。 首先dubbo將服務注冊到Zookeeper后,目錄結構如下所示:(注冊接口...
摘要:服務器大規模下發數據幀時,可進行有效的擁塞控制超時重發,可有效提升集群設備的可靠性,降低集群設備的研發難度。幀調度策略由于這些問題,故自行制定如下幀調度策略,實踐表明,該策略可最大程度上解決以上問題。 「博客搬家」 原地址: 簡書 原發表時間: 2017-07-19 最近正在做一個 Java 后端項目「大規模集群設備的管理平臺」。使用 Spring 作為基礎框架,使用 Netty...
摘要:再附一部分架構面試視頻講解本文已被開源項目學習筆記總結移動架構視頻大廠面試真題項目實戰源碼收錄 Java反射(一)Java反射(二)Java反射(三)Java注解Java IO(一)Java IO(二)RandomAccessFileJava NIOJava異常詳解Java抽象類和接口的區別Java深拷貝和淺拷...
摘要:下文中我們將分別使用和來實現加解密,二者同步加解密的要點為使用何種填充算法。下面我們給出填充算法的實現填充算法移去填充算法默認使用自動對待加密數據進行填充以對齊加密算法數據塊長度。需固定使用,并通過調整的長度,來實現加密算法。 對稱加解密算法中,當前最為安全的是 AES 加密算法(以前應該是是 DES 加密算法),PHP 提供了兩個可以用于 AES 加密算法的函數簇:Mcrypt 和 ...
閱讀 3891·2021-11-22 13:54
閱讀 2669·2021-09-30 09:48
閱讀 2353·2021-09-28 09:36
閱讀 3104·2021-09-22 15:26
閱讀 1336·2019-08-30 15:55
閱讀 2505·2019-08-30 15:54
閱讀 1419·2019-08-30 14:17
閱讀 2335·2019-08-28 18:25