Mongodb实践

Posted by     "zengchengjie" on Saturday, January 15, 2022

MongoDB 从入门到实战:开发者指南

前言

在NoSQL数据库的阵营中,MongoDB无疑是最耀眼的明星之一。作为一款面向文档的数据库,它以灵活的Schema设计、强大的查询能力和天然的水平扩展特性,赢得了无数开发者的青睐。

本文将从实际开发角度出发,带你深入了解MongoDB的核心概念,并通过丰富的代码示例展示在真实项目中如何优雅地使用MongoDB。

一、为什么选择MongoDB?

1.1 传统关系型数据库的痛点

在开始之前,我们先回顾一下传统SQL数据库的几个典型问题:

  • Schema僵化:修改表结构需要昂贵的ALTER TABLE操作
  • 对象-关系阻抗:应用层的对象需要映射为二维表结构
  • 扩展困难:分布式部署复杂,主从复制存在单点问题

1.2 MongoDB的解决方案

MongoDB使用文档模型存储数据,与应用程序中的对象模型天然对应:

{
  "_id": ObjectId("507f1f77bcf86cd799439011"),
  "username": "zhang_san",
  "email": "zhangsan@example.com",
  "profile": {
    "age": 28,
    "city": "北京",
    "hobbies": ["阅读", "游泳"]
  },
  "orders": [
    {
      "order_id": "10001",
      "amount": 299.00,
      "status": "completed"
    }
  ],
  "created_at": ISODate("2024-01-15T08:30:00Z")
}

1.3 MongoDB适用场景

场景类型 是否推荐 说明
内容管理系统 ✅ 推荐 Schema灵活,适应多变的内容结构
实时数据分析 ✅ 推荐 聚合管道强大,支持复杂计算
物联网数据 ✅ 推荐 时序数据+TTL自动过期
用户画像/配置 ✅ 推荐 嵌入式文档天然适合个性化属性
电商订单 ⚠️ 谨慎 需要事务强一致性时考虑
金融核心账务 ❌ 不推荐 事务要求极高,SQL更合适

二、快速上手

2.1 安装与启动

使用Docker(推荐):

# 拉取镜像并启动
docker run -d \
  --name mongodb \
  -p 27017:27017 \
  -e MONGO_INITDB_ROOT_USERNAME=admin \
  -e MONGO_INITDB_ROOT_PASSWORD=secret \
  mongo:7.0

# 进入容器
docker exec -it mongodb mongosh -u admin -p secret

macOS:

brew tap mongodb/brew
brew install mongodb-community@7.0
brew services start mongodb-community

2.2 基本概念对比

SQL概念 MongoDB概念 说明
Database Database 数据库
Table Collection 集合(相当于表)
Row Document 文档(相当于行)
Column Field 字段(相当于列)
Index Index 索引
JOIN $lookup 关联查询
GROUP BY Aggregation Pipeline 聚合

三、核心操作实战

3.1 基础CRUD

// 连接到数据库
use myblog;

// ========== CREATE ==========
// 插入单条文档
db.users.insertOne({
    name: "张三",
    email: "zhang@example.com",
    age: 25,
    tags: ["developer", "gopher"],
    created_at: new Date()
});

// 批量插入
db.users.insertMany([
    { name: "李四", email: "li@example.com", age: 30 },
    { name: "王五", email: "wang@example.com", age: 28 },
    { name: "赵六", email: "zhao@example.com", age: 35 }
]);

// ========== READ ==========
// 查询所有
db.users.find();

// 条件查询
db.users.find({ age: { $gt: 28 } });

// 投影(只返回特定字段)
db.users.find({}, { name: 1, email: 1, _id: 0 });

// 复杂条件
db.users.find({
    $and: [
        { age: { $gte: 25 } },
        { age: { $lte: 30 } },
        { tags: "gopher" }
    ]
});

// ========== UPDATE ==========
// 更新单个字段
db.users.updateOne(
    { name: "张三" },
    { $set: { age: 26 } }
);

// 增加数值
db.users.updateOne(
    { name: "张三" },
    { $inc: { login_count: 1 } }
);

// 添加数组元素
db.users.updateOne(
    { name: "张三" },
    { $push: { tags: "mongodb" } }
);

// 更新或插入(upsert)
db.users.updateOne(
    { email: "newuser@example.com" },
    { $set: { name: "新用户", age: 20 } },
    { upsert: true }
);

// ========== DELETE ==========
// 删除单条
db.users.deleteOne({ name: "赵六" });

// 删除多条
db.users.deleteMany({ age: { $lt: 18 } });

// 清空集合
db.users.deleteMany({});

3.2 高级查询操作符

// 比较操作符
db.products.find({ price: { $gt: 100, $lt: 500 } });  // 100 < price < 500
db.products.find({ status: { $in: ["active", "pending"] } });
db.products.find({ stock: { $nin: [0, null] } });

// 逻辑操作符
db.products.find({
    $or: [
        { category: "electronics" },
        { price: { $lt: 50 } }
    ]
});

// 数组操作符
db.posts.find({ tags: { $all: ["mongodb", "nodejs"] } });      // 包含所有标签
db.posts.find({ likes: { $size: 10 } });                        // 长度为10
db.posts.find({ "comments.0.author": "admin" });               // 第一个评论的作者

// 正则表达式
db.users.find({ name: { $regex: /^张/, $options: "i" } });     // 以"张"开头

// 元素操作符
db.users.find({ email: { $exists: true } });                    // 存在email字段
db.users.find({ phone: { $type: "string" } });                  // phone字段是字符串

3.3 索引优化

// 创建单字段索引
db.users.createIndex({ email: 1 });  // 1:升序, -1:降序

// 创建唯一索引
db.users.createIndex({ email: 1 }, { unique: true });

// 创建复合索引
db.orders.createIndex({ user_id: 1, created_at: -1 });

// 创建文本索引(全文搜索)
db.articles.createIndex({ title: "text", content: "text" });

// 查看索引
db.users.getIndexes();

// 删除索引
db.users.dropIndex("email_1");

// 索引使用分析
db.orders.find({ user_id: "123" }).explain("executionStats");

3.4 聚合管道

聚合管道是MongoDB最强大的功能之一,类似于SQL中的GROUP BY + 多表JOIN + 窗口函数。

// 基础聚合:统计每个年龄段的用户数
db.users.aggregate([
    { $match: { age: { $exists: true } } },           // 过滤
    { $group: { _id: "$age", count: { $sum: 1 } } },  // 分组
    { $sort: { _id: 1 } }                              // 排序
]);

// 复杂聚合:订单统计分析
db.orders.aggregate([
    // 阶段1:关联用户信息
    {
        $lookup: {
            from: "users",
            localField: "user_id",
            foreignField: "_id",
            as: "user_info"
        }
    },
    // 阶段2:展开数组
    { $unwind: "$user_info" },
    // 阶段3:过滤最近30天的订单
    {
        $match: {
            created_at: { $gte: new Date(new Date() - 30*24*60*60*1000) }
        }
    },
    // 阶段4:按城市分组统计
    {
        $group: {
            _id: "$user_info.profile.city",
            total_amount: { $sum: "$amount" },
            avg_amount: { $avg: "$amount" },
            order_count: { $sum: 1 },
            unique_users: { $addToSet: "$user_id" }
        }
    },
    // 阶段5:计算用户数
    {
        $project: {
            city: "$_id",
            total_amount: 1,
            avg_amount: { $round: ["$avg_amount", 2] },
            order_count: 1,
            user_count: { $size: "$unique_users" }
        }
    },
    // 阶段6:按总金额降序
    { $sort: { total_amount: -1 } },
    // 阶段7:分页
    { $skip: 0 },
    { $limit: 10 }
]);

// 实战案例:文章热力图(按小时统计)
db.articles.aggregate([
    {
        $group: {
            _id: { $hour: "$created_at" },
            article_count: { $sum: 1 },
            avg_views: { $avg: "$views" }
        }
    },
    { $sort: { _id: 1 } }
]);

四、Go语言实践

4.1 安装驱动

go get go.mongodb.org/mongo-driver/mongo

4.2 连接与配置

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/bson/primitive"
)

type User struct {
    ID        primitive.ObjectID `bson:"_id,omitempty"`
    Name      string             `bson:"name"`
    Email     string             `bson:"email"`
    Age       int                `bson:"age"`
    Tags      []string           `bson:"tags,omitempty"`
    CreatedAt time.Time          `bson:"created_at"`
}

func main() {
    // 设置客户端选项
    clientOptions := options.Client().
        ApplyURI("mongodb://localhost:27017").
        SetAuth(options.Credential{
            Username: "admin",
            Password: "secret",
        }).
        SetMaxPoolSize(100).                // 最大连接池
        SetMinPoolSize(10).                 // 最小连接池
        SetMaxConnIdleTime(30 * time.Second)

    // 连接MongoDB
    client, err := mongo.Connect(context.TODO(), clientOptions)
    if err != nil {
        log.Fatal(err)
    }

    // 检查连接
    err = client.Ping(context.TODO(), nil)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("Connected to MongoDB!")

    // 获取集合
    db := client.Database("myblog")
    usersCollection := db.Collection("users")

    // 确保连接关闭
    defer func() {
        if err = client.Disconnect(context.TODO()); err != nil {
            log.Fatal(err)
        }
    }()
    
    // ... 后续操作
}

4.3 CRUD实现

// ========== 插入文档 ==========
func insertUser(collection *mongo.Collection, user User) (primitive.ObjectID, error) {
    user.CreatedAt = time.Now()
    result, err := collection.InsertOne(context.TODO(), user)
    if err != nil {
        return primitive.NilObjectID, err
    }
    return result.InsertedID.(primitive.ObjectID), nil
}

// 批量插入
func insertUsers(collection *mongo.Collection, users []interface{}) error {
    _, err := collection.InsertMany(context.TODO(), users)
    return err
}

// ========== 查询文档 ==========
// 查询单个
func findUserByEmail(collection *mongo.Collection, email string) (*User, error) {
    var user User
    filter := bson.M{"email": email}
    err := collection.FindOne(context.TODO(), filter).Decode(&user)
    if err != nil {
        return nil, err
    }
    return &user, nil
}

// 查询多个(带分页)
func findUsersByAge(collection *mongo.Collection, minAge, maxAge int, page, pageSize int64) ([]User, error) {
    filter := bson.M{
        "age": bson.M{
            "$gte": minAge,
            "$lte": maxAge,
        },
    }
    
    // 分页选项
    findOptions := options.Find().
        SetSkip((page - 1) * pageSize).
        SetLimit(pageSize).
        SetSort(bson.D{{Key: "age", Value: -1}}) // 按年龄降序
    
    cursor, err := collection.Find(context.TODO(), filter, findOptions)
    if err != nil {
        return nil, err
    }
    defer cursor.Close(context.TODO())
    
    var users []User
    if err = cursor.All(context.TODO(), &users); err != nil {
        return nil, err
    }
    return users, nil
}

// ========== 更新文档 ==========
// 更新单个字段
func updateUserAge(collection *mongo.Collection, id primitive.ObjectID, newAge int) error {
    filter := bson.M{"_id": id}
    update := bson.M{
        "$set": bson.M{"age": newAge},
        "$inc": bson.M{"update_count": 1},
    }
    _, err := collection.UpdateOne(context.TODO(), filter, update)
    return err
}

// 添加数组元素
func addUserTag(collection *mongo.Collection, email, tag string) error {
    filter := bson.M{"email": email}
    update := bson.M{"$addToSet": bson.M{"tags": tag}} // $addToSet避免重复
    _, err := collection.UpdateOne(context.TODO(), filter, update)
    return err
}

// ========== 删除文档 ==========
func deleteUserByID(collection *mongo.Collection, id primitive.ObjectID) error {
    filter := bson.M{"_id": id}
    _, err := collection.DeleteOne(context.TODO(), filter)
    return err
}

4.4 聚合管道实现

type OrderStats struct {
    City        string  `bson:"city"`
    TotalAmount float64 `bson:"total_amount"`
    AvgAmount   float64 `bson:"avg_amount"`
    OrderCount  int     `bson:"order_count"`
}

func getOrderStatsByCity(ordersCollection *mongo.Collection) ([]OrderStats, error) {
    // 聚合管道阶段
    pipeline := mongo.Pipeline{
        // 匹配阶段
        {{"$match", bson.D{
            {"created_at", bson.D{{"$gte", time.Now().AddDate(0, 0, -30)}}},
        }}},
        // 查找阶段(关联用户)
        {{"$lookup", bson.D{
            {"from", "users"},
            {"localField", "user_id"},
            {"foreignField", "_id"},
            {"as", "user_info"},
        }}},
        // 展开阶段
        {{"$unwind", bson.D{{"path", "$user_info"}}}},
        // 分组阶段
        {{"$group", bson.D{
            {"_id", "$user_info.profile.city"},
            {"total_amount", bson.D{{"$sum", "$amount"}}},
            {"avg_amount", bson.D{{"$avg", "$amount"}}},
            {"order_count", bson.D{{"$sum", 1}}},
        }}},
        // 投影阶段
        {{"$project", bson.D{
            {"city", "$_id"},
            {"total_amount", 1},
            {"avg_amount", bson.D{{"$round", []interface{}{"$avg_amount", 2}}}},
            {"order_count", 1},
            {"_id", 0},
        }}},
        // 排序阶段
        {{"$sort", bson.D{{"total_amount", -1}}}},
    }
    
    cursor, err := ordersCollection.Aggregate(context.TODO(), pipeline)
    if err != nil {
        return nil, err
    }
    defer cursor.Close(context.TODO())
    
    var stats []OrderStats
    if err = cursor.All(context.TODO(), &stats); err != nil {
        return nil, err
    }
    return stats, nil
}

五、事务与数据一致性

MongoDB 4.0+支持多文档事务(类似ACID)。

func transferMoney(session mongo.Session, fromAccount, toAccount string, amount float64) error {
    // 启动事务
    err := session.StartTransaction()
    if err != nil {
        return err
    }
    defer session.EndSession(context.TODO())
    
    // 事务回调
    callback := func(sessCtx mongo.SessionContext) (interface{}, error) {
        accountsColl := session.Client().Database("bank").Collection("accounts")
        
        // 扣减源账户
        _, err := accountsColl.UpdateOne(sessCtx, 
            bson.M{"account_id": fromAccount, "balance": bson.M{"$gte": amount}},
            bson.M{"$inc": bson.M{"balance": -amount}})
        if err != nil {
            return nil, err
        }
        
        // 增加目标账户
        _, err = accountsColl.UpdateOne(sessCtx,
            bson.M{"account_id": toAccount},
            bson.M{"$inc": bson.M{"balance": amount}})
        if err != nil {
            return nil, err
        }
        
        return nil, nil
    }
    
    // 执行事务
    _, err = session.WithTransaction(context.TODO(), callback)
    return err
}

六、性能优化最佳实践

6.1 Schema设计原则

// ❌ 避免:无限增长的数组
{
    "user_id": "123",
    "comments": [  // 可能无限增长,导致文档膨胀
        { "content": "...", "time": "..." },
        // 成千上万条...
    ]
}

// ✅ 推荐:引用设计
// comments集合独立存在
{
    "user_id": "123",
    "comment_count": 1245  // 只存计数
}

6.2 查询优化清单

// 1. 使用投影,减少网络传输
db.users.find({ age: { $gt: 18 } }, { name: 1, email: 1 });

// 2. 避免使用$where(JavaScript执行,慢)
// ❌ 避免
db.users.find({ $where: "this.age > 18" });

// 3. 使用limit限制结果集
db.orders.find().limit(100);

// 4. 批量操作代替循环
// ❌ 避免
for (let i = 0; i < 1000; i++) {
    db.users.updateOne({ _id: i }, { $set: { status: "active" } });
}
// ✅ 推荐
let bulkOps = [];
for (let i = 0; i < 1000; i++) {
    bulkOps.push({
        updateOne: {
            filter: { _id: i },
            update: { $set: { status: "active" } }
        }
    });
}
db.users.bulkWrite(bulkOps);

6.3 监控与诊断

// 查看慢查询日志
db.setProfilingLevel(2);  // 记录所有操作
db.setProfilingLevel(1, { slowms: 100 });  // 记录超过100ms的查询

// 查看集合统计
db.users.stats();

// 查看当前正在执行的操作
db.currentOp();

// 终止慢查询
db.killOp(opid);

七、生产环境注意事项

7.1 安全配置

# mongod.conf
security:
  authorization: enabled  # 启用认证
  enableEncryption: true   # 加密存储

net:
  bindIp: 127.0.0.1       # 只监听本地
  port: 27017
  tls:
    mode: requireTLS       # 启用TLS

setParameter:
  authenticationMechanisms: SCRAM-SHA-256

7.2 备份与恢复

# 备份
mongodump --host localhost --port 27017 \
  --username admin --password secret \
  --out /backup/mongodb-$(date +%Y%m%d)

# 恢复
mongorestore --host localhost --port 27017 \
  --username admin --password secret \
  /backup/mongodb-20240101

# 热备份(需开启oplog)
mongodump --oplog --out /backup/hot-backup

7.3 副本集部署

# docker-compose.yml
version: '3.8'
services:
  mongo-primary:
    image: mongo:7.0
    command: mongod --replSet rs0 --bind_ip_all
    ports:
      - "27017:27017"
    volumes:
      - ./data/primary:/data/db

  mongo-secondary:
    image: mongo:7.0
    command: mongod --replSet rs0 --bind_ip_all
    volumes:
      - ./data/secondary:/data/db

  mongo-arbiter:
    image: mongo:7.0
    command: mongod --replSet rs0 --bind_ip_all
    volumes:
      - ./data/arbiter:/data/db

初始化副本集:

rs.initiate({
    _id: "rs0",
    members: [
        { _id: 0, host: "mongo-primary:27017" },
        { _id: 1, host: "mongo-secondary:27017" },
        { _id: 2, host: "mongo-arbiter:27017", arbiterOnly: true }
    ]
})

八、常见问题与解决方案

问题 原因 解决方案
查询变慢 索引失效或不当 使用explain分析,创建合适索引
内存占用高 工作集超过内存 增加内存或优化查询模式
写入性能差 磁盘IO瓶颈 使用SSD,调整writeConcern
连接数超限 未正确关闭连接 使用连接池,设置maxPoolSize
文档超16MB 嵌套数组过大 重构Schema,使用引用模式

九、总结

MongoDB作为NoSQL数据库的杰出代表,在灵活性和性能之间取得了良好的平衡。通过本文的学习,你应该已经掌握了:

  1. 核心概念:文档模型、集合、索引、聚合管道
  2. 基础操作:CRUD、查询优化、索引设计
  3. 实战技巧:Go驱动使用、事务处理、性能调优
  4. 生产运维:安全配置、备份恢复、高可用部署

选型建议

  • 选择MongoDB:内容管理、用户画像、日志分析、物联网
  • 谨慎选择:强一致性事务、复杂JOIN查询、固定Schema场景

MongoDB官方有句名言:"The data model is your application model"(数据模型就是你的应用模型)。选择MongoDB,意味着拥抱变化,用灵活的数据模型应对快速迭代的业务需求。

下一步建议

  1. 学习MongoDB Change Streams(变更流),实现实时数据同步
  2. 探索MongoDB Atlas(云托管服务)的全球集群功能
  3. 研究Sharding分片集群,应对海量数据场景