什么是存款移民及其基本原理

存款移民(Deposit Migration)是一种新兴的数据库迁移策略,它通过在源数据库和目标数据库之间建立增量数据同步机制,实现数据的平滑迁移。与传统的全量迁移不同,存款移民的核心思想是”先迁移存量,再同步增量”,就像银行存款一样,先存入本金(历史数据),再持续处理利息(新增数据)。

这种迁移方式特别适用于以下场景:

  • 数据库需要从旧版本升级到新版本
  • 系统需要从单机架构迁移到分布式架构
  • 需要将商业数据库迁移到开源数据库
  • 系统需要在不停机的情况下完成迁移

存款移民的核心优势

1. 降低业务风险

存款移民最大的优势是支持双向同步,可以在迁移过程中随时回滚。如果在迁移过程中发现任何问题,可以立即将业务切回源数据库,而不会丢失数据。

2. 支持灰度发布

可以逐步将业务从源数据库切换到目标数据库,先切换非核心业务,验证稳定后再切换核心业务。

3. 减少停机时间

由于采用增量同步的方式,实际的停机时间只需要在最后切换的瞬间,通常只需要几秒钟。

存款移民的完整实施流程

阶段一:迁移前准备

1. 环境评估

在开始迁移之前,需要对源数据库和目标数据库进行全面评估:

-- 检查源数据库版本
SELECT VERSION();

-- 检查源数据库表结构
SHOW TABLES;

-- 检查源数据库数据量
SELECT 
    table_name,
    table_rows,
    data_length,
    index_length
FROM information_schema.tables
WHERE table_schema = 'your_database';

-- 检查源数据库性能指标
SHOW GLOBAL STATUS LIKE 'Threads_connected';
SHOW GLOBAL STATUS LIKE 'Queries_per_second';

2. 数据库兼容性检查

确保目标数据库支持源数据库的所有特性:

-- 检查字符集和排序规则
SELECT 
    DEFAULT_CHARACTER_SET_NAME,
    DEFAULT_COLLATION_NAME
FROM information_schema.schemata
WHERE schema_name = 'your_database';

-- 检查存储引擎
SHOW TABLE STATUS WHERE Name = 'your_table';

3. 网络和安全配置

确保源数据库和目标数据库之间的网络连通性:

# 测试网络连通性
ping target_database_host

# 测试端口连通性
telnet target_database_host 3306

# 配置SSL连接(推荐)
mysql --ssl-mode=REQUIRED -h target_host -u user -p

阶段二:初始数据迁移

1. 全量数据导出

使用数据库自带的导出工具导出历史数据:

# MySQL全量导出
mysqldump --single-transaction \
          --master-data=2 \
          --routines \
          --triggers \
          --events \
          -u root \
          -p \
          your_database > full_backup.sql

# PostgreSQL全量导出
pg_dump -h source_host \
        -U username \
        -d your_database \
        -f full_backup.sql \
        --verbose \
        --no-password

2. 全量数据导入

将导出的数据导入到目标数据库:

# MySQL全量导入
mysql -h target_host -u root -p your_database < full_backup.sql

# PostgreSQL全量导入
psql -h target_host -U username -d your_database -f full_backup.sql

3. 验证数据完整性

导入完成后,需要验证数据是否完整:

-- 对比表数量
SELECT COUNT(*) FROM information_schema.tables 
WHERE table_schema = 'your_database';

-- 对比数据行数(抽样)
SELECT 'source' as source, COUNT(*) as row_count FROM source_table
UNION ALL
SELECT 'target' as source, COUNT(*) as row_count FROM target_table;

-- 对比关键统计信息
SELECT 
    COUNT(*) as total_records,
    MAX(updated_at) as last_update,
    SUM(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) as active_records
FROM your_table;

阶段三:增量数据同步

1. 配置主从复制

在源数据库配置主库,在目标数据库配置从库:

-- 在源数据库创建复制用户
CREATE USER 'repl'@'%' IDENTIFIED BY 'secure_password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
FLUSH PRIVILEGES;

-- 查看源数据库binlog位置
SHOW MASTER STATUS;

2. 配置目标数据库作为从库

-- 在目标数据库执行
CHANGE MASTER TO
    MASTER_HOST='source_host',
    MASTER_USER='repl',
    MASTER_PASSWORD='secure_password',
    MASTER_LOG_FILE='mysql-bin.000001',
    MASTER_LOG_POS=123456;

-- 启动从库复制
START SLAVE;

-- 检查复制状态
SHOW SLAVE STATUS\G

3. 监控同步延迟

-- 创建监控视图
CREATE VIEW replication_lag AS
SELECT 
    Seconds_Behind_Master,
    Slave_IO_Running,
    Slave_SQL_Running
FROM information_schema.processlist
WHERE USER = 'system user';

-- 定期检查延迟
SELECT * FROM replication_lag;

阶段四:业务切换

1. 配置双写模式

在切换前,可以配置应用层同时向源数据库和目标数据库写入数据:

# Python示例:双写模式
import pymysql

class DualWriteDB:
    def __init__(self, source_config, target_config):
        self.source_conn = pymysql.connect(**source_config)
        self.target_conn = pymysql.connect(**target_config)
    
    def execute_write(self, sql, params=None):
        # 同时写入源和目标
        try:
            with self.source_conn.cursor() as source_cursor:
                source_cursor.execute(sql, params)
                self.source_conn.commit()
        except Exception as e:
            print(f"Source write failed: {e}")
        
        try:
            with self.target_conn.cursor() as target_cursor:
                target_cursor.execute(sql, params)
                self.target_conn.commit()
        except Exception as e:
            print(f"Target write failed: {e}")
    
    def close(self):
        self.source_conn.close()
        self.target_conn.close()

# 使用示例
db = DualWriteDB(source_config, target_config)
db.execute_write("INSERT INTO users (name, email) VALUES (%s, %s)", 
                 ("John Doe", "john@example.com"))
db.close()

2. 灰度切换

逐步将读请求切换到目标数据库:

# 配置读写分离
class MigrationRouter:
    def __init__(self, source_db, target_db, read_ratio=0.1):
        self.source_db = source_db
        self.target_db = target_db
        self.read_ratio = read_ratio  # 目标库读取比例
    
    def read_query(self, sql, params=None):
        # 根据比例路由读请求
        import random
        if random.random() < self.read_ratio:
            return self.target_db.execute(sql, params)
        else:
            return self.source_db.execute(sql, params)
    
    def write_query(self, sql, params=None):
        # 写请求同时发往两个库
        self.source_db.execute(sql, params)
        self.target_db.execute(sql,1params)

# 逐步增加目标库读取比例
router = MigrationRouter(source_db, target_db, read_ratio=0.1)
# 验证稳定后
router.read_ratio = 0.5
# 完全切换后
router.read_ratio = 1.0

3. 最终切换

当目标数据库稳定运行后,执行最终切换:

# 1. 暂停应用写入
# 2. 等待同步完成
mysql -h target_host -u root -p -e "SHOW SLAVE STATUS\G" | grep Seconds_Behind_Master

# 3. 将目标数据库设为主库
mysql -h target_host -u root -p -e "STOP SLAVE; RESET SLAVE ALL;"

# 4. 更新应用配置指向目标数据库
# 5. 恢复应用写入

常见风险及规避策略

风险一:数据不一致

问题描述:在迁移过程中,源数据库和目标数据库的数据可能出现不一致。

规避策略

  1. 定期数据校验:建立自动化校验机制
# 数据一致性校验脚本
def verify_data_consistency(source_conn, target_conn, table_name, primary_key):
    # 获取源库数据
    with source_conn.cursor() as source_cursor:
        source_cursor.execute(f"SELECT * FROM {table_name} ORDER BY {primary_key}")
        source_data = source_cursor.fetchall()
    
    # 获取目标库数据
    with target_conn.cursor() as target_cursor:
        target_cursor.execute(f"SELECT * FROM {table_name} ORDER BY {primary_key}")
        target_data = target_cursor.fetchall()
    
    # 对比数据
    if source_data != target_data:
        print(f"数据不一致: {table_name}")
        # 找出差异
        for i, (s, t) in enumerate(zip(source_data, target_data)):
            if s != t:
                print(f"行 {i}: 源={s}, 目标={t}")
        return False
    return True

# 定时执行校验
import schedule
import time

def daily_check():
    tables = ['users', 'orders', 'products']
    for table in tables:
        verify_data_consistency(source_conn, target_conn, table, 'id')

schedule.every().hour.do(daily_check)
while True:
    schedule.run_pending()
    time.sleep(1)
  1. 使用GTID:启用全局事务标识符,确保binlog位置精确对应
-- 在源数据库配置
SET GLOBAL gtid_mode=ON;
SET GLOBAL enforce_gtid_consistency=ON;

-- 在目标数据库配置
CHANGE MASTER TO
    MASTER_HOST='source_host',
    MASTER_USER='repl',
    MASTER_PASSWORD='secure_password',
    MASTER_AUTO_POSITION=1;

风险二:性能下降

问题描述:迁移过程中,源数据库的性能可能受到影响。

规避策略

  1. 控制导出速度:使用pv工具控制导出速度
# 使用pv控制导出速度,避免IO过高
mysqldump --single-transaction your_database | pv -L 10m | gzip > backup.sql.gz
  1. 错峰操作:在业务低峰期执行迁移操作
# 使用cron定时任务在凌晨执行
# crontab -e
0 2 * * * /path/to/migration_script.sh
  1. 监控数据库性能
-- 创建性能监控表
CREATE TABLE migration_performance_log (
    id INT AUTO_INCREMENT PRIMARY KEY,
    log_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    cpu_usage DECIMAL(5,2),
    memory_usage DECIMAL(5,2),
    disk_io BIGINT,
    queries_per_sec INT
);

-- 定期记录性能指标
INSERT INTO migration_performance_log 
SELECT 
    NULL,
    NOW(),
    (SELECT SUM(CPU_USAGE) FROM sys.cpu_usage),
    (SELECT SUM(memory_used) FROM sys.memory_usage),
    (SELECT SUM(io_requests) FROM sys.io_global_by_file_by_bytes),
    (SELECT COUNT(*) FROM information_schema.processlist WHERE COMMAND != 'Sleep');

风险三:同步延迟

问题描述:增量同步可能出现延迟,导致数据不一致。

规避策略

  1. 实时监控延迟
-- 创建延迟监控告警
DELIMITER //
CREATE EVENT check_replication_lag
ON SCHEDULE EVERY 1 MINUTE
DO
BEGIN
    DECLARE lag_seconds INT;
    SELECT Seconds_Behind_Master INTO lag_seconds
    FROM information_schema.processlist
    WHERE USER = 'system user';
    
    IF lag_seconds > 60 THEN
        -- 发送告警(这里可以调用外部API)
        INSERT INTO alert_log (alert_type, message, created_at)
        VALUES ('CRITICAL', CONCAT('Replication lag: ', lag_seconds, ' seconds'), NOW());
    END IF;
END //
DELIMITER ;
  1. 优化同步性能
-- 在目标数据库调整参数
SET GLOBAL slave_parallel_workers = 4;  -- 并行复制
SET GLOBAL slave_parallel_type = 'LOGICAL_CLOCK';
SET GLOBAL binlog_group_commit_sync_delay = 1000;

风险四:业务中断

问题描述:切换过程中可能出现业务中断。

规避策略

  1. 准备回滚方案
#!/bin/bash
# rollback.sh - 紧急回滚脚本

# 1. 停止应用
systemctl stop your_app

# 2. 切换数据库连接
cp /etc/app/config_source.json /etc/app/config.json

# 3. 清理目标库的写入(如果需要)
mysql -h target_host -u root -p -e "RESET SLAVE ALL;"

# 4. 启动应用
systemctl start your_app

echo "回滚完成,已切换回源数据库"
  1. 灰度切换:使用Feature Flag控制切换比例
# 使用Feature Flag控制数据库切换
from featureflags import FeatureFlag

ff = FeatureFlag()

def get_db_connection():
    if ff.is_enabled('use_target_db', user_id=None):
        return connect_to_target_db()
    else:
        return connect_to_source_db()

# 逐步开启
# 第一天:10%流量
# 第三天:50%流量
# 第五天:100%流量

最佳实践建议

1. 制定详细的迁移计划

  • 明确迁移的时间窗口
  • 准备详细的检查清单
  • 分配明确的人员职责

2. 充分的测试

在生产环境迁移前,必须在测试环境进行完整演练:

# 测试环境验证脚本
#!/bin/bash

echo "开始迁移测试..."

# 1. 数据量测试
source_count=$(mysql -h source_test -u root -p -e "SELECT COUNT(*) FROM users" -s -N)
target_count=$(mysql -h target_test -u root -p -e "SELECT COUNT(*) FROM users" -s -N)

if [ "$source_count" -eq "$target_count" ]; then
    echo "✓ 数据量一致"
else
    echo "✗ 数据量不一致: source=$source_count, target=$target_count"
    exit 1
fi

# 2. 数据内容抽样验证
source_sample=$(mysql -h source_test -u root -p -e "SELECT MD5(GROUP_CONCAT(CONCAT(id,name,created_at))) FROM users ORDER BY id LIMIT 1000" -s -N)
target_sample=$(mysql -h target_test -u root -p -e "SELECT MD5(GROUP_CONCAT(CONCAT(id,name,created_at))) FROM users ORDER BY id LIMIT 1000" -s -N)

if [ "$source_sample" = "$target_sample" ]; then
    echo "✓ 数据内容一致"
else
    echo "✗ 数据内容不一致"
    exit 1
fi

echo "测试通过,可以执行生产迁移"

3. 建立回滚机制

确保在任何阶段都可以快速回滚:

# 回滚决策逻辑
def should_rollback(error_count, sync_lag, business_metrics):
    # 如果错误数超过阈值
    if error_count > 100:
        return True
    
    # 如果同步延迟超过5分钟
    if sync_lag > 300:
        return True
    
    # 如果业务指标异常(如订单成功率下降)
    if business_metrics['order_success_rate'] < 0.95:
        return True
    
    return False

4. 文档化所有操作

记录迁移过程中的每一步操作,便于问题追溯:

# 迁移日志模板

## 操作时间
2024-01-15 02:00:00

## 操作内容
执行全量数据导出

## 操作人员
张三

## 操作命令
mysqldump --single-transaction -u root -p your_database > full_backup.sql

## 执行结果
成功,导出数据量:15.2GB,耗时:25分钟

## 异常情况
无

## 后续步骤
等待全量导入完成

总结

存款移民是一种安全、可靠的数据库迁移策略,通过增量同步和灰度切换,可以最大程度地降低迁移风险。成功的关键在于:

  1. 充分准备:详细的环境评估和兼容性检查
  2. 严格测试:在测试环境进行完整演练
  3. 实时监控:建立完善的监控和告警机制
  4. 快速回滚:准备随时可以执行的回滚方案
  5. 逐步切换:采用灰度发布策略,逐步切换流量

通过遵循这些原则和最佳实践,您可以安全地完成数据库迁移,确保业务的连续性和数据的完整性。