RabbitMQ-官方文档-Work Queues-Go

工作队列(Work Queues)

image.png

第一个教程中,我们编写了从命名队列发送和接收消息的程序。在本例中,我们将创建一个工作队列,用于在多个工作者之间分配耗时的任务

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行多个worker时,任务将在它们之间共享。

这个概念在web应用程序中特别有用,因为在短的HTTP请求窗口中不可能处理复杂的任务。

准备

在本教程的前一部分中,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。我们没有现实世界的任务,比如要调整图片大小或渲染pdf文件,所以让我们假装我们很忙——利用时间。睡眠功能。我们将把字符串中点的数量作为它的复杂度;每个点将占“工作”的一秒钟。例如,Hello…只需要三秒钟。

我们会稍微修改一下发送。使用前面示例中的代码,允许从命令行发送任意消息。这个程序将把任务调度到我们的工作队列中,所以我们将它命名为new_task.go:

package main







import (



	"context"
	"log"

	"os"
	"strings"
	"time"


	amqp "github.com/rabbitmq/amqp091-go"
)



func failOnError(err error, msg string) {

	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}



func bodyFrom(args []string) string {
	var s string
	if (len(args) < 2) || os.Args[1] == "" {
		s = "hello"
	} else {
		s = strings.Join(args[1:], " ")
	}
	return s
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	q, err := ch.QueueDeclare("hello", true, false, false, false, nil)
	failOnError(err, "Failed to declare a queue")

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()


	body := bodyFrom(os.Args)
	err = ch.PublishWithContext(ctx,
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			ContentType:  "text/plain",
			Body:         []byte(body),
		})
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s", body)
}

消费者Go脚本还需要一些更改:它需要为消息体中的每个点进行一秒钟的工作。它将从队列中弹出消息并执行任务,因此我们将其命名为worker.go:

package main







import (



	"bytes"
	"log"

	"time"


	amqp "github.com/rabbitmq/amqp091-go"
)



func failOnError(err error, msg string) {

	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello_world", // name
		true,          // durable
		false,         // delete when unused
		false,         // exclusive
		false,         // no-wait
		nil,           // arguments
	)
	failOnError(err, "Failed to declare a queue")
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	var forever chan struct{}

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			dotCount := bytes.Count(d.Body, []byte("."))
			t := time.Duration(dotCount)
			time.Sleep(t * time.Second)
			log.Printf("Done")
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

像教程1中那样运行它们:

# shell 1

go run worker.go




# shell 2

go run new_task.go

循环调度

使用任务队列的优点之一是能够轻松地并行化工作。如果我们积压了大量的工作,我们可以增加更多的工作人员,这样就很容易扩大规模。

首先,让我们试着运行两个worker。同时运行Go脚本。它们都将从队列中获取消息,但具体如何获取呢?让我们来看看。

你需要打开三个控制台。两个人将运行worker.go脚本。这些控制台将是我们的两个消费者- C1和C2。

# shell 1

go run worker.go




# => [*] Waiting for messages. To exit press CTRL+C

# shell 2

go run worker.go




# => [*] Waiting for messages. To exit press CTRL+C

在第三个终端中,我们将发布新的任务。一旦你启动了消费者,你就可以发布一些消息:

# shell 3
go run new_task.go First message.
go run new_task.go Second message..
go run new_task.go Third message...
go run new_task.go Fourth message....
go run new_task.go Fifth message.....

让我们看看我们的woker得到了什么:

# shell 1
go run worker.go




# => [*] Waiting for messages. To exit press CTRL+C

# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
go run worker.go




# => [*] Waiting for messages. To exit press CTRL+C

# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的信息。这种分发消息的方式称为轮询。可以在有三个或更多的worker的情况下尝试一下。

消息确认(Message acknowledgment)

执行一个任务可能需要几秒钟的时间,您可能想知道,如果消费者启动一个长任务,并在完成之前终止,会发生什么。在我们当前的代码中,一旦RabbitMQ将消息传递给消费者,它会立即将其标记为删除。在这种情况下,如果终止一个worker,它正在处理的消息将丢失。发送给此特定worker但尚未处理的消息也会丢失。

但我们不想丢掉任何任务。如果一个工人死了,我们希望把任务交给另一个工人。

为了确保消息永远不会丢失,RabbitMQ支持消息确认。一个ack(知识)由消费者返回,告诉RabbitMQ一个特定的消息已经被接收和处理,并且RabbitMQ可以自由地删除它。

如果一个消费者死亡(它的通道被关闭,连接被关闭,或者TCP连接丢失)而没有发送ack, RabbitMQ将理解消息没有被完全处理,并将重新将其排队。如果同时有其他消费者在线,它将迅速将其重新交付给另一个消费者。这样就可以确保没有信息丢失,即使工作人员偶尔会死亡。

在消费者交付确认时强制执行超时(默认为30分钟)。这有助于检测从不确认交付的有bug的(卡住的)消费者。您可以按照交付确认超时中所述增加此超时。

在本教程中,我们将通过为“auto-ack”参数传递false来使用手动消息确认,然后在完成任务后,使用d.Ack(false)从worker发送适当的确认(此确认单个交付)。

我们已经学会了如何确保即使消费者死了,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。

当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。

首先,我们需要确保队列在RabbitMQ节点重启后仍然存在。为了做到这一点,我们需要将其声明为持久的:

q, err := ch.QueueDeclare(

  "hello",      // name
  true,         // durable

  false,        // delete when unused

  false,        // exclusive

  false,        // no-wait

  nil,          // arguments

)

failOnError(err, "Failed to declare a queue")

虽然这个命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许你用不同的参数重新定义一个已经存在的队列,任何试图这样做的程序都会返回一个错误。但是有一个快速的解决方案——让我们用不同的名字声明一个队列,例如task_queue:

q, err := ch.QueueDeclare(

  "task_queue", // name
  true,         // durable

  false,        // delete when unused

  false,        // exclusive

  false,        // no-wait

  nil,          // arguments

)

failOnError(err, "Failed to declare a queue")

durable选项更改需要同时应用于生产者和消费者代码。
此时我们可以确定,即使RabbitMQ重启,task_queue队列也不会丢失。现在我们需要使用amqp将消息标记为持久消息-通过在amqp.Publishing中使用amqp.Persistent选项

err = ch.PublishWithContext(ctx,
  "",           // exchange
  q.Name,       // routing key
  false,        // mandatory
  false,
  amqp.Publishing {
    DeliveryMode: amqp.Persistent,
    ContentType:  "text/plain",
    Body:         []byte(body),
})

将消息标记为持久消息并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且还没有保存它时,仍然有很短的时间窗口。而且,RabbitMQ并没有对每条消息进行fsync(2)——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不是很强,但对于我们的简单任务队列来说已经足够了。如果你需要更有力的保证,那么你可以使用发行商确认。

公平调度

您可能已经注意到调度仍然没有完全按照我们希望的那样工作。例如,在两个worker的情况下,当所有奇数消息都很重,偶数消息很轻时,一个worker将一直很忙,而另一个几乎不做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地分发消息。

这是因为RabbitMQ只在消息进入队列时调度消息。它不关注消费者未确认消息的数量。它只是盲目地将第n条消息分派给第n个消费者。

image.png

为了解决这个问题,我们可以将预取计数设置为1。这告诉RabbitMQ一次不要给一个worker发送多个消息。或者,换句话说,在工作线程处理并确认前一条消息之前,不要向它发送新消息。相反,它将把它分派给下一个不忙的工人。

err = ch.Qos(
  1,     // prefetch count
  0,     // prefetch size
  false, // global
)
failOnError(err, "Failed to set QoS")

Note about queue size

如果所有的工人都很忙,你的队列就会被填满。你需要密切关注这一点,可能会增加更多的员工,或者有其他的策略。

全部代码

new_task.go

package main







import (



        "context"
        "log"

        "os"
        "strings"
        "time"


        amqp "github.com/rabbitmq/amqp091-go"
)



func failOnError(err error, msg string) {

        if err != nil {
                log.Panicf("%s: %s", msg, err)
        }
}



func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "task_queue", // name
                true,         // durable
                false,        // delete when unused
                false,        // exclusive
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare a queue")

        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        body := bodyFrom(os.Args)
        err = ch.PublishWithContext(ctx,
                "",           // exchange
                q.Name,       // routing key
                false,        // mandatory
                false,
                amqp.Publishing{
                        DeliveryMode: amqp.Persistent,
                        ContentType:  "text/plain",
                        Body:         []byte(body),
                })
        failOnError(err, "Failed to publish a message")
        log.Printf(" [x] Sent %s", body)
}


func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

worker.go::

package main







import (



        "bytes"
        "log"

        "time"


        amqp "github.com/rabbitmq/amqp091-go"
)



func failOnError(err error, msg string) {

        if err != nil {
                log.Panicf("%s: %s", msg, err)
        }
}


func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "task_queue", // name
                true,         // durable
                false,        // delete when unused
                false,        // exclusive
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare a queue")

        err = ch.Qos(
                1,     // prefetch count
                0,     // prefetch size
                false, // global
        )
        failOnError(err, "Failed to set QoS")


        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                false,  // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        var forever chan struct{}


        go func() {
                for d := range msgs {
                        log.Printf("Received a message: %s", d.Body)
                        dotCount := bytes.Count(d.Body, []byte("."))
                        t := time.Duration(dotCount)
                        time.Sleep(t * time.Second)
                        log.Printf("Done")
                        d.Ack(false)
                }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        <-forever
}

使用消息确认和预取计数可以设置工作队列。持久性选项让任务即使在RabbitMQ重新启动时也能存活。

有关amqp的更多信息。通道方法和消息属性,可以浏览amqp API参考

请记住,本教程和其他教程都是教程。他们一次展示一个新概念,可能会有意地过度简化一些东西,而忽略其他东西。例如,为了简洁起见,连接管理、错误处理、连接恢复、并发性和指标收集等主题在很大程度上被省略了。这种简化的代码不应该被认为可以用于生产。

在发布应用之前,请先查看其他文档。我们特别推荐以下指南:发行商确认和消费者确认,生产清单和监控。

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MY87kROc' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片