时间轮
大黄 Lv4

工作流程图

时间轮 NettyHashedWheelTimer流程图.jpg

由于netty动辄管理100w+的连接,每一个连接都会有很多超时任务。比如发送超时、心跳检测间隔等,如果每一个定时任务都启动一个Timer,不仅低效,而且会消耗大量的资源。

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public HashedWheelTimer(
ThreadFactory threadFactory,
// tickDuration 每一次指针持续时间
// ticksPerWheel 时间格的个数
// leakDetection 泄漏检测
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {

if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}

// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel =createWheel(ticksPerWheel);
mask = wheel.length - 1;

// Convert tickDuration to nanos.
long duration = unit.toNanos(tickDuration);

// Prevent overflow.
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}

if (duration < MILLISECOND_NANOS) {
logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",
tickDuration, MILLISECOND_NANOS);
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}

workerThread = threadFactory.newThread(worker);

leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

this.maxPendingTimeouts = maxPendingTimeouts;

if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}

/**
*
* 时间格
*/
private static HashedWheelTimer.HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
} else if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
} else {
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelTimer.HashedWheelBucket[] wheel = new HashedWheelTimer.HashedWheelBucket[ticksPerWheel];

for(int i = 0; i < wheel.length; ++i) {
// 每个时间格一条队列
wheel[i] = new HashedWheelTimer.HashedWheelBucket();
}

return wheel;
}
}

HashedWheelTimer源码之启动

不需要显示调用,添加任务时会自动执行

没有任务却启动时间轮,也是在空耗资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void start() {
// 判断当前时间轮的状态
// 这里是一个Lock Free的设计。因为可能有多个线程调用启动方法,这里使用AtomicIntegerFieldUpdater原子的更新时间轮的状态
switch(WORKER_STATE_UPDATER.get(this)) {
case 0:
if (WORKER_STATE_UPDATER.compareAndSet(this, 0, 1)) {
this.workerThread.start();
}
case 1:
break;
case 2:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}

// 等待worker线程初始化时间轮的启动时间
while(this.startTime == 0L) {
try {
this.startTimeInitialized.await();
} catch (InterruptedException var2) {
}
}

}

HashedWheelTimer源码之停止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Override
public Set<Timeout> stop() {
// worker线程不能停止时间轮
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.getSimpleName());
}

// CAS 替换状态为2:停止
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}

return Collections.emptySet();
}

// 终止worker线程
try {
boolean interrupted = false;
while (workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}

if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
// 返回未处理的任务
return worker.unprocessedTimeouts();
}

HashedWheelTimer源码之添加任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 参数校验
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}

long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}

// 如果时间轮没有启动,则启动
start();

// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
// 计算deadline
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

// Guard against overflow.
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// 这里定时任务不是直接加入到对应的格子,而是先加入一个队列里,等到下一个tick的时候
// 会从队列里取出100000个任务加入到指定的格子里
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}

这里使用的Queue不是普通java自带的Queue的实现,而是使用JCTool–一个高性能的的并发Queue实现包。

HashedWheelTimer源码之HashedWheelTimeout(任务)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
private static final class HashedWheelTimeout implements Timeout {

// 定义定时任务的3个状态:初始化、取消、过期
private static final int ST_INIT = 0;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;
// 用来CAS方式更新定时任务状态
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");

// 时间轮引用
private final HashedWheelTimer timer;
// 具体到期需要执行的任务
private final TimerTask task;
private final long deadline;

@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
private volatile int state = ST_INIT;

// remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
// HashedWheelTimeout will be added to the correct HashedWheelBucket.
// 离任务执行的轮数,当将此任务加入到格子中是计算该值,每过一轮,该值减一。
long remainingRounds;

// This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
// As only the workerThread will act on it there is no need for synchronization / volatile.
// 双向链表结构,由于只有worker线程会访问,这里不需要synchronization / volatile
HashedWheelTimeout next;
HashedWheelTimeout prev;

// The bucket to which the timeout was added
// 定时任务所在的格子
HashedWheelBucket bucket;

HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
this.timer = timer;
this.task = task;
this.deadline = deadline;
}

@Override
public Timer timer() {
return timer;
}

@Override
public TimerTask task() {
return task;
}

@Override
public boolean cancel() {
// only update the state it will be removed from HashedWheelBucket on next tick.
// 这里只是修改状态为ST_CANCELLED,会在下次tick时,在格子中移除
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}
// If a task should be canceled we put this to another queue which will be processed on each tick.
// So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
// we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
// 加入到时间轮的待取消队列,并在每次tick的时候,从相应格子中移除。
timer.cancelledTimeouts.add(this);
return true;
}

// 从格子中移除自身
void remove() {
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
bucket.remove(this);
} else {
timer.pendingTimeouts.decrementAndGet();
}
}

public boolean compareAndSetState(int expected, int state) {
return STATE_UPDATER.compareAndSet(this, expected, state);
}

public int state() {
return state;
}

@Override
public boolean isCancelled() {
return state() == ST_CANCELLED;
}

@Override
public boolean isExpired() {
return state() == ST_EXPIRED;
}

// 过期并执行任务
public void expire() {
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}

try {
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}

@Override
public String toString() {
final long currentTime = System.nanoTime();
long remaining = deadline - currentTime + timer.startTime;

StringBuilder buf = new StringBuilder(192)
.append(simpleClassName(this))
.append('(')
.append("deadline: ");
if (remaining > 0) {
buf.append(remaining)
.append(" ns later");
} else if (remaining < 0) {
buf.append(-remaining)
.append(" ns ago");
} else {
buf.append("now");
}

if (isCancelled()) {
buf.append(", cancelled");
}

return buf.append(", task: ")
.append(task())
.append(')')
.toString();
}
}

HashedWheelTimer源码之HashedWheelBucket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
private static final class HashedWheelBucket {
// Used for the linked-list datastructure
// 指向格子中任务的首尾
private HashedWheelTimeout head;
private HashedWheelTimeout tail;

/**
* Add {@link HashedWheelTimeout} to this bucket.
* 基础的链表添加操作
*/
public void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}

/**
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
* 过期并执行格子中的到期任务,tick到该格子的时候,worker线程会调用这个方法,根据deadline和remainingRounds判断任务是否过期
*/
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;

// process all timeouts
// 遍历
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
next = remove(timeout);
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
timeout.remainingRounds --;
}
timeout = next;
}
}

public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
HashedWheelTimeout next = timeout.next;
// remove timeout that was either processed or cancelled by updating the linked-list
if (timeout.prev != null) {
timeout.prev.next = next;
}
if (timeout.next != null) {
timeout.next.prev = timeout.prev;
}

if (timeout == head) {
// if timeout is also the tail we need to adjust the entry too
if (timeout == tail) {
tail = null;
head = null;
} else {
head = next;
}
} else if (timeout == tail) {
// if the timeout is the tail modify the tail to be the prev node.
tail = timeout.prev;
}
// null out prev, next and bucket to allow for GC.
timeout.prev = null;
timeout.next = null;
timeout.bucket = null;
timeout.timer.pendingTimeouts.decrementAndGet();
return next;
}

/**
* Clear this bucket and return all not expired / cancelled {@link Timeout}s.
*/
public void clearTimeouts(Set<Timeout> set) {
for (;;) {
HashedWheelTimeout timeout = pollTimeout();
if (timeout == null) {
return;
}
if (timeout.isExpired() || timeout.isCancelled()) {
continue;
}
set.add(timeout);
}
}

private HashedWheelTimeout pollTimeout() {
HashedWheelTimeout head = this.head;
if (head == null) {
return null;
}
HashedWheelTimeout next = head.next;
if (next == null) {
tail = this.head = null;
} else {
this.head = next;
next.prev = null;
}

// null out prev and next to allow for GC.
head.next = null;
head.prev = null;
head.bucket = null;
return head;
}
}

Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

private long tick;

@Override
public void run() {
// Initialize the startTime.
// 初始化startTime.只有所有任务的的deadline都是想对于这个时间点
startTime = System.nanoTime();
// 由于System.nanoTime()可能返回0,甚至负数。并且0是一个标示符,用来判断startTime是否被初始化,所以当startTime=0的时候,重新赋值为1
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}

// Notify the other threads waiting for the initialization at start().
// 唤醒阻塞在start()的线程 时间轮线程
startTimeInitialized.countDown();

// 只要时间轮的状态为WORKER_STATE_STARTED,就循环的“转动”tick,循环判断响应格子中的到期任务
do {
// waitForNextTick方法主要是计算下次tick的时间, 然后sleep到下次tick
// 返回值就是System.nanoTime() - startTime, 也就是Timer启动后到这次tick, 所过去的时间
final long deadline = waitForNextTick();
// 可能溢出或者被中断的时候会返回负数, 所以小于等于0不管
if (deadline > 0) {
// 获取tick对应的格子索引
int idx = (int) (tick & mask);
// 移除被取消的任务
processCancelledTasks();
HashedWheelBucket bucket =
wheel[idx];
// 从任务队列中取出任务加入到对应的格子中
transferTimeoutsToBuckets();
// 过期执行格子中的任务
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

// Fill the unprocessedTimeouts so we can return them from stop() method.
// 这里应该是时间轮停止了,清除所有格子中的任务,并加入到未处理任务列表,以供stop()方法返回
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}

// 将还没有加入到格子中的待处理定时任务队列中的任务取出,如果是未取消的任务,则加入到未处理任务队列中,以供stop()方法返回
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
// 处理取消任务
processCancelledTasks();
}

// 将newTimeout()方法中加入到待处理定时任务队列中的任务加入到指定的格子中
private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
// 每次tick只处理10w个任务,以免阻塞worker线程
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// 如果没有任务了,直接跳出循环
// all processed
break;
}
// 还没有放入到格子中就取消了,直接略过
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}

// 计算任务需要经过多少个tick
long calculated = timeout.deadline / tickDuration;
// 计算任务的轮数
timeout.remainingRounds = (calculated - tick) / wheel.length;

// 如果任务在timeouts队列里面放久了, 以至于已经过了执行时间, 这个时候就使用当前tick, 也就是放到当前bucket, 此方法调用完后就会被执行.
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask);

// 将任务加入到响应的格子中
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}

// 将取消的任务取出,并从格子中移除
private void processCancelledTasks() {
for (;;) {
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// all processed
break;
}
try {
timeout.remove();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}

/**
* calculate goal nanoTime from startTime and current tick number,
* then wait until that goal has been reached.
* @return Long.MIN_VALUE if received a shutdown request,
* current time otherwise (with Long.MIN_VALUE changed by +1)
*/
//sleep, 直到下次tick到来, 然后返回该次tick和启动时间之间的时长
private long waitForNextTick() {
//下次tick的时间点, 用于计算需要sleep的时间
long deadline = tickDuration * (tick + 1);

for (;;) {
// 计算需要sleep的时间, 之所以加999999后再除10000000, 是为了保证足够的sleep时间
// 例如:当deadline - currentTime=2000002的时候,如果不加999999,则只睡了2ms,
// 而2ms其实是未到达deadline这个时间点的,所有为了使上述情况能sleep足够的时间,加上999999后,会多睡1ms
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
// 移除问题
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}

// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}

try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
// 调用HashedWheelTimer.stop()时优雅退出
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}

public Set<Timeout> unprocessedTimeouts() {
return Collections.unmodifiableSet(unprocessedTimeouts);
}
}

问题

  • 关于在timeouts里面的队列里面的任务,还没有来得及执行就到期了的情况(存在吗?存在的话怎么解决的)

问题是存在的

在 transferTimeoutsToBuckets 方法有一个处理:

如果任务在timeouts队列里面放久了, 以至于已经过了执行时间, 这个时候就使用当前tick, 也就是放到当前bucket, 此方法调用完后就会被执行.

  • Post title:时间轮
  • Post author:大黄
  • Create time:2022-01-13 16:18:17
  • Post link:https://huangbangjing.cn/2022/01/13/时间轮/
  • Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.