publicHashedWheelTimer( ThreadFactory threadFactory, // tickDuration 每一次指针持续时间 // ticksPerWheel 时间格的个数 // leakDetection 泄漏检测 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts){
if (threadFactory == null) { thrownew NullPointerException("threadFactory"); } if (unit == null) { thrownew NullPointerException("unit"); } if (tickDuration <= 0) { thrownew IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { thrownew IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); }
// Normalize ticksPerWheel to power of two and initialize the wheel. wheel =createWheel(ticksPerWheel); mask = wheel.length - 1;
// Convert tickDuration to nanos. long duration = unit.toNanos(tickDuration);
@Override public Set<Timeout> stop(){ // worker线程不能停止时间轮 if (Thread.currentThread() == workerThread) { thrownew IllegalStateException( HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class.getSimpleName()); }
// CAS 替换状态为2:停止 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { // workerState can be 0 or 2 at this moment - let it always be 2. if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) { INSTANCE_COUNTER.decrementAndGet(); if (leak != null) { boolean closed = leak.close(this); assert closed; } }
@Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit){ // 参数校验 if (task == null) { thrownew NullPointerException("task"); } if (unit == null) { thrownew NullPointerException("unit"); }
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); thrownew RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); }
// 如果时间轮没有启动,则启动 start();
// Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. // 计算deadline long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" }) privatevolatileint state = ST_INIT;
// remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the // HashedWheelTimeout will be added to the correct HashedWheelBucket. // 离任务执行的轮数,当将此任务加入到格子中是计算该值,每过一轮,该值减一。 long remainingRounds;
// This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list. // As only the workerThread will act on it there is no need for synchronization / volatile. // 双向链表结构,由于只有worker线程会访问,这里不需要synchronization / volatile HashedWheelTimeout next; HashedWheelTimeout prev;
// The bucket to which the timeout was added // 定时任务所在的格子 HashedWheelBucket bucket;
@Override publicbooleancancel(){ // only update the state it will be removed from HashedWheelBucket on next tick. // 这里只是修改状态为ST_CANCELLED,会在下次tick时,在格子中移除 if (!compareAndSetState(ST_INIT, ST_CANCELLED)) { returnfalse; } // If a task should be canceled we put this to another queue which will be processed on each tick. // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible. // 加入到时间轮的待取消队列,并在每次tick的时候,从相应格子中移除。 timer.cancelledTimeouts.add(this); returntrue; }
privatestaticfinalclassHashedWheelBucket{ // Used for the linked-list datastructure // 指向格子中任务的首尾 private HashedWheelTimeout head; private HashedWheelTimeout tail;
/** * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}. * 过期并执行格子中的到期任务,tick到该格子的时候,worker线程会调用这个方法,根据deadline和remainingRounds判断任务是否过期 */ publicvoidexpireTimeouts(long deadline){ HashedWheelTimeout timeout = head;
// process all timeouts // 遍历 while (timeout != null) { HashedWheelTimeout next = timeout.next; if (timeout.remainingRounds <= 0) { next = remove(timeout); if (timeout.deadline <= deadline) { timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. thrownew IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } elseif (timeout.isCancelled()) { next = remove(timeout); } else { timeout.remainingRounds --; } timeout = next; } }
public HashedWheelTimeout remove(HashedWheelTimeout timeout){ HashedWheelTimeout next = timeout.next; // remove timeout that was either processed or cancelled by updating the linked-list if (timeout.prev != null) { timeout.prev.next = next; } if (timeout.next != null) { timeout.next.prev = timeout.prev; }
if (timeout == head) { // if timeout is also the tail we need to adjust the entry too if (timeout == tail) { tail = null; head = null; } else { head = next; } } elseif (timeout == tail) { // if the timeout is the tail modify the tail to be the prev node. tail = timeout.prev; } // null out prev, next and bucket to allow for GC. timeout.prev = null; timeout.next = null; timeout.bucket = null; timeout.timer.pendingTimeouts.decrementAndGet(); return next; }
/** * Clear this bucket and return all not expired / cancelled {@link Timeout}s. */ publicvoidclearTimeouts(Set<Timeout> set){ for (;;) { HashedWheelTimeout timeout = pollTimeout(); if (timeout == null) { return; } if (timeout.isExpired() || timeout.isCancelled()) { continue; } set.add(timeout); } }
private HashedWheelTimeout pollTimeout(){ HashedWheelTimeout head = this.head; if (head == null) { returnnull; } HashedWheelTimeout next = head.next; if (next == null) { tail = this.head = null; } else { this.head = next; next.prev = null; }
// null out prev and next to allow for GC. head.next = null; head.prev = null; head.bucket = null; return head; } }
privatefinalclassWorkerimplementsRunnable{ privatefinal Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
privatelong tick;
@Override publicvoidrun(){ // Initialize the startTime. // 初始化startTime.只有所有任务的的deadline都是想对于这个时间点 startTime = System.nanoTime(); // 由于System.nanoTime()可能返回0,甚至负数。并且0是一个标示符,用来判断startTime是否被初始化,所以当startTime=0的时候,重新赋值为1 if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; }
// Notify the other threads waiting for the initialization at start(). // 唤醒阻塞在start()的线程 时间轮线程 startTimeInitialized.countDown();
// Fill the unprocessedTimeouts so we can return them from stop() method. // 这里应该是时间轮停止了,清除所有格子中的任务,并加入到未处理任务列表,以供stop()方法返回 for (HashedWheelBucket bucket: wheel) { bucket.clearTimeouts(unprocessedTimeouts); } // 将还没有加入到格子中的待处理定时任务队列中的任务取出,如果是未取消的任务,则加入到未处理任务队列中,以供stop()方法返回 for (;;) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } // 处理取消任务 processCancelledTasks(); }
// 将newTimeout()方法中加入到待处理定时任务队列中的任务加入到指定的格子中 privatevoidtransferTimeoutsToBuckets(){ // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just // adds new timeouts in a loop. // 每次tick只处理10w个任务,以免阻塞worker线程 for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // 如果没有任务了,直接跳出循环 // all processed break; } // 还没有放入到格子中就取消了,直接略过 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { // Was cancelled in the meantime. continue; }
// 将取消的任务取出,并从格子中移除 privatevoidprocessCancelledTasks(){ for (;;) { HashedWheelTimeout timeout = cancelledTimeouts.poll(); if (timeout == null) { // all processed break; } try { timeout.remove(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown while process a cancellation task", t); } } } }
/** * calculate goal nanoTime from startTime and current tick number, * then wait until that goal has been reached. * @return Long.MIN_VALUE if received a shutdown request, * current time otherwise (with Long.MIN_VALUE changed by +1) */ //sleep, 直到下次tick到来, 然后返回该次tick和启动时间之间的时长 privatelongwaitForNextTick(){ //下次tick的时间点, 用于计算需要sleep的时间 long deadline = tickDuration * (tick + 1);
// Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect // the JVM if it runs on windows. // // See https://github.com/netty/netty/issues/356 if (PlatformDependent.isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; }