1. 方案背景与目标
1.1 为什么需要优化高并发写入?
在高并发场景下(如 注册、登录、提交订单、IoT 设备上报),直接写入数据库会导致:
数据库连接池耗尽(如 MySQL 的 被打满)。事务锁竞争(如行锁、表锁导致请求阻塞)。磁盘 I/O 和 CPU 压力激增(如 InnoDB 日志刷盘、索引维护)。接口响应变慢(P99 延迟飙升,用户体验下降)。
max_connections
目标:通过 Redis 缓冲 + 批量入库 + 最终一致性,将瞬时高并发写入 平滑削峰,提升系统吞吐量(QPS),同时保证数据最终不丢失。
2. 整体架构设计
2.1 核心流程
客户端 提交写入请求(如注册信息)。Gin 接口层:
幂等校验(防止重复提交,如用 Redis SETNX 对 加锁)。快速写入 Redis 队列(如
phone,JSON 格式存储请求数据)。立即返回(代表请求已接收,后续异步处理)。
LPUSH reg_queue
批处理 Worker(独立 Goroutine):
定时/定量触发(如每 100ms 或每 500 条数据)。从 Redis 拉取批量数据(如 )。去重 & 合并(按业务唯一键,如
BRPOP reg_queue 50ms去重)。批量写入 MySQL(使用
phone)。成功后清理 Redis 消息(或标记为已处理)。
INSERT ... ON DUPLICATE KEY UPDATE
结果查询(可选):客户端通过 轮询或回调获取最终状态。
request_id
2.2 关键组件
| 组件 | 作用 | 技术选型 |
|---|---|---|
| Redis | 缓冲队列、幂等去重、结果缓存 | Go-Redis(官方库) |
| MySQL | 最终数据存储 | GORM / database/sql |
| 批处理 Worker | 定时/定量拉取 Redis 数据并入库 | Goroutine + Channel |
| Gin | HTTP 接口层 | Gin Framework |
3. 详细实现(Go + Gin 代码示例)
3.1 1. 幂等校验(防止重复提交)
目标:同一 在 10 秒内 只能提交一次,避免重复写入 Redis 和数据库。
phone
// redis_dedup.go
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"time"
)
var rdb *redis.Client
func initRedis() {
rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis 地址
Password: "", // 无密码
DB: 0, // 默认 DB
})
}
// TryAcquireDedup 尝试获取幂等锁(10 秒过期)
func TryAcquireDedup(ctx context.Context, phone string) bool {
key := fmt.Sprintf("reg:dedup:%s", phone)
// SET key "1" NX EX 10 → 如果 key 不存在则设置,并设置 10 秒过期
ok, err := rdb.SetNX(ctx, key, "1", 10*time.Second).Result()
if err != nil {
fmt.Printf("Redis SETNX 失败: %v
", err)
return false
}
return ok
}
在 Gin 接口层调用:
// main.go
r := gin.Default()
initRedis() // 初始化 Redis
r.POST("/register", func(c *gin.Context) {
phone := c.PostForm("phone")
if phone == "" {
c.JSON(400, gin.H{"error": "phone 不能为空"})
return
}
ctx := context.Background()
if !TryAcquireDedup(ctx, phone) {
c.JSON(200, gin.H{"message": "重复提交,请稍后再试"})
return
}
// 2. 写入 Redis 队列(下一步)
// ...
})
3.2 2. 写入 Redis 缓冲队列
目标:将注册请求存入 Redis List,供批处理 Worker 消费。
// redis_queue.go
package main
import (
"context"
"encoding/json"
"github.com/redis/go-redis/v9"
)
type RegisterRequest struct {
Phone string `json:"phone"`
Password string `json:"password"`
Email string `json:"email,omitempty"`
}
// EnqueueRegister 将注册请求写入 Redis 队列
func EnqueueRegister(ctx context.Context, req RegisterRequest) error {
jsonData, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("JSON 序列化失败: %v", err)
}
// LPUSH reg_queue jsonData → 将数据插入队列头部
err = rdb.LPush(ctx, "reg_queue", jsonData).Err()
if err != nil {
return fmt.Errorf("写入 Redis 队列失败: %v", err)
}
return nil
}
在 Gin 接口层调用:
r.POST("/register", func(c *gin.Context) {
phone := c.PostForm("phone")
password := c.PostForm("password")
email := c.PostForm("email")
req := RegisterRequest{
Phone: phone,
Password: password,
Email: email,
}
ctx := context.Background()
if !TryAcquireDedup(ctx, phone) {
c.JSON(200, gin.H{"message": "重复提交,请稍后再试"})
return
}
// 写入 Redis 队列
err := EnqueueRegister(ctx, req)
if err != nil {
c.JSON(500, gin.H{"error": "系统繁忙,请稍后再试"})
return
}
c.JSON(202, gin.H{"message": "注册请求已接收,正在处理..."})
})
3.3 3. 批处理 Worker(定时/定量拉取并入库)
目标:从 Redis 队列拉取数据,批量写入 MySQL。
// batch_worker.go
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"sync"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/redis/go-redis/v9"
)
var db *sql.DB
func initDB() {
var err error
// DSN: user:password@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True
dsn := "root:123456@tcp(127.0.0.1:3306)/test?charset=utf8mb4&parseTime=True"
db, err = sql.Open("mysql", dsn)
if err != nil {
log.Fatalf("MySQL 连接失败: %v", err)
}
db.SetMaxOpenConns(10) // 连接池大小
db.SetMaxIdleConns(5)
}
// BatchWorker 批处理 Worker(定时拉取 Redis 数据并入库)
func BatchWorker(ctx context.Context) {
ticker := time.NewTicker(100 * time.Millisecond) // 每 100ms 触发一次
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
processBatch(ctx)
}
}
}
// processBatch 从 Redis 拉取一批数据并写入 MySQL
func processBatch(ctx context.Context) {
// 1. 从 Redis 队列拉取数据(BRPOP 阻塞式拉取,超时 50ms)
result, err := rdb.BRPop(ctx, 50*time.Millisecond, "reg_queue").Result()
if err != nil {
if err != redis.Nil {
log.Printf("BRPOP 失败: %v", err)
}
return
}
if len(result) < 2 {
return
}
jsonData := result[1]
var req RegisterRequest
err = json.Unmarshal([]byte(jsonData), &req)
if err != nil {
log.Printf("JSON 反序列化失败: %v", err)
return
}
// 2. 批量写入 MySQL(这里简化为单条写入,实际可攒批)
err = insertToMySQL(ctx, req)
if err != nil {
log.Printf("写入 MySQL 失败: %v", err)
// 可选:重试或记录到 DLQ
}
}
// insertToMySQL 写入 MySQL(使用 ON DUPLICATE KEY UPDATE 实现幂等)
func insertToMySQL(ctx context.Context, req RegisterRequest) error {
query := `
INSERT INTO users (phone, password, email, created_at)
VALUES (?, ?, ?, NOW())
ON DUPLICATE KEY UPDATE
password = VALUES(password),
email = VALUES(email),
created_at = NOW()
`
_, err := db.ExecContext(ctx, query, req.Phone, req.Password, req.Email)
return err
}
MySQL 表结构:
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
phone VARCHAR(20) UNIQUE NOT NULL, -- 业务唯一键
password VARCHAR(255) NOT NULL,
email VARCHAR(100),
created_at DATETIME NOT NULL
);
启动 Worker:
func main() {
initRedis()
initDB()
ctx := context.Background()
go BatchWorker(ctx) // 启动批处理 Worker
r := gin.Default()
// 注册路由(见 3.2 节)
r.POST("/register", func(c *gin.Context) { /* ... */ })
r.Run(":8080") // 启动 Gin 服务
}
4. 优化与稳定性保障
4.1 避免 Redis 内存雪崩
设置随机 TTL:幂等锁的过期时间设为 (避免同时失效)。限制队列长度:
10±2秒(防止无限堆积)。监控 Redis 内存:通过
LTRIM reg_queue 0 9999观察
INFO memory,设置
used_memory(如
maxmemory-policy)。
allkeys-lru
4.2 批处理优化
双阈值触发:数量阈值(如 500 条) + 时间窗口(如 100ms),优先数量阈值。批量入库:攒够 500 条后,用 一次性写入。失败重试:写入失败的数据重新放回 Redis 队列,或进入 DLQ(死信队列)。
INSERT ... ON DUPLICATE KEY UPDATE
4.3 降级策略
Redis 故障:降级为 同步直写 MySQL(限流),或写入本地文件后续补录。批处理延迟高:自动缩短批处理窗口(如从 100ms → 50ms)。数据库压力大:临时关闭非核心功能(如邮件通知)。
5. 性能对比(Redis 缓冲 vs 直写数据库)
| 指标 | Redis 缓冲 + 批量入库 | 直写数据库 |
|---|---|---|
| 并发能力 | 500~1000 QPS(同机 2C4G) | 100~200 QPS |
| 写入延迟(P95) | 50~100ms(含 Redis + 批处理) | 10~30ms(直接写入) |
| 数据库压力 | 极低(批量写入) | 高(瞬时高并发) |
| 一致性 | 最终一致(幂等 + 重试) | 强一致(事务) |
| 复杂度 | 较高(需维护 Redis + Worker) | 低(直接写入) |
适用场景:
选 Redis 缓冲:高并发写入、允许短时延迟(如注册、日志上报)。选直写数据库:强一致业务、低并发(如支付回调)。
6. 总结
Redis 缓冲 + 批量入库 是高并发写入的终极解决方案,通过 异步化 + 批量合并 显著提升吞吐量,适合 注册、IoT 上报、埋点 等场景。直写数据库 适合 强一致、低并发 业务,但需谨慎控制并发量。Go + Gin 实现 核心代码仅 200 行左右,轻量级且高性能,可直接落地到生产环境。
下一步优化方向:
引入消息队列(如 Kafka):进一步解耦写入和业务逻辑。分库分表:当单表数据量过大时,按 哈希分片。监控告警:对 Redis 队列长度、批处理延迟、MySQL 写入 RT 做实时监控。
phone
按此方案落地,你的系统可轻松支撑万级 QPS 写入! 🚀

