Asynq实现Go异步crontab定时任务
最近在用Go写运维平台, 需要在Go应用程序中非同步处理任务, go cron并不能满足我的需求,于是在github发现了Asynq库。 让我们来动手实验asynq的用法吧!
Asynq是一个Go库,用于将任务排队并与工作者异步处理它们。它由Redis支持,设计成可扩展且易于上手。
任务队列被用作跨多台机器分配工作的机制。
一个系统可以由多个工作服务器和代理组成,从而实现高可用性和水平扩展。
Highlevel overview of how Asynq works:
- Client puts task on a queue
- Server pulls task off queues and starts a worker goroutine for each task
- Tasks are processed concurrently by multiple workers
快速开始
首先,确保你在本地运行Redis服务器。
$ redis-server
安装Asynq库
go get -u github.com/hibiken/asynq go get -u github.com/hibiken/asynq/tools/asynqmon
创建项目
mkdir ziji && cd ziji go mod init ziji mkdir tasks touch tasks/beta.go tasks/worker.go tasks/task.go
Redis 连接选项
Asynq 使用 Redis 作为消息代理, beta.go
和 worker.go
都需要连接到 Redis 进行写入和读取。
我们将使用 RedisClientOpt
指定如何连接到本地 Redis 实例。
beta.go
package tasks import ( "github.com/hibiken/asynq" "log" "pigs/common" "pigs/models/cmdb" ) func TaskBeta() { c := common.CONFIG.Redis // 周期性任务 scheduler := asynq.NewScheduler( asynq.RedisClientOpt{ Addr: c.Host, Username: c.UserName, Password: c.PassWord, DB: c.DB, }, nil) var account cmdb.CloudPlatform common.DB.Table("cloud_platform").Where("enable != ? and type = ?", 0, "aliyun").Find(&account) syncResource := NewAliCloudTask(&account) // 每隔5分钟同步一次 entryID, err := scheduler.Register("*/5 * * * *", syncResource) if err != nil { log.Fatal(err) } log.Printf("registered an entry: %q\n", entryID) if err := scheduler.Run(); err != nil { log.Fatal(err) } }
- NewScheduler
将运行调度程序,用于定期处理任务。调度器定期对任务排队,然后由集群中可用的工作服务器执行。
- 时区
默认情况下,定期任务计划使用UTC时区,更改默认时区可以使用SchedulerOpts参数
scheduler.Register
接受三个参数,cron时间、任务、队列名asynq.Queue("cloud")
// Example of using America/Los_Angeles timezone instead of the default UTC timezone. loc, err := time.LoadLocation("America/Los_Angeles") if err != nil { panic(err) } scheduler := asynq.NewScheduler( redisConnOpt, &asynq.SchedulerOpts{ Location: loc, }, )
task.go
package tasks import ( "context" "encoding/json" "github.com/hibiken/asynq" "log" "pigs/inner/cloud/cloudsync" "pigs/inner/cloud/cloudvendor" "pigs/models/cmdb" ) const ( SyncAliYunCloud = "cmdb:aliyun" SyncTencentCloud = "cmdb:tencent" ) // NewAliCloudTask 同步阿里云资产同步任务 func NewAliCloudTask(conf *cmdb.CloudPlatform) *asynq.Task { payload, err := json.Marshal(conf) if err != nil { panic(err) } return asynq.NewTask(SyncAliYunCloud, payload) } func HandleAliCloudTask(ctx context.Context, t *asynq.Task) error { var a cmdb.CloudPlatform if err := json.Unmarshal(t.Payload(), &a); err != nil { return err } _, err := cloudvendor.GetVendorClient(&a) if err != nil { log.Fatalf("AccountVerify GetVendorClient failed,%v", err) return err } cloudsync.SyncAliYunHost(&a) log.Printf("Aliyun Cloud assets are successfully synchronized...") return nil }
Tasks 任务
在 asynq
中,工作单元被封装为 Task
类型。
其中有两个字段:“类型” 和 “有效载荷”。
// Task represents a task to be performed. type Task struct { // Type indicates the type of a task to be performed. Type string // Payload holds data needed to perform the task. Payload Payload }
Type
是一个简单的字符串值,指示给定任务的类型。
Payload
保存执行任务所需的数据,您可以将其视为 map[string]interface{}
。需要注意的重要一件事是有效负载值必须是可序列化的。
worker.go
package tasks import ( "context" "pigs/common" "time" "log" "os" "os/signal" "github.com/hibiken/asynq" "golang.org/x/sys/unix" ) // loggingMiddleware 记录任务日志中间件 func loggingMiddleware(h asynq.Handler) asynq.Handler { return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error { start := time.Now() log.Printf("Start processing %q", t.Type()) err := h.ProcessTask(ctx, t) if err != nil { return err } log.Printf("Finished processing %q: Elapsed Time = %v", t.Type(), time.Since(start)) return nil }) } func TaskWorker() { c := common.CONFIG.Redis srv := asynq.NewServer( asynq.RedisClientOpt{ Addr: c.Host, Username: c.UserName, Password: c.PassWord, DB: c.DB, }, asynq.Config{Concurrency: 20}, ) mux := asynq.NewServeMux() mux.Use(loggingMiddleware) // 任务执行时的handle mux.HandleFunc(SyncAliYunCloud, HandleAliCloudTask) // start server if err := srv.Start(mux); err != nil { log.Fatalf("could not start server: %v", err) } // Wait for termination signal. sigs := make(chan os.Signal, 1) signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP) for { s := <-sigs if s == unix.SIGTSTP { srv.Shutdown() continue } break } // Stop worker server. srv.Stop() }
示例
建立任务使用NewTask
方法,并为任务传递类型和有效负载。 可以通过Client.Schedule
传入任务和需要处理的时间来计划任务
func main() { client := asynq.NewClient(redis) // 创建任务,声明任务类型,有效负载 t1 := asynq.NewTask("send_register_email", map[string]interface{}{"userName": "zhangsan"}) t2 := asynq.NewTask("send_forget_email", map[string]interface{}{"userName": "zhangsan"}) // 立即处理任务 err := client.Enqueue(t1, time.Now()) if err != nil { log.Fatal(err) } // 2小时后处理任务, 延迟任务 err := client.Enqueue(t2, asynq.ProcessIn(time.Now().Add(2 * time.Hour))) if err != nil { log.Fatal(err) } }
asynq.Client 支持三种调度任务的方法:Enqueue,EnqueueIn 和 EnqueueAt。
使用 client.Enqueue 将任务立即加入队列。
使用 client.EnqueueIn 或 client.EnqueueAt 来安排将来要处理的任务。 EnqueueAt支持 2021-11-11 15:10:00 时间格式定时执行任务
// 定时任务 package tasks import ( "fmt" "github.com/hibiken/asynq" "log" "pigs/common" "pigs/models/cmdb" "time" ) type EmailTaskPayloadTest struct { UserID int64 Msg string } func TaskBeta() { client := asynq.NewClient( asynq.RedisClientOpt{ Addr: ":6379", Password: "", }) payload, err := json.Marshal(EmailTaskPayloadTest{ UserID: 100, Msg: "test", }) if err != nil { log.Fatal(err) } t1 := asynq.NewTask("task:oneTask", payload) // 定时执行任务时间 setDate := "2021-11-11 15:10:00" dateFormats := "2006-01-02 15:04:05" // 获取时区 loc, _ := time.LoadLocation("Local") // 指定日期 转 当地 日期对象 类型为 time.Time timeObj, err := time.ParseInLocation(dateFormats, setDate, loc) if err != nil { fmt.Println("parse time failed err :", err) return } info, err := client.Enqueue(t1, asynq.ProcessAt(timeObj), asynq.Queue("test")) if err != nil { log.Fatal(err) } log.Printf(" [*] Successfully enqueued task: %+v", info)
启动任务
go run beta.go go run worker.go
asynq: pid=1274467 2021/11/17 04:27:34.960443 INFO: Starting processing
2021/11/17 12:27:34 /home/risk/code/pigs/tasks/beat.go:25
2021/11/17 12:27:34 registered an entry: “fe209263-1561-4b94-8b72-98b29bab6efe”
asynq: pid=1274467 2021/11/17 04:27:34.962365 INFO: Scheduler starting
asynq: pid=1274467 2021/11/17 04:27:34.962371 INFO: Scheduler timezone is set to UTC
asynq: pid=1274467 2021/11/17 04:27:34.962379 INFO: Send signal TERM or INT to stop the scheduler
2021/11/17 12:45:13 Aliyun Cloud assets are successfully synchronized…
2021/11/17 12:45:13 Finished processing “cmdb:aliyun”: Elapsed Time = 12.471785932s
Web UI
Asynqmon是一个基于web的工具,用于监视和管理Asynq队列和任务。有关详细信息,请参阅工具的README。
参考文档:
https://github.com/hibiken/asynq/wiki/Getting-Started