Open this lesson in your favourite AI. It'll walk you through the why, explain the demo, and quiz you on the try-it list.
Change streams let you subscribe to a real-time stream of insert, update, replace, and delete operations on a collection, a database, or an entire cluster, without polling. This is the foundation of CDC (Change Data Capture) in MongoDB: downstream systems stay in sync without dual writes, scheduled batch jobs, or tight coupling between services. Change streams are built on the oplog, so they require a replica set or a sharded cluster and do not work against a standalone mongod. For local development a single-node replica set is enough; a standalone local instance is the usual reason change streams 'work in prod but not locally'.
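To make the event types above more concrete, here is a minimal sketch of how a consumer might branch on operationType. The field names (_id, operationType, ns, documentKey, fullDocument) follow MongoDB's documented change event shape. The ChangeEvent struct, the describe helper, and the sample payloads are hypothetical. The events are hard-coded JSON rather than read from a live stream, so the sketch runs without a database.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ChangeEvent mirrors a subset of the documented change event fields.
// The struct is illustrative only; it is not a driver type.
type ChangeEvent struct {
	ID            json.RawMessage `json:"_id"` // resume token, treated as opaque
	OperationType string          `json:"operationType"`
	NS            struct {
		DB   string `json:"db"`
		Coll string `json:"coll"`
	} `json:"ns"`
	DocumentKey  map[string]interface{} `json:"documentKey"`
	FullDocument map[string]interface{} `json:"fullDocument"`
}

// describe decodes one event and returns a one-line summary of the
// action a downstream system might take (hypothetical helper).
func describe(raw string) (string, error) {
	var ev ChangeEvent
	if err := json.Unmarshal([]byte(raw), &ev); err != nil {
		return "", err
	}
	target := ev.NS.DB + "." + ev.NS.Coll
	id := ev.DocumentKey["_id"]
	switch ev.OperationType {
	case "insert", "replace":
		// These events carry the whole document in fullDocument.
		return fmt.Sprintf("%s: store %v", target, id), nil
	case "update":
		// By default an update event has no fullDocument.
		return fmt.Sprintf("%s: patch %v", target, id), nil
	case "delete":
		// Only documentKey is available once the document is gone.
		return fmt.Sprintf("%s: remove %v", target, id), nil
	default:
		return fmt.Sprintf("%s: ignore %s", target, ev.OperationType), nil
	}
}

func main() {
	// Made-up sample events for the demo.orders collection.
	samples := []string{
		`{"operationType":"insert","ns":{"db":"demo","coll":"orders"},"documentKey":{"_id":"o1"},"fullDocument":{"_id":"o1","total":5}}`,
		`{"operationType":"update","ns":{"db":"demo","coll":"orders"},"documentKey":{"_id":"o1"}}`,
		`{"operationType":"delete","ns":{"db":"demo","coll":"orders"},"documentKey":{"_id":"o1"}}`,
	}
	for _, s := range samples {
		line, err := describe(s)
		if err != nil {
			fmt.Println("decode error:", err)
			continue
		}
		fmt.Println(line)
	}
}
```

With a real driver the events arrive as BSON rather than JSON strings, but the branching logic would look much the same.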
Watch a collection for inserts and updates in real time using a change stream.
[...] fullDocument field. Then run an update and confirm a separate event fires with operationType: 'update'.
[...] _id of each change event. Stop the watcher, insert 3 more documents, then restart using resumeAfter: storedToken. Verify you receive the 3 missed events.
[...] rs.initiate() and confirm it works.
Use these three in order. Each builds on the one before.
What is a MongoDB change stream? What types of operations does it capture and what are the prerequisites for using it?
MongoDB change streams are built on the oplog. Explain what the oplog is, how it relates to replication, and why change streams require a replica set.
I need to fan out a change stream to 5 downstream consumers without each consumer reading the oplog independently. What architecture pattern handles this, and how do resume tokens factor into fault tolerance?
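One common answer to the fan-out question is a single watcher that reads the stream once and republishes each event to every consumer, advancing a stored checkpoint only after the event has been handed off. The sketch below simulates that shape in memory with channels. The Event type, its integer Token, and the fanOut function are hypothetical stand-ins. A real deployment would more likely publish to a durable broker and persist an opaque resume token.

```go
package main

import (
	"fmt"
	"sync"
)

// Event stands in for a change event. Token plays the role of the
// resume token; real tokens are opaque documents, not integers.
type Event struct {
	Token int
	Doc   string
}

// fanOut reads source once and delivers every event to each of n
// consumers. It returns what each consumer received and the last
// token that was checkpointed.
func fanOut(source []Event, n int) ([][]string, int) {
	chans := make([]chan Event, n)
	received := make([][]string, n)
	var wg sync.WaitGroup
	for i := range chans {
		chans[i] = make(chan Event, 8)
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Each consumer only touches its own slot in received.
			for ev := range chans[i] {
				received[i] = append(received[i], ev.Doc)
			}
		}(i)
	}

	checkpoint := 0
	for _, ev := range source {
		for _, ch := range chans {
			ch <- ev // blocks if a consumer's buffer is full
		}
		// Advance only after every consumer has been handed the event,
		// so a restart from checkpoint never skips one.
		checkpoint = ev.Token
	}
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()
	return received, checkpoint
}

func main() {
	source := []Event{{1, "order-a"}, {2, "order-b"}, {3, "order-c"}}
	received, checkpoint := fanOut(source, 5)
	for i, docs := range received {
		fmt.Printf("consumer %d got %v\n", i, docs)
	}
	fmt.Println("checkpoint:", checkpoint)
}
```

In this sketch the checkpoint trails delivery, so a crash can replay an event but cannot lose one; consumers would therefore need to tolerate duplicates.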
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	ctx := context.Background()
	// Change streams need a replica set, hence replicaSet=rs0 in the URI.
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017/?replicaSet=rs0"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(ctx)
	col := client.Database("demo").Collection("orders")

	// Watch only inserts.
	pipeline := mongo.Pipeline{bson.D{{Key: "$match", Value: bson.D{{Key: "operationType", Value: "insert"}}}}}
	stream, err := col.Watch(ctx, pipeline)
	if err != nil {
		log.Fatal(err)
	}
	defer stream.Close(ctx)
	fmt.Println("Watching for inserts...")

	// Stop watching after 10 seconds; Next returns false once the context expires.
	watchCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	for stream.Next(watchCtx) {
		var event bson.M
		if err := stream.Decode(&event); err != nil {
			log.Fatal(err)
		}
		fmt.Println("New order:", event["fullDocument"])
	}
}

go run main.go
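The stop-and-resume step in the try-it list can be sketched without a database. The code below simulates the checkpoint logic in memory: store the token of the last event you processed, then ask for everything after it on restart. The Change, Log, WatchAfter, and consume names are hypothetical, and the integer token is a simplification, since real resume tokens are opaque documents taken from each event's _id. In this sketch only, token 0 means 'from the beginning'.

```go
package main

import "fmt"

// Change stands in for a change event; Token stands in for its _id.
type Change struct {
	Token int
	Doc   string
}

// Log is an in-memory stand-in for the ordered history that a real
// change stream reads from.
type Log struct{ events []Change }

// Insert appends a document and assigns it the next token.
func (l *Log) Insert(doc string) {
	l.events = append(l.events, Change{Token: len(l.events) + 1, Doc: doc})
}

// WatchAfter returns every event recorded after the given token,
// imitating what resumeAfter does on a real change stream.
func (l *Log) WatchAfter(token int) []Change {
	var out []Change
	for _, ev := range l.events {
		if ev.Token > token {
			out = append(out, ev)
		}
	}
	return out
}

// consume processes events and returns the documents seen plus the
// last token, which the caller is expected to persist.
func consume(events []Change, stored int) ([]string, int) {
	last := stored
	var docs []string
	for _, ev := range events {
		docs = append(docs, ev.Doc)
		last = ev.Token
	}
	return docs, last
}

func main() {
	l := &Log{}
	l.Insert("order-1")
	l.Insert("order-2")

	// First run: process what is there and remember the last token.
	_, stored := consume(l.WatchAfter(0), 0)
	fmt.Println("stored token:", stored)

	// Watcher is stopped; three more documents arrive.
	l.Insert("order-3")
	l.Insert("order-4")
	l.Insert("order-5")

	// Restart from the stored token and pick up the missed events.
	missed, stored := consume(l.WatchAfter(stored), stored)
	fmt.Println("missed:", missed, "new token:", stored)
}
```

With the Go driver, the equivalent would be to save the stream's resume token after each event and pass it back through the change stream options when reopening the stream.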