Realtime: Stream updates from Inngest functions

Inngest Realtime enables you to stream updates from your functions, power live UIs, and implement bi-directional workflows such as Human-in-the-Loop. Use channels and topics to broadcast updates, stream logs, or await user input.

Pattern: Stream updates from a single function run

Enable users to follow the progress of a long-running task by streaming updates from a dedicated channel. Here's how to trigger a function and subscribe to its updates:

src/inngest/channels.ts
import { realtime, staticSchema } from "inngest";

export const helloChannel = realtime.channel({
  name: ({ uuid }: { uuid: string }) => `hello-world:${uuid}`,
  topics: {
    logs: { schema: staticSchema<{ message: string }>() },
  },
});
app/api/hello-world/route.ts
import crypto from "crypto";
import { inngest } from "@/inngest/client";
import { subscribe } from "inngest/realtime";
import { helloChannel } from "@/inngest/channels";

export async function POST(req: Request) {
  const json = await req.json();
  const { prompt } = json;
  const uuid = crypto.randomUUID();

  await inngest.send({
    name: "hello-world/hello",
    data: { uuid },
  });

  const ch = helloChannel({ uuid });

  const stream = await subscribe({
    app: inngest,
    channel: ch,
    topics: ["logs"],
  });

  return new Response(stream.getEncodedStream(), {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}

Your function can then publish updates to this channel:

src/inngest/functions/hello-world.ts
import { inngest } from "../client";
import { helloChannel } from "../channels";

export const someTask = inngest.createFunction(
  { id: "hello-world", triggers: [{ event: "hello-world/hello" }] },
  async ({ event, step, publish }) => {
    const { uuid } = event.data;
    const ch = helloChannel({ uuid });
    await publish(ch.logs, { message: "Hello, world!" });
  }
);

By creating a channel with a unique identifier, you can stream updates for a specific run to the end user.

Pattern: Stream updates from multiple function runs

A Realtime channel can be used to stream updates from multiple function runs. Here, we'll define two channels: one global channel and one post-specific channel:

src/inngest/channels.ts
import { realtime, staticSchema } from "inngest";
import { z } from "zod";

export const globalChannel = realtime.channel({
  name: "global",
  topics: {
    logs: { schema: staticSchema<{ message: string }>() },
  },
});

export const postChannel = realtime.channel({
  name: ({ postId }: { postId: string }) => `post:${postId}`,
  topics: {
    updated: {
      schema: z.object({
        id: z.string(),
        likes: z.number(),
      }),
    },
    deleted: {
      schema: z.object({
        id: z.string(),
        reason: z.string(),
      }),
    },
  },
});

Our likePost function will publish updates to both channels. Note that globalChannel is a static channel (string name), so it can be used directly without instantiation. postChannel is parameterized, so we call it with the post ID to get a channel instance:

src/inngest/functions/likePost.ts
import { inngest } from "../client";
import { globalChannel, postChannel } from "../channels";

export const likePost = inngest.createFunction(
  {
    id: "post/like",
    retries: 0,
    triggers: [{ event: "app/post.like" }],
  },
  async ({
    event: {
      data: { postId = "123" },
    },
    step,
    publish,
  }) => {
    if (!postId) {
      await publish(globalChannel.logs, {
        message: "Missing postId when trying to like post",
      });
      throw new Error("Missing postId");
    }

    await publish(globalChannel.logs, {
      message: `Liking post ${postId}`,
    });

    const post = await step.run("update-likes", async () => {
      const fakePost = {
        id: "123",
        likes: Math.floor(Math.random() * 10000),
      };

      const ch = postChannel({ postId: fakePost.id });
      await publish(ch.updated, fakePost);
      return fakePost;
    });

    return post;
  }
);

The globalChannel will be used to stream updates for all posts, and the postChannel will be used to stream updates for specific posts.

Human in the loop: Bi-directional workflows

Combine Realtime with waitForEvent() to enable workflows that require user input, such as review or approval steps. Here's how to send a message to the user and wait for their confirmation:

src/inngest/channels.ts
import { realtime } from "inngest";
import { z } from "zod";

export const agenticWorkflowChannel = realtime.channel({
  name: "agentic-workflow",
  topics: {
    messages: {
      schema: z.object({
        message: z.string(),
        confirmationId: z.string(),
      }),
    },
  },
});
src/inngest/functions/agentic-workflow.ts
import crypto from "crypto";
import { inngest } from "../client";
import { agenticWorkflowChannel } from "../channels";

export const agenticWorkflow = inngest.createFunction(
  { id: "agentic-workflow", triggers: [{ event: "agentic-workflow/start" }] },
  async ({ event, step, publish }) => {
    await step.run(/* ... */);

    const confirmationId = await step.run("get-confirmation-id", async () =>
      crypto.randomUUID()
    );

    await publish(agenticWorkflowChannel.messages, {
      message: "Confirm to proceed?",
      confirmationId,
    });

    const confirmation = await step.waitForEvent("wait-for-confirmation", {
      event: "agentic-workflow/confirmation",
      timeout: "15m",
      if: `async.data.confirmationId == \"${confirmationId}\"`,
    });

    if (confirmation) {
      // continue workflow
    }
  }
);

The confirmationId links the published message to the reply event, ensuring the correct user response is handled.

Learn more