你将学到什么
我希望阅读本文后您有足够的背景知识,可以利用 SSE 和作业队列在 Remix.js 中创建更复杂的事件驱动应用程序。
动图
本文涵盖的内容
我们将涵盖几个方面,包括:
- Remix.js 路由
- [Quirrel]作业队列
- Remix.js 服务器发送的事件
先决条件
在开始阅读本文之前,建议您了解 React、Remix 以及队列和服务器发送事件的一些概念。
创建项目
要在 Remix 中初始化项目,我们执行以下命令:
npx create-remix@latest nixy
cd nixy
我们使用以下命令启动开发服务器:
npm run dev
我们将使用Just the basics
类型,部署目标是我们将在应用程序中Remix App Server
使用。TypeScript
安装必要的依赖项:
npm install quirrel remix-utils superjson dayjs cuid
npm install -D concurrently
在package.json
文件中,我们将dev
脚本更改为以下内容:
{
"dev": "concurrently 'remix dev' 'quirrel'",
}
这样,在应用程序开发期间,Quirrel 的服务器将与 Next 的服务器同时运行。要访问 Quirrell UI,只需运行以下命令:
npm run quirrel ui
设置事件发射器
在今天的文章中,我们将有一个简单的例子,其中我们将只有一个进程在运行,这意味着我们将有一个可用的实例,对于这个用例,使用 EventEmitter 是理想的,但其他人[应该] 是如果您有很多实例,请考虑选择。
// @/app/common/emitter.ts
import { EventEmitter } from "events";
let emitter: EventEmitter;
declare global {
var __emitter: EventEmitter | undefined;
}
if (process.env.NODE_ENV === "production") {
emitter = new EventEmitter();
} else {
if (!global.__emitter) {
global.__emitter = new EventEmitter();
}
emitter = global.__emitter;
}
export { emitter };
创建实例后EventEmitter
,我们可以继续下一步。
设置作业队列
我们要创建的队列很容易理解,首先我们定义有效载荷的数据类型Queue
,在这种情况下我们需要添加标识符。然后,在回调中,我们所做的是发出一个新事件,考虑到队列的名称和Job
我们要提交的数据,在这种情况下需要序列化。
// @/app/queues/add.server.ts
import { Queue } from "quirrel/remix";
import superjson from "superjson";
import { emitter } from "~/common/emitter";
export const addQueueEvtName = "addJobQueue";
export default Queue<{ identifier: string }>("queue/add", async (job) => {
emitter.emit(
addQueueEvtName,
superjson.stringify({ identifier: job.identifier })
);
});
创建队列后,我们可以继续下一步。
设置路线
现在我们已经准备好所有需要使用的东西,我们可以开始定义应用程序的路由了。我们将在应用程序中拥有的路线如下:
_index.tsx
– 应用程序的主要路线,其中所有实时部分都是可见的。queue.add.ts
– 此路由将公开Queue
作为操作创建的 ,以便可以公开和使用它。sse.add.ts
– 这条路线将创建一个事件流,将每个事件推送到用户界面。
通过在文件夹中创建上述路由routes/
,我们可以处理负责事件流的路由:
// @/app/routes/sse.add.ts
import type { LoaderFunction } from "@remix-run/node";
import { eventStream } from "remix-utils";
import { emitter } from "~/common/emitter";
import { addQueueEvtName } from "~/queues/add.server";
export const loader: LoaderFunction = ({ request }) => {
// event stream setup
return eventStream(request.signal, (send) => {
// listener handler
const listener = (data: string) => {
// data should be serialized
send({ data });
};
// event listener itself
emitter.on(addQueueEvtName, listener);
// cleanup
return () => {
emitter.off(addQueueEvtName, listener);
};
});
};
在上面的代码片段中,我们使用事件发射器实例来监听每个发出的事件,并使用remix-utilseventStream
依赖项的功能,我们可以简化从后端到客户端的实时更新设置。
现在转到Queue
路由注册,如前所述,它将是这样的:
// @/app/routes/queue.add.ts
import addQueue from "~/queues/add.server";
export const action = addQueue;
在上面的代码片段中,我们导入了过去创建的队列,并使用action
Remix 的原语公开。
最后但同样重要的是,我们现在可以在应用程序的客户端工作,在那里我们可以利用到目前为止创建的所有内容。该页面将有一个按钮,该按钮将调用服务器端操作以Job
向Queue
.
很Job
简单,只需要传一个唯一标识,这样在视觉上我们可以识别出每个发出的事件都是真正唯一的,在队列中我们会加个五秒的延迟,保证真正进入队列,并且每Job
一个他们被处理。
然后,在 UI 端,我们将使用挂钩useEventSource
将组件/页面连接到之前创建的事件流,并使用useEffect
将每个发出的事件的消息添加到组件的本地状态。这是为了更新页面中的无序列表JSX
。这边走:
// @/app/routes/_index.tsx
import type { V2_MetaFunction } from "@remix-run/node";
import { Form } from "@remix-run/react";
import { useEffect, useState } from "react";
import { useEventSource } from "remix-utils";
import cuid from "cuid";
import superjson from "superjson";
import dayjs from "dayjs";
import addQueue from "~/queues/add.server";
export const meta: V2_MetaFunction = () => {
return [{ title: "SSE and Quirrel" }];
};
export const action = async () => {
const currentTime = dayjs();
const newTime = currentTime.add(5, "second");
await addQueue.enqueue({ identifier: cuid() }, { runAt: newTime.toDate() });
return null;
};
export default function Index() {
const [messages, setMessages] = useState<{ identifier: string }[]>([]);
const lastMessage = useEventSource("/sse/add");
useEffect(() => {
setMessages((datums) => {
if (lastMessage !== null) {
return datums.concat(superjson.parse(lastMessage));
}
return datums;
});
}, [lastMessage]);
return (
<div>
<h2>Server-sent events and Quirrel</h2>
<ul>
{messages.map((message, messageIdx) => (
<li key={messageIdx}>{message.identifier}</li>
))}
</ul>
<Form method="POST">
<button type="submit">Add New Job</button>
</Form>
</div>
);
}
至此,我结束了本文的最后一步。值得强调的是,虽然这个例子很简单,但它最终可以作为创建更复杂系统的基础,例如,我们可以使用队列来限制在数据库中进行的插入和更新次数,以减少背压。或者创建调度程序来运行一组任务,如提醒、自动消息、通知等。从这里开始,它的用途是多种多样的。
结论
我希望这篇文章对您有所帮助,无论您是在现有项目中使用这些信息还是只是为了好玩而尝试一下。