使用 Remix.js 服务器发送的事件和作业队列构建实时应用程序

你将学到什么

我希望阅读本文后您有足够的背景知识,可以利用 SSE 和作业队列在 Remix.js 中创建更复杂的事件驱动应用程序。

动图最终应用

本文涵盖的内容

我们将涵盖几个方面,包括:

  • Remix.js 路由
  • [Quirrel]作业队列
  • Remix.js 服务器发送的事件

先决条件

在开始阅读本文之前,建议您了解 React、Remix 以及队列和服务器发送事件的一些概念。

创建项目

要在 Remix 中初始化项目,我们执行以下命令:

npx create-remix@latest nixy
cd nixy

我们使用以下命令启动开发服务器:

npm run dev

我们将使用Just the basics类型,部署目标是我们将在应用程序中Remix App Server使用。TypeScript

安装必要的依赖项:

npm install quirrel remix-utils superjson dayjs cuid
npm install -D concurrently

package.json文件中,我们将dev脚本更改为以下内容:

{
  "dev": "concurrently 'remix dev' 'quirrel'",
}

这样,在应用程序开发期间,Quirrel 的服务器将与 Next 的服务器同时运行。要访问 Quirrell UI,只需运行以下命令:

npm run quirrel ui

设置事件发射器

在今天的文章中,我们将有一个简单的例子,其中我们将只有一个进程在运行,这意味着我们将有一个可用的实例,对于这个用例,使用 EventEmitter 是理想的,但其他人[应该] 是如果您有很多实例,请考虑选择。

// @/app/common/emitter.ts
import { EventEmitter } from "events";


let emitter: EventEmitter;

declare global {
  var __emitter: EventEmitter | undefined;
}


if (process.env.NODE_ENV === "production") {
  emitter = new EventEmitter();
} else {
  if (!global.__emitter) {
    global.__emitter = new EventEmitter();
  }
  emitter = global.__emitter;
}

export { emitter };

创建实例后EventEmitter,我们可以继续下一步。

设置作业队列

我们要创建的队列很容易理解,首先我们定义有效载荷的数据类型Queue,在这种情况下我们需要添加标识符。然后,在回调中,我们所做的是发出一个新事件,考虑到队列的名称和Job我们要提交的数据,在这种情况下需要序列化。

// @/app/queues/add.server.ts
import { Queue } from "quirrel/remix";
import superjson from "superjson";


import { emitter } from "~/common/emitter";


export const addQueueEvtName = "addJobQueue";

export default Queue<{ identifier: string }>("queue/add", async (job) => {
  emitter.emit(
    addQueueEvtName,
    superjson.stringify({ identifier: job.identifier })
  );
});

创建队列后,我们可以继续下一步。

设置路线

现在我们已经准备好所有需要使用的东西,我们可以开始定义应用程序的路由了。我们将在应用程序中拥有的路线如下:

  • _index.tsx– 应用程序的主要路线,其中所有实时部分都是可见的。
  • queue.add.ts– 此路由将公开Queue作为操作创建的 ,以便可以公开和使用它。
  • sse.add.ts– 这条路线将创建一个事件流,将每个事件推送到用户界面。

通过在文件夹中创建上述路由routes/,我们可以处理负责事件流的路由:

// @/app/routes/sse.add.ts
import type { LoaderFunction } from "@remix-run/node";
import { eventStream } from "remix-utils";


import { emitter } from "~/common/emitter";

import { addQueueEvtName } from "~/queues/add.server";

export const loader: LoaderFunction = ({ request }) => {
  // event stream setup
  return eventStream(request.signal, (send) => {
    // listener handler
    const listener = (data: string) => {
      // data should be serialized
      send({ data });
    };

    // event listener itself
    emitter.on(addQueueEvtName, listener);


    // cleanup
    return () => {
      emitter.off(addQueueEvtName, listener);
    };
  });
};

在上面的代码片段中,我们使用事件发射器实例来监听每个发出的事件,并使用remix-utilseventStream依赖项的功能,我们可以简化从后端到客户端的实时更新设置。

现在转到Queue路由注册,如前所述,它将是这样的:

// @/app/routes/queue.add.ts
import addQueue from "~/queues/add.server";


export const action = addQueue;

在上面的代码片段中,我们导入了过去创建的队列,并使用actionRemix 的原语公开。

最后但同样重要的是,我们现在可以在应用程序的客户端工作,在那里我们可以利用到目前为止创建的所有内容。该页面将有一个按钮,该按钮将调用服务器端操作以JobQueue.

Job简单,只需要传一个唯一标识,这样在视觉上我们可以识别出每个发出的事件都是真正唯一的,在队列中我们会加个五秒的延迟,保证真正进入队列,并且每Job一个他们被处理。

然后,在 UI 端,我们将使用挂钩useEventSource将组件/页面连接到之前创建的事件流,并使用useEffect将每个发出的事件的消息添加到组件的本地状态。这是为了更新页面中的无序列表JSX。这边走:

// @/app/routes/_index.tsx
import type { V2_MetaFunction } from "@remix-run/node";
import { Form } from "@remix-run/react";
import { useEffect, useState } from "react";
import { useEventSource } from "remix-utils";
import cuid from "cuid";
import superjson from "superjson";
import dayjs from "dayjs";


import addQueue from "~/queues/add.server";

export const meta: V2_MetaFunction = () => {
  return [{ title: "SSE and Quirrel" }];
};

export const action = async () => {
  const currentTime = dayjs();
  const newTime = currentTime.add(5, "second");


  await addQueue.enqueue({ identifier: cuid() }, { runAt: newTime.toDate() });

  return null;
};

export default function Index() {
  const [messages, setMessages] = useState<{ identifier: string }[]>([]);
  const lastMessage = useEventSource("/sse/add");

  useEffect(() => {
    setMessages((datums) => {
      if (lastMessage !== null) {
        return datums.concat(superjson.parse(lastMessage));
      }
      return datums;
    });
  }, [lastMessage]);

  return (
    <div>
      <h2>Server-sent events and Quirrel</h2>

      <ul>
        {messages.map((message, messageIdx) => (
          <li key={messageIdx}>{message.identifier}</li>
        ))}
      </ul>

      <Form method="POST">
        <button type="submit">Add New Job</button>
      </Form>
    </div>
  );
}

至此,我结束了本文的最后一步。值得强调的是,虽然这个例子很简单,但它最终可以作为创建更复杂系统的基础,例如,我们可以使用队列来限制在数据库中进行的插入和更新次数,以减少背压。或者创建调度程序来运行一组任务,如提醒、自动消息、通知等。从这里开始,它的用途是多种多样的。

结论

我希望这篇文章对您有所帮助,无论您是在现有项目中使用这些信息还是只是为了好玩而尝试一下。

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYxQ3005' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片