10. Bonus! Replaying events
10m

Overview

What happens if we miss some messages? Sure, we could a new after pre-populating our chat window with the older, unseen messages, but why not get it all in one?

We can do this by adding a to our . We'll use this cursor to pass a timestamp for the last message received.

schema.graphql
type Subscription {
# Start a new subscription for messages in a particular conversation, with an optional timestamp cursor to replay from
listenForMessageInConversation(
id: ID!
fromMessageReceivedAt: String
): Message
}

To make this work, we'll first need to publish our changes.

messages
rover subgraph publish APOLLO_GRAPH_REF \
--name messages
--schema ./src/schema.graphql

Try swapping out our listenForMessageInConversation for the following. This code makes use of asynchronous generators to replay all the messages that were missed after a certain point.

import { Resolvers } from "../__generated__/resolvers-types";
import { NewMessageEvent } from "../datasources/models";
export const Subscription: Resolvers = {
Subscription: {
listenForMessageInConversation: {
// @ts-ignore
subscribe: async (
_,
{ fromMessageReceivedAt, id },
{ pubsub, dataSources }
) => {
// GOAL: If a cursor `fromMessageReceivedAt` is passed, fetch all messages sent after
// Check whether a timestamp was passed
const timestampMs = parseInt(fromMessageReceivedAt);
// Validate that timestamp is a number, if so retrieve messages sent after that timestamp
if (!isNaN(timestampMs) && timestampMs > 0) {
const messages = await dataSources.db.getMessagesAfterDate(
timestampMs,
id
);
return {
// Set up the generator
async *[Symbol.asyncIterator]() {
console.log(
"STEP 1: I am called the first time the subscription runs!"
);
// Initially, iterate through all the messages to "play back" what was missed
// We're not awaiting NEW messages, just yielding the messages we already have in DB
for (let i = 0; i < messages.length; i++) {
yield { listenForMessageInConversation: messages[i] };
}
console.log(
"STEP 2 TO INFINITY: creating a new iterator for each event"
);
// The thing we want to do with every new message
let iterator = {
[Symbol.asyncIterator]: () =>
pubsub.asyncIterator<NewMessageEvent>(["NEW_MESSAGE_SENT"]),
};
// The loop that awaits new message events and yields them
for await (const event of iterator) {
if (event.conversationId == id) {
yield event;
}
}
},
};
// If no timestamp is passed, handle new messages as we normally would
}
return pubsub.asyncIterator(["NEW_MESSAGE_SENT"]);
},
},
},
};

Running an operation

Let's jump back into Studio. We've sent a few messages now, so let's grab one of the older timestamps.

Run the following to access all of our past messages in the "xeno-ripley-chat" conversation.

query GetConversationWithRecipient($recipientId: ID!) {
conversation(recipientId: $recipientId) {
id
createdAt
messages {
id
text
sentTime
}
}
}

And in the Variables panel:

{
"recipientId": "ripley"
}

Make sure your Headers contain the following:

Authorization: Bearer xeno

We should see some output in the Response panel. Let's grab the sentTime value for that first message, and set up our .

subscription SubscribeToMessages(
$listenForMessageInConversationId: ID!
$fromMessageReceivedAt: String
) {
listenForMessageInConversation(
id: $listenForMessageInConversationId
fromMessageReceivedAt: $fromMessageReceivedAt
) {
id
text
}
}

In the Variables panel, make sure you include both .

{
"listenForMessageInConversationId": "xeno-ripley-chat",
"fromMessageReceivedAt": "YOUR-TIMESTAMP-HERE"
}

Run the and... we'll see immediate replay! Then we can proceed with sending messages to the conversation as normal.

Previous

Share your questions and comments about this lesson

Your feedback helps us improve! If you're stuck or confused, let us know and we'll help you out. All comments are public and must follow the Apollo Code of Conduct. Note that comments that have been resolved or addressed may be removed.

You'll need a GitHub account to post below. Don't have one? Post in our Odyssey forum instead.