定义
延时队列:顾名思义,是一个用于做消息延时消费的队列。但是它也是一个普通队列,所以它具备普通队列的特性,相比之下,延时的特性就是它最大的特点。
对于普通队列,如果队列里有数据,通过Pop方法可以立刻获取到队列的数据。
而对于延时队列,只有当队列里有数据,且该数据的指定消费时间小于等于当前时间,此刻该数据可被获取。
适用场景
延时队列适合用于设置一个定时器去触发某种事件的场景,如:
- 订单在十分钟之内未支付则自动取消。
- 优惠券过期提醒。
实现
先通过堆排序实现一个优先级队列,然后以时间为优先级,实现延时队列,队列里的元素指定的时间越小,优先级越高,就先出列,也就同时达到延时的效果、
优先级队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90package queue
import (
"container/heap"
"sync"
"sync/atomic"
"time"
)
type item struct {
Value interface{} // 元素
Priority int64 // 优先级
Index int // 索引
}
// priorityQueue 实现heap.Interface接口,利用堆排序实现的按优先级排序的队列
type priorityQueue []*item
// newPriorityQueue 返回指定容量的队列实例
func newPriorityQueue(capacity int) priorityQueue {
return make(priorityQueue, 0, capacity)
}
func (pq priorityQueue) Len() int {
return len(pq)
}
func (pq priorityQueue) Less(i, j int) bool {
// 根据优先级堆元素进行排序
return pq[i].Priority < pq[j].Priority
}
func (pq priorityQueue) Swap(i, j int) {
// 交换元素,同时更新index属性
pq[i], pq[j] = pq[j], pq[i]
pq[i].Index = i
pq[j].Index = j
}
func (pq *priorityQueue) Push(x interface{}) {
n := len(*pq)
c := cap(*pq)
if n+1 > c { // 触发扩容机制
newCap := c* 2
if c > 1024 { // 参考golang的slice扩容算法,当<=1024时,按倍数扩容,超过后就按1.25倍扩容
newCap = c + c/4
}
npq := make(priorityQueue, n, newCap)
copy(npq, *pq)
*pq = npq
}
*pq = (*pq)[0 : n+1]
item := x.(*item)
item.Index = n
(*pq)[n] = item
}
func (pq *priorityQueue) Pop() interface{} {
n := len(*pq)
c := cap(*pq)
if n < (c/2) && c > 32 { // 触发缩容机制
npq := make(priorityQueue, n, c/2)
copy(npq, *pq)
*pq = npq
}
item := (*pq)[n-1]
item.Index = -1
*pq = (*pq)[0 : n-1]
return item
}
// PeekAndShift 根据优先级获取队列里满足条件的第一个元素
func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) {
if pq.Len() == 0 {
return nil, 0
}
// 因为队列是经过堆排序的,所以第一个元素始终是优先级最高的
item := (*pq)[0]
if item.Priority > max {
// 如果未满足条件,返回差值
return nil, item.Priority - max
}
// 满足条件,移除第一个数据
heap.Remove(pq, 0)
return item, 0
}延时队列
1 |
|
- 使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func TestDeleteQueue(t *testing.T) {
dq := NewDelayQueue(10)
go dq.Poll(func() int64 {
return time.Now().UnixNano()
})
go func() {
for i := 0; i < 10; i++ {
dq.Offer(i, time.Now().UnixNano())
time.Sleep(time.Second * 2)
}
dq.Close()
}()
for {
select {
case item, ok := <-dq.Data:
if !ok {
return
}
fmt.Println(item)
default:
time.Sleep(time.Second)
}
}
}
- 本文作者: Hongker
- 本文链接: https://hongker.github.io/2022/07/17/algorithm-delayqueue/
- 版权声明: 本博客所有文章除特别声明外,均采用 MIT 许可协议。转载请注明出处!