Overview
What happens if we miss some messages? Sure, we could launch a new subscription after pre-populating our chat window with the older, unseen messages, but why not get it all in one?
We can do this by adding a cursor to our subscription field. We'll use this cursor to pass a timestamp for the last message received.
type Subscription {
  # Start a new subscription for messages in a particular conversation, with an optional timestamp cursor to replay from
  listenForMessageInConversation(
    id: ID!
    fromMessageReceivedAt: String
  ): Message
}
To make this work, we'll first need to publish our changes.
rover subgraph publish APOLLO_GRAPH_REF \
  --name messages \
  --schema ./src/schema.graphql
Try swapping out our listenForMessageInConversation resolver for the following. This code uses asynchronous generators to replay all the messages that were missed after a certain point.
import { Resolvers } from "../__generated__/resolvers-types";
import { NewMessageEvent } from "../datasources/models";

export const Subscription: Resolvers = {
  Subscription: {
    listenForMessageInConversation: {
      // @ts-ignore
      subscribe: async (
        _,
        { fromMessageReceivedAt, id },
        { pubsub, dataSources }
      ) => {
        // GOAL: If a cursor `fromMessageReceivedAt` is passed, fetch all messages sent after

        // Check whether a timestamp was passed
        const timestampMs = parseInt(fromMessageReceivedAt);

        // Validate that timestamp is a number, if so retrieve messages sent after that timestamp
        if (!isNaN(timestampMs) && timestampMs > 0) {
          const messages = await dataSources.db.getMessagesAfterDate(
            timestampMs,
            id
          );

          return {
            // Set up the generator
            async *[Symbol.asyncIterator]() {
              console.log(
                "STEP 1: I am called the first time the subscription runs!"
              );

              // Initially, iterate through all the messages to "play back" what was missed
              // We're not awaiting NEW messages, just yielding the messages we already have in DB
              for (let i = 0; i < messages.length; i++) {
                yield { listenForMessageInConversation: messages[i] };
              }

              console.log(
                "STEP 2 TO INFINITY: creating a new iterator for each event"
              );

              // The thing we want to do with every new message
              let iterator = {
                [Symbol.asyncIterator]: () =>
                  pubsub.asyncIterator<NewMessageEvent>(["NEW_MESSAGE_SENT"]),
              };

              // The loop that awaits new message events and yields them
              for await (const event of iterator) {
                if (event.conversationId == id) {
                  yield event;
                }
              }
            },
          };
        }

        // If no timestamp is passed, handle new messages as we normally would
        return pubsub.asyncIterator(["NEW_MESSAGE_SENT"]);
      },
    },
  },
};
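The replay-then-listen flow in the resolver can be tried in isolation, without Apollo Server or a database. The following is a minimal sketch of that pattern only: `ChatMessage`, `replayThenListen`, `fakeLiveStream`, and `collectIds` are hypothetical names made up for illustration, and the finite `fakeLiveStream` stands in for the pubsub iterator so the example terminates.

```typescript
// Hypothetical message shape for illustration; not the course's real types.
interface ChatMessage {
  id: string;
  conversationId: string;
  text: string;
}

// Yield the stored backlog first, then hand over to the live stream,
// skipping live events that belong to other conversations.
async function* replayThenListen(
  backlog: ChatMessage[],
  live: AsyncIterable<ChatMessage>,
  conversationId: string
): AsyncGenerator<ChatMessage> {
  for (const message of backlog) {
    yield message;
  }
  for await (const message of live) {
    if (message.conversationId === conversationId) {
      yield message;
    }
  }
}

// Stand-in for a pubsub iterator: a finite stream so the example ends.
async function* fakeLiveStream(): AsyncGenerator<ChatMessage> {
  yield { id: "3", conversationId: "other-chat", text: "not for us" };
  yield { id: "4", conversationId: "xeno-ripley-chat", text: "new message" };
}

// Collect the ids in the order a subscriber would receive them.
async function collectIds(): Promise<string[]> {
  const backlog: ChatMessage[] = [
    { id: "1", conversationId: "xeno-ripley-chat", text: "missed one" },
    { id: "2", conversationId: "xeno-ripley-chat", text: "missed two" },
  ];
  const ids: string[] = [];
  const stream = replayThenListen(backlog, fakeLiveStream(), "xeno-ripley-chat");
  for await (const message of stream) {
    ids.push(message.id);
  }
  return ids; // resolves to ["1", "2", "4"]
}
```

The two missed messages arrive first, followed by the one live message that matches the conversation. The message for "other-chat" is filtered out, much like the `event.conversationId == id` check in the resolver.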
Running an operation
Let's jump back into Studio. We've sent a few messages now, so let's grab one of the older timestamps.
Run the following operation to access all of our past messages in the "xeno-ripley-chat" conversation.
query GetConversationWithRecipient($recipientId: ID!) {
  conversation(recipientId: $recipientId) {
    id
    createdAt
    messages {
      id
      text
      sentTime
    }
  }
}
And in the Variables panel:
{"recipientId": "ripley"}
Make sure your Headers contain the following:
Authorization: Bearer xeno
We should see some output in the Response panel. Let's grab the sentTime value for that first message, and set up our subscription.
subscription SubscribeToMessages(
  $listenForMessageInConversationId: ID!
  $fromMessageReceivedAt: String
) {
  listenForMessageInConversation(
    id: $listenForMessageInConversationId
    fromMessageReceivedAt: $fromMessageReceivedAt
  ) {
    id
    text
  }
}
In the Variables panel, make sure you include both variables.
{
  "listenForMessageInConversationId": "xeno-ripley-chat",
  "fromMessageReceivedAt": "YOUR-TIMESTAMP-HERE"
}
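If you'd rather not copy the timestamp by hand, the same variables could be built from the query response in client code. This is only a sketch under a few assumptions: `ConversationResponse`, `SubscriptionVariables`, and `buildSubscriptionVariables` are hypothetical names, `sentTime` is assumed to come back as a string, and the conversation's `id` is assumed to be the value the subscription expects.

```typescript
// Assumed response shape, limited to the fields this sketch reads
// from the GetConversationWithRecipient query.
interface ConversationResponse {
  conversation: {
    id: string;
    messages: { sentTime: string }[];
  };
}

// Variable names match the SubscribeToMessages operation.
interface SubscriptionVariables {
  listenForMessageInConversationId: string;
  fromMessageReceivedAt?: string;
}

function buildSubscriptionVariables(
  data: ConversationResponse
): SubscriptionVariables {
  // Use the first message's sentTime as the replay cursor.
  const [firstMessage] = data.conversation.messages;
  return {
    listenForMessageInConversationId: data.conversation.id,
    // Omit the cursor when the conversation has no messages yet.
    ...(firstMessage ? { fromMessageReceivedAt: firstMessage.sentTime } : {}),
  };
}
```

Leaving the cursor out for an empty conversation means the subscription only listens for new messages and does not attempt a replay.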
Run the subscription and... we'll see immediate replay! Then we can proceed with sending messages to the conversation as normal.
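The replay only happens when the resolver accepts the cursor, so it helps to know which values pass its check. The sketch below mirrors the resolver's `parseInt` and `isNaN` test in a standalone helper. `parseCursor` is a hypothetical name, and the explicit radix of 10 is an addition that the resolver itself does not use.

```typescript
// Mirrors the resolver's validation: parse the cursor and accept it only
// if it is a positive number. Returns null when no replay should happen.
function parseCursor(cursor?: string | null): number | null {
  const timestampMs = parseInt(cursor ?? "", 10);
  return !isNaN(timestampMs) && timestampMs > 0 ? timestampMs : null;
}

const fromMillis = parseCursor("1700000000000"); // 1700000000000
const fromMissing = parseCursor(undefined); // null, so live messages only
const fromGarbage = parseCursor("not-a-date"); // null
// parseInt stops at the first non-digit, so a date string is NOT rejected:
const fromIsoString = parseCursor("2023-01-01T00:00:00Z"); // 2023
```

The last case is worth noting: a cursor that starts with digits is read as a small number of milliseconds, which would replay far more messages than intended. The lesson does not show the format of sentTime, so treat this as something to check and not as a known problem.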