当前位置:首页 > 技术知识 > 正文内容

Go语言进阶:时间轮(golang时间轮)

maynowei11个月前 (08-03)技术知识213

时间轮概念

时间轮(Timing Wheel)是一种高效的定时任务调度数据结构,特别适合处理大量定时任务。它通过一个循环数组(轮盘)和多个槽位(buckets)来组织定时任务,每个槽位代表一个时间间隔。

核心思想

  • 循环数组:时间轮是一个环形结构,指针按固定时间间隔移动
  • 槽位(Buckets):每个槽位存储在该时间点需要执行的任务
  • 时间刻度:每个槽位代表一个时间单位(如1秒)
  • 高效操作:添加/删除定时任务的时间复杂度为 O(1)

优势

  1. 高效管理大量定时任务
  2. 避免频繁的系统调用
  3. 减少内存占用
  4. 适用于心跳检测、超时控制等场景

Go 实现的单级时间轮

// Task 定义定时任务结构
type Task struct {
	delay   time.Duration // 任务延迟执行的时间
	key     string        // 任务唯一标识
	job     func()        // 任务执行函数
	rounds  int           // 任务需要经历的轮次(用于处理长延迟任务)
	slotIdx int           // 任务所在槽位索引
	element *list.Element // 在链表中的位置(用于快速删除)
}

// TimeWheel 时间轮结构
type TimeWheel struct {
	tick         time.Duration    // 时间轮每次转动的时间间隔
	slotsNum     int              // 时间轮槽位数量
	slots        []*list.List     // 槽位数组,每个槽位是一个任务链表
	currentPos   int              // 当前指针位置
	ticker       *time.Ticker     // 定时触发器
	stopChan     chan struct{}    // 停止通道
	taskRegistry map[string]*Task // 任务注册表(用于快速查找任务)
	mutex        sync.Mutex       // 互斥锁,保证并发安全
	wg           sync.WaitGroup   // 等待组,用于优雅关闭
}

// NewTimeWheel 创建时间轮实例
func NewTimeWheel(tick time.Duration, slotsNum int) *TimeWheel {
	if tick < time.Millisecond {
		tick = time.Millisecond // 确保最小时间单位为毫秒
	}

	tw := &TimeWheel{
		tick:         tick,
		slotsNum:     slotsNum,
		slots:        make([]*list.List, slotsNum),
		currentPos:   0,
		stopChan:     make(chan struct{}),
		taskRegistry: make(map[string]*Task),
	}

	// 初始化每个槽位的链表
	for i := 0; i < slotsNum; i++ {
		tw.slots[i] = list.New()
	}

	return tw
}

// Start 启动时间轮
func (tw *TimeWheel) Start() {
	tw.ticker = time.NewTicker(tw.tick)
	tw.wg.Add(1)

	go func() {
		defer tw.wg.Done()

		for {
			select {
			case <-tw.ticker.C:
				tw.tickHandler() // 处理定时触发
			case <-tw.stopChan:
				tw.ticker.Stop()
				return
			}
		}
	}()
}

// Stop 停止时间轮
func (tw *TimeWheel) Stop() {
	close(tw.stopChan) // 发送停止信号
	tw.wg.Wait()       // 等待所有goroutine退出
}

// AddTask 添加定时任务
func (tw *TimeWheel) AddTask(key string, delay time.Duration, job func()) error {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()

	// 检查任务是否已存在
	if _, exists := tw.taskRegistry[key]; exists {
		return fmt.Errorf("task with key '%s' already exists", key)
	}

	// 计算延迟对应的轮次和槽位
	delayTicks := int(delay / tw.tick)
	rounds := delayTicks / tw.slotsNum
	slotIdx := (tw.currentPos + delayTicks) % tw.slotsNum

	// 创建新任务
	task := &Task{
		delay:   delay,
		key:     key,
		job:     job,
		rounds:  rounds,
		slotIdx: slotIdx,
	}

	// 将任务添加到对应槽位
	element := tw.slots[slotIdx].PushBack(task)
	task.element = element

	// 注册任务
	tw.taskRegistry[key] = task

	return nil
}

// RemoveTask 移除定时任务
func (tw *TimeWheel) RemoveTask(key string) bool {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()

	task, exists := tw.taskRegistry[key]
	if !exists {
		return false
	}

	// 从槽位链表中移除
	tw.slots[task.slotIdx].Remove(task.element)

	// 从注册表中移除
	delete(tw.taskRegistry, key)

	return true
}

// tickHandler 时间轮转动处理
func (tw *TimeWheel) tickHandler() {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()

	// 获取当前槽位的任务链表
	currentList := tw.slots[tw.currentPos]

	// 处理当前槽位的所有任务
	var next *list.Element
	for e := currentList.Front(); e != nil; e = next {
		next = e.Next()
		task := e.Value.(*Task)

		// 如果任务轮次为0,执行任务
		if task.rounds == 0 {
			go task.job() // 异步执行任务
			currentList.Remove(e)
			delete(tw.taskRegistry, task.key)
		} else {
			// 减少轮次
			task.rounds--
		}
	}

	// 移动指针到下一个槽位
	tw.currentPos = (tw.currentPos + 1) % tw.slotsNum
}

// GetTaskCount 获取当前任务总数
func (tw *TimeWheel) GetTaskCount() int {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()
	return len(tw.taskRegistry)
}

完整代码

package main

import (
	"container/list"
	"fmt"
	"sync"
	"time"
)

// Task 定义定时任务结构
type Task struct {
	delay   time.Duration // 任务延迟执行的时间
	key     string        // 任务唯一标识
	job     func()        // 任务执行函数
	rounds  int           // 任务需要经历的轮次(用于处理长延迟任务)
	slotIdx int           // 任务所在槽位索引
	element *list.Element // 在链表中的位置(用于快速删除)
}

// TimeWheel 时间轮结构
type TimeWheel struct {
	tick         time.Duration    // 时间轮每次转动的时间间隔
	slotsNum     int              // 时间轮槽位数量
	slots        []*list.List     // 槽位数组,每个槽位是一个任务链表
	currentPos   int              // 当前指针位置
	ticker       *time.Ticker     // 定时触发器
	stopChan     chan struct{}    // 停止通道
	taskRegistry map[string]*Task // 任务注册表(用于快速查找任务)
	mutex        sync.Mutex       // 互斥锁,保证并发安全
	wg           sync.WaitGroup   // 等待组,用于优雅关闭
}

// NewTimeWheel 创建时间轮实例
func NewTimeWheel(tick time.Duration, slotsNum int) *TimeWheel {
	if tick < time.Millisecond {
		tick = time.Millisecond // 确保最小时间单位为毫秒
	}

	tw := &TimeWheel{
		tick:         tick,
		slotsNum:     slotsNum,
		slots:        make([]*list.List, slotsNum),
		currentPos:   0,
		stopChan:     make(chan struct{}),
		taskRegistry: make(map[string]*Task),
	}

	// 初始化每个槽位的链表
	for i := 0; i < slotsNum; i++ {
		tw.slots[i] = list.New()
	}

	return tw
}

// Start 启动时间轮
func (tw *TimeWheel) Start() {
	tw.ticker = time.NewTicker(tw.tick)
	tw.wg.Add(1)

	go func() {
		defer tw.wg.Done()

		for {
			select {
			case <-tw.ticker.C:
				tw.tickHandler() // 处理定时触发
			case <-tw.stopChan:
				tw.ticker.Stop()
				return
			}
		}
	}()
}

// Stop 停止时间轮
func (tw *TimeWheel) Stop() {
	close(tw.stopChan) // 发送停止信号
	tw.wg.Wait()       // 等待所有goroutine退出
}

// AddTask 添加定时任务
func (tw *TimeWheel) AddTask(key string, delay time.Duration, job func()) error {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()

	// 检查任务是否已存在
	if _, exists := tw.taskRegistry[key]; exists {
		return fmt.Errorf("task with key '%s' already exists", key)
	}

	// 计算延迟对应的轮次和槽位
	delayTicks := int(delay / tw.tick)
	rounds := delayTicks / tw.slotsNum
	slotIdx := (tw.currentPos + delayTicks) % tw.slotsNum

	// 创建新任务
	task := &Task{
		delay:   delay,
		key:     key,
		job:     job,
		rounds:  rounds,
		slotIdx: slotIdx,
	}

	// 将任务添加到对应槽位
	element := tw.slots[slotIdx].PushBack(task)
	task.element = element

	// 注册任务
	tw.taskRegistry[key] = task

	return nil
}

// RemoveTask 移除定时任务
func (tw *TimeWheel) RemoveTask(key string) bool {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()

	task, exists := tw.taskRegistry[key]
	if !exists {
		return false
	}

	// 从槽位链表中移除
	tw.slots[task.slotIdx].Remove(task.element)

	// 从注册表中移除
	delete(tw.taskRegistry, key)

	return true
}

// tickHandler 时间轮转动处理
func (tw *TimeWheel) tickHandler() {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()

	// 获取当前槽位的任务链表
	currentList := tw.slots[tw.currentPos]

	// 处理当前槽位的所有任务
	var next *list.Element
	for e := currentList.Front(); e != nil; e = next {
		next = e.Next()
		task := e.Value.(*Task)

		// 如果任务轮次为0,执行任务
		if task.rounds == 0 {
			go task.job() // 异步执行任务
			currentList.Remove(e)
			delete(tw.taskRegistry, task.key)
		} else {
			// 减少轮次
			task.rounds--
		}
	}

	// 移动指针到下一个槽位
	tw.currentPos = (tw.currentPos + 1) % tw.slotsNum
}

// GetTaskCount 获取当前任务总数
func (tw *TimeWheel) GetTaskCount() int {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()
	return len(tw.taskRegistry)
}

func main() {
	// 创建时间轮:1秒转动一次,共60个槽位(可管理60秒内的任务)
	tw := NewTimeWheel(time.Second, 60)
	tw.Start()
	defer tw.Stop() // 确保程序退出时停止时间轮

	// 添加3秒后执行的任务
	tw.AddTask("task1", 3*time.Second, func() {
		fmt.Println("Task1 executed at", time.Now().Format("15:04:05"))
	})

	// 添加10秒后执行的任务
	tw.AddTask("task2", 10*time.Second, func() {
		fmt.Println("Task2 executed at", time.Now().Format("15:04:05"))
	})

	// 添加61秒后执行的任务(测试跨轮次)
	tw.AddTask("task3", 61*time.Second, func() {
		fmt.Println("Task3 executed at", time.Now().Format("15:04:05"))
	})

	fmt.Println("Time wheel started. Current tasks:", tw.GetTaskCount())

	// 模拟运行一段时间
	time.Sleep(5 * time.Second)
	fmt.Println("After 5 seconds, tasks left:", tw.GetTaskCount())

	// 移除task2
	if tw.RemoveTask("task2") {
		fmt.Println("Task2 removed")
	}

	// 等待所有任务完成
	time.Sleep(65 * time.Second)
}

欢迎关注,持续更新~

#如何学习go语言##Go语言##golang#

相关文章

webview 渲染机制:硬件加速方式渲染的Android Web

webview 渲染是什么?webview 渲染是用于展现web页面的控件; webview 可以内嵌在移动端,实现前端的混合式开发,大多数混合式开发框架都是基于 webview 模式进行二次开发的w...

Android主流UI开源库整理(android完整开源项目)

前言最近老大让我整理一份 Android主流UI开源库 的资料,以补充公司的Android知识库。由于对格式不做特别限制,于是打算用博客的形式记录下来,方便查看、防丢并且可以持续维护、不断更新。标题隐...

CPU「离奇」飙到 100%!开发者挖出 Linux 内核 16 年老 Bug:这么多年竟无人发现?

【CSDN 编者按】每一次对旧设备的升级都仿佛是一场跨越时代的冒险。本文作者致力于将基于 PXA166 的 Chumby 8 设备从 Linux 2.6.28 版本升级到现代 6.x 版本,然而,在看...

Linux系统编程:条件变量为什么要用锁

条件变量可以解决线程同步和共享资源访问的问题,条件变量是对互斥锁的补充,它允许一个线程阻塞并等待另一个线程发送的信号,当收到信号时,阻塞的线程被唤醒并试图锁定与之相关的互斥锁。具体定义如下:等待:in...

C++26中同步与原子操作新变化(c++ 同步)

引言随着多核处理器和并发编程的普及,C++26进一步增强了对同步与原子操作的支持,为开发者提供了更高效、更安全的工具来应对多线程编程中的数据竞争与同步挑战。自C++11引入原子操作以来,C++标准库在...