Asynq实现Go异步crontab定时任务

最近在用Go写运维平台, 需要在Go应用程序中非同步处理任务, go cron并不能满足我的需求,于是在github发现了Asynq库。 让我们来动手实验asynq的用法吧!

Asynq是一个Go库,用于将任务排队并与工作者异步处理它们。它由Redis支持,设计成可扩展且易于上手。

任务队列被用作跨多台机器分配工作的机制。

一个系统可以由多个工作服务器和代理组成,从而实现高可用性和水平扩展。

 

Highlevel overview of how Asynq works:

  • Client puts task on a queue
  • Server pulls task off queues and starts a worker goroutine for each task
  • Tasks are processed concurrently by multiple workers

 

快速开始

首先,确保你在本地运行Redis服务器。

$ redis-server

 

安装Asynq库

go get -u github.com/hibiken/asynq
go get -u github.com/hibiken/asynq/tools/asynqmon

 

创建项目

mkdir ziji && cd ziji
go mod init ziji
mkdir tasks
touch tasks/beta.go  tasks/worker.go  tasks/task.go

 

Redis 连接选项

Asynq 使用 Redis 作为消息代理, beta.go worker.go 都需要连接到 Redis 进行写入和读取。
我们将使用 RedisClientOpt 指定如何连接到本地 Redis 实例。

beta.go

package tasks

import (
   "github.com/hibiken/asynq"
   "log"
   "pigs/common"
   "pigs/models/cmdb"
)

func TaskBeta() {
   c := common.CONFIG.Redis
   // 周期性任务
   scheduler := asynq.NewScheduler(
      asynq.RedisClientOpt{
         Addr:     c.Host,
         Username: c.UserName,
         Password: c.PassWord,
         DB:       c.DB,
      }, nil)

   var account cmdb.CloudPlatform
   common.DB.Table("cloud_platform").Where("enable != ? and type = ?", 0, "aliyun").Find(&account)
   syncResource := NewAliCloudTask(&account)
   // 每隔5分钟同步一次
   entryID, err := scheduler.Register("*/5 * * * *", syncResource)

   if err != nil {
      log.Fatal(err)
   }
   log.Printf("registered an entry: %q\n", entryID)

   if err := scheduler.Run(); err != nil {
      log.Fatal(err)
   }
}

 

  • NewScheduler

将运行调度程序,用于定期处理任务。调度器定期对任务排队,然后由集群中可用的工作服务器执行。

  • 时区

默认情况下,定期任务计划使用UTC时区,更改默认时区可以使用SchedulerOpts参数

scheduler.Register 接受三个参数,cron时间任务队列名asynq.Queue("cloud")

// Example of using America/Los_Angeles timezone instead of the default UTC timezone.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
    panic(err)
}
scheduler := asynq.NewScheduler(
    redisConnOpt, 
    &asynq.SchedulerOpts{
        Location: loc,
    },
)

 

task.go

package tasks

import (
   "context"
   "encoding/json"
   "github.com/hibiken/asynq"
   "log"
   "pigs/inner/cloud/cloudsync"
   "pigs/inner/cloud/cloudvendor"
   "pigs/models/cmdb"
)

const (
   SyncAliYunCloud  = "cmdb:aliyun"
   SyncTencentCloud = "cmdb:tencent"
)

// NewAliCloudTask 同步阿里云资产同步任务
func NewAliCloudTask(conf *cmdb.CloudPlatform) *asynq.Task {
   payload, err := json.Marshal(conf)
   if err != nil {
      panic(err)
   }
   return asynq.NewTask(SyncAliYunCloud, payload)
}

func HandleAliCloudTask(ctx context.Context, t *asynq.Task) error {

   var a cmdb.CloudPlatform
   if err := json.Unmarshal(t.Payload(), &a); err != nil {
      return err
   }

   _, err := cloudvendor.GetVendorClient(&a)
   if err != nil {
      log.Fatalf("AccountVerify GetVendorClient failed,%v", err)
      return err
   }

   cloudsync.SyncAliYunHost(&a)

   log.Printf("Aliyun Cloud assets are successfully synchronized...")
   return nil
}

 

Tasks 任务

在 asynq 中,工作单元被封装为 Task 类型。
其中有两个字段:“类型” 和 “有效载荷”。

// Task represents a task to be performed.
type Task struct {
    // Type indicates the type of a task to be performed.
    Type string

    // Payload holds data needed to perform the task.
    Payload Payload
}

Type 是一个简单的字符串值,指示给定任务的类型。
Payload 保存执行任务所需的数据,您可以将其视为 map[string]interface{}。需要注意的重要一件事是有效负载值必须是可序列化的。

 

worker.go

package tasks

import (
   "context"
   "pigs/common"
   "time"

   "log"
   "os"
   "os/signal"

   "github.com/hibiken/asynq"
   "golang.org/x/sys/unix"
)

// loggingMiddleware 记录任务日志中间件
func loggingMiddleware(h asynq.Handler) asynq.Handler {
   return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
      start := time.Now()
      log.Printf("Start processing %q", t.Type())
      err := h.ProcessTask(ctx, t)
      if err != nil {
         return err
      }
      log.Printf("Finished processing %q: Elapsed Time = %v", t.Type(), time.Since(start))
      return nil
   })
}

func TaskWorker() {
   c := common.CONFIG.Redis
   srv := asynq.NewServer(
      asynq.RedisClientOpt{
         Addr:     c.Host,
         Username: c.UserName,
         Password: c.PassWord,
         DB:       c.DB,
      },
      asynq.Config{Concurrency: 20},
   )

   mux := asynq.NewServeMux()
   mux.Use(loggingMiddleware)
   // 任务执行时的handle
   mux.HandleFunc(SyncAliYunCloud, HandleAliCloudTask)

   // start server
   if err := srv.Start(mux); err != nil {
      log.Fatalf("could not start server: %v", err)
   }

   // Wait for termination signal.
   sigs := make(chan os.Signal, 1)
   signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
   for {
      s := <-sigs
      if s == unix.SIGTSTP {
         srv.Shutdown()
         continue
      }
      break
   }

   // Stop worker server.
   srv.Stop()
}

 

示例

建立任务使用NewTask方法,并为任务传递类型和有效负载。 可以通过Client.Schedule传入任务和需要处理的时间来计划任务

func main() {
  client := asynq.NewClient(redis)
  
  // 创建任务,声明任务类型,有效负载
  t1 := asynq.NewTask("send_register_email", map[string]interface{}{"userName": "zhangsan"})
  t2 := asynq.NewTask("send_forget_email", map[string]interface{}{"userName": "zhangsan"})

  // 立即处理任务
  err := client.Enqueue(t1, time.Now())
  if err != nil {
    log.Fatal(err)
  }

  // 2小时后处理任务, 延迟任务
  err := client.Enqueue(t2, asynq.ProcessIn(time.Now().Add(2 * time.Hour)))
  if err != nil {
    log.Fatal(err)
  }
}

 

asynq.Client 支持三种调度任务的方法:EnqueueEnqueueInEnqueueAt
使用 client.Enqueue 将任务立即加入队列。
使用 client.EnqueueInclient.EnqueueAt 来安排将来要处理的任务。 EnqueueAt支持 2021-11-11 15:10:00 时间格式定时执行任务

// 定时任务
package tasks

import (
   "fmt"
   "github.com/hibiken/asynq"
   "log"
   "pigs/common"
   "pigs/models/cmdb"
   "time"
)

type EmailTaskPayloadTest struct {
	UserID  int64
	Msg		string
}

func TaskBeta() {
   client := asynq.NewClient(
      asynq.RedisClientOpt{
         Addr:     ":6379",
         Password: "",
      })
   
   payload, err := json.Marshal(EmailTaskPayloadTest{
      UserID: 100,
      Msg:    "test",
   })
   if err != nil {
      log.Fatal(err)
   }
   t1 := asynq.NewTask("task:oneTask", payload)
   
   // 定时执行任务时间
   setDate := "2021-11-11 15:10:00"
   dateFormats := "2006-01-02 15:04:05"

   // 获取时区
   loc, _ := time.LoadLocation("Local")

   // 指定日期 转 当地 日期对象 类型为 time.Time
   timeObj, err := time.ParseInLocation(dateFormats, setDate, loc)
   if err != nil {
      fmt.Println("parse time failed err :", err)
      return
   }

   info, err := client.Enqueue(t1, asynq.ProcessAt(timeObj), asynq.Queue("test"))
   if err != nil {
      log.Fatal(err)
   }
   
   log.Printf(" [*] Successfully enqueued task: %+v", info)

 

启动任务

go run beta.go
go run worker.go

asynq: pid=1274467 2021/11/17 04:27:34.960443 INFO: Starting processing

2021/11/17 12:27:34 /home/risk/code/pigs/tasks/beat.go:25
2021/11/17 12:27:34 registered an entry: “fe209263-1561-4b94-8b72-98b29bab6efe”
asynq: pid=1274467 2021/11/17 04:27:34.962365 INFO: Scheduler starting
asynq: pid=1274467 2021/11/17 04:27:34.962371 INFO: Scheduler timezone is set to UTC
asynq: pid=1274467 2021/11/17 04:27:34.962379 INFO: Send signal TERM or INT to stop the scheduler

 

2021/11/17 12:45:13 Aliyun Cloud assets are successfully synchronized…
2021/11/17 12:45:13 Finished processing “cmdb:aliyun”: Elapsed Time = 12.471785932s

 

Web UI

Asynqmon是一个基于web的工具,用于监视和管理Asynq队列和任务。有关详细信息,请参阅工具的README

 

 

 

 

参考文档:

https://github.com/hibiken/asynq/wiki/Getting-Started

 

 

发表评论