引言

随着全球数字化进程的加速,电子签证(e-Visa)系统已成为各国出入境管理的重要组成部分。一个稳定、安全、高效的支付系统是电子签证平台的核心组件。本文将深入探讨电子签证支付系统的代码开发流程、架构设计、安全实践以及常见问题的解决方案,为开发者提供全面的指导。

1. 系统架构设计

1.1 整体架构概述

电子签证支付系统通常采用微服务架构,以确保高可用性和可扩展性。主要组件包括:

  • 前端界面:用户提交签证申请和支付
  • 支付网关:处理支付请求和回调
  • 签证处理服务:管理申请状态和审批流程
  • 数据库:存储用户数据、申请记录和交易信息
  • 通知服务:发送支付确认和签证状态更新

1.2 技术栈选择

# 示例:Python Flask + PostgreSQL + Stripe 支付网关
import os
from datetime import datetime

import psycopg2
import stripe
from flask import Flask, request, jsonify

app = Flask(__name__)

# Configure the payment gateway. The key is read from the STRIPE_API_KEY
# environment variable (the same variable the docker-compose file passes to
# the web service); the public Stripe documentation test key is only a
# development fallback and must never be replaced by a live key in source.
stripe.api_key = os.environ.get("STRIPE_API_KEY", "sk_test_4eC39HqLyjWDarjtT1zdp7dc")

# 数据库连接
def get_db_connection():
    """Open and return a new psycopg2 connection to the e-Visa database.

    The target is taken from the ``DATABASE_URL`` environment variable when
    it is set (the docker-compose file provides it to the web service).
    Otherwise the original local-development defaults are used.

    Returns:
        A new psycopg2 connection. The caller is responsible for closing it.
    """
    database_url = os.environ.get("DATABASE_URL")
    if database_url:
        # psycopg2 accepts a libpq connection URI as its DSN argument.
        return psycopg2.connect(database_url)

    # Development fallback only; real credentials do not belong in source.
    return psycopg2.connect(
        host="localhost",
        database="evisa",
        user="postgres",
        password="password",
    )

1.3 数据库设计

-- Users table: one row per applicant.
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    full_name VARCHAR(255) NOT NULL,
    passport_number VARCHAR(50) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Set by the GDPR anonymization job; NULL means the row has not been
    -- anonymized yet. The retention code filters on this column.
    anonymized_at TIMESTAMP
);

-- Visa applications table: one row per application submitted by a user.
CREATE TABLE visa_applications (
    id SERIAL PRIMARY KEY,
    -- Nullable on purpose: the anonymization code sets user_id to NULL.
    user_id INTEGER REFERENCES users(id),
    application_type VARCHAR(50) NOT NULL,
    entry_date DATE NOT NULL,
    exit_date DATE NOT NULL,
    -- Values written by the code in this document: 'pending', 'paid',
    -- 'processing', 'processed'; 'rejected' and 'expired' are read by the
    -- retention job.
    status VARCHAR(20) DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Payment transactions table: one row per payment attempt record.
CREATE TABLE payments (
    id SERIAL PRIMARY KEY,
    application_id INTEGER REFERENCES visa_applications(id),
    amount DECIMAL(10, 2) NOT NULL,
    currency VARCHAR(3) DEFAULT 'USD',
    payment_method VARCHAR(50),
    -- UNIQUE because the webhook handler looks a payment up by this id in a
    -- scalar subquery, which fails if more than one row matches.
    stripe_payment_intent_id VARCHAR(100) UNIQUE,
    -- Values written by the code in this document: 'pending', 'completed', 'failed'.
    status VARCHAR(20) DEFAULT 'pending',
    -- Number of retries performed by the payment-failure retry logic.
    retry_count INTEGER NOT NULL DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

2. 支付流程实现

2.1 支付请求处理

@app.route('/api/payment/create', methods=['POST'])
def create_payment():
    """Create a Stripe PaymentIntent for a pending visa application.

    Expects a JSON object with ``application_id``, ``amount``, ``currency``
    and ``payment_method`` (``user_id`` is optional and only copied into the
    intent metadata).

    Returns:
        200 with ``payment_id``, ``client_secret`` and ``status`` on success;
        400 for a malformed body, a missing field, an invalid amount or an
        application that is not ``pending``; 500 for any other failure.
    """
    from decimal import Decimal, InvalidOperation, ROUND_HALF_UP

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    # Validate the input data.
    required_fields = ['application_id', 'amount', 'currency', 'payment_method']
    for field in required_fields:
        if field not in data:
            return jsonify({'error': f'Missing required field: {field}'}), 400

    # Convert through Decimal: int(19.99 * 100) would truncate to 1998.
    try:
        amount = Decimal(str(data['amount']))
    except (InvalidOperation, ValueError):
        return jsonify({'error': 'Invalid amount'}), 400
    if not amount.is_finite() or amount <= 0:
        return jsonify({'error': 'Invalid amount'}), 400
    # NOTE(review): the x100 factor assumes a two-decimal currency; confirm
    # how zero-decimal currencies should be handled.
    amount_in_cents = int((amount * 100).quantize(Decimal('1'), rounding=ROUND_HALF_UP))

    conn = None
    cur = None
    try:
        # Check the application status.
        conn = get_db_connection()
        cur = conn.cursor()
        cur.execute(
            "SELECT status FROM visa_applications WHERE id = %s",
            (data['application_id'],)
        )
        application_status = cur.fetchone()

        if not application_status or application_status[0] != 'pending':
            return jsonify({'error': 'Invalid application status'}), 400

        # Create the payment intent.
        payment_intent = stripe.PaymentIntent.create(
            amount=amount_in_cents,
            currency=data['currency'],
            payment_method=data['payment_method'],
            confirmation_method='manual',
            confirm=True,
            metadata={
                'application_id': data['application_id'],
                'user_id': data.get('user_id')
            }
        )

        # Persist the payment record.
        cur.execute(
            """INSERT INTO payments 
               (application_id, amount, currency, payment_method, stripe_payment_intent_id, status) 
               VALUES (%s, %s, %s, %s, %s, %s) 
               RETURNING id""",
            (data['application_id'], amount, data['currency'],
             data['payment_method'], payment_intent.id, 'pending')
        )
        payment_id = cur.fetchone()[0]
        conn.commit()

        return jsonify({
            'payment_id': payment_id,
            'client_secret': payment_intent.client_secret,
            'status': 'created'
        })

    except Exception as e:
        if conn is not None:
            conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        # Always release the cursor and connection, including on the early
        # 400 return and on errors.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()

2.2 支付回调处理

@app.route('/api/payment/webhook', methods=['POST'])
def payment_webhook():
    """Handle Stripe webhook callbacks for payment intents.

    Verifies the ``Stripe-Signature`` header, then for
    ``payment_intent.succeeded`` marks the payment ``completed`` and the
    application ``paid``, and for ``payment_intent.payment_failed`` marks the
    payment ``failed``. Other event types are acknowledged and ignored.

    Returns:
        200 ``{'status': 'success'}`` when the event was processed or ignored,
        400 on an invalid signature, 500 on any other error.
    """
    payload = request.data
    sig_header = request.headers.get('Stripe-Signature')
    # The signing secret comes from the environment; the placeholder is kept
    # as a fallback so existing setups behave as before.
    webhook_secret = os.environ.get('STRIPE_WEBHOOK_SECRET', 'whsec_your_webhook_secret')

    try:
        # Verify the webhook signature.
        event = stripe.Webhook.construct_event(payload, sig_header, webhook_secret)
    except stripe.error.SignatureVerificationError:
        return jsonify({'error': 'Invalid signature'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500

    status_by_event = {
        'payment_intent.succeeded': 'completed',
        'payment_intent.payment_failed': 'failed',
    }
    new_status = status_by_event.get(event['type'])
    if new_status is None:
        return jsonify({'status': 'success'})

    payment_intent = event['data']['object']
    # Item access works for Stripe objects and for plain dicts alike.
    intent_id = payment_intent['id']

    conn = None
    cur = None
    try:
        conn = get_db_connection()
        cur = conn.cursor()

        # Update the payment status.
        cur.execute(
            """UPDATE payments 
               SET status = %s, updated_at = NOW() 
               WHERE stripe_payment_intent_id = %s""",
            (new_status, intent_id)
        )

        if new_status == 'completed':
            # Update the visa application status.
            cur.execute(
                """UPDATE visa_applications 
                   SET status = 'paid' 
                   WHERE id = (SELECT application_id FROM payments 
                               WHERE stripe_payment_intent_id = %s)""",
                (intent_id,)
            )

        conn.commit()
    except Exception as e:
        if conn is not None:
            conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()

    if new_status == 'completed':
        # Send the confirmation email only after the commit, and treat it as
        # best effort: a notification failure must not undo or block the
        # recorded payment.
        metadata = payment_intent.get('metadata') or {}
        try:
            send_confirmation_email(metadata.get('user_id'))
        except Exception:
            app.logger.exception("Failed to send payment confirmation email")

    return jsonify({'status': 'success'})

3. 安全最佳实践

3.1 数据加密

from cryptography.fernet import Fernet
import os

# Generate a key (production should use a secure key-management service).
def generate_key():
    """Return a freshly generated Fernet key."""
    return Fernet.generate_key()

# Encrypt sensitive data.
def encrypt_data(data, key):
    """Encrypt the string ``data`` with the Fernet ``key`` and return the token as text."""
    cipher = Fernet(key)
    token_bytes = cipher.encrypt(data.encode())
    return token_bytes.decode()

# Decrypt data.
def decrypt_data(encrypted_data, key):
    """Decrypt the text token ``encrypted_data`` with the Fernet ``key`` and return the plaintext string."""
    cipher = Fernet(key)
    plaintext_bytes = cipher.decrypt(encrypted_data.encode())
    return plaintext_bytes.decode()

# Example: encrypt a passport number.
# NOTE(review): os.environ.get returns None when PASSPORT_ENCRYPTION_KEY is
# unset, and that None is passed straight to encrypt_data — confirm the
# variable is always provided before this code runs.
passport_key = os.environ.get('PASSPORT_ENCRYPTION_KEY')
encrypted_passport = encrypt_data("AB1234567", passport_key)
print(f"加密后的护照号码: {encrypted_passport}")

3.2 输入验证和SQL注入防护

import re
from flask import abort

def validate_email(email):
    """Abort the request with HTTP 400 unless ``email`` looks like an address.

    Args:
        email: The value to validate; anything that is not a string is rejected.

    Returns:
        None when the value is accepted.
    """
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch is used because ``$`` with re.match also matches just before
    # a trailing newline, which would let "user@example.com\n" through.
    if not isinstance(email, str) or not re.fullmatch(pattern, email):
        abort(400, description="Invalid email format")

def validate_passport_number(passport):
    """Abort the request with HTTP 400 unless ``passport`` is exactly nine ASCII digits.

    Args:
        passport: The value to validate; anything that is not a string is rejected.

    Returns:
        None when the value is accepted.
    """
    # Passport number format check (example rule: nine digits). [0-9] is used
    # instead of \d so non-ASCII digits are rejected, and fullmatch so a
    # trailing newline is rejected as well.
    if not isinstance(passport, str) or not re.fullmatch(r'[0-9]{9}', passport):
        abort(400, description="Invalid passport number format")

# Use a parameterized query to prevent SQL injection.
def get_user_by_email(email):
    """Return the ``users`` row whose email equals ``email``, or None if there is none.

    The connection and cursor are always closed, including when the query raises.
    """
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        try:
            # Correct approach: the value is passed as a bound parameter.
            cur.execute("SELECT * FROM users WHERE email = %s", (email,))
            return cur.fetchone()
        finally:
            cur.close()
    finally:
        conn.close()

# 错误做法(避免):
# cur.execute(f"SELECT * FROM users WHERE email = '{email}'")  # SQL注入风险!

3.3 API安全

from functools import wraps
import jwt
from datetime import datetime, timedelta

# JWT token verification decorator.
def token_required(f):
    """Decorate a view so it only runs with a valid JWT in ``Authorization``.

    The header is expected as ``"<scheme> <token>"`` (for example
    ``"Bearer <token>"``). On success the wrapped view is called with the
    token's ``user_id`` claim as its first positional argument. A missing,
    malformed, expired or otherwise invalid token yields a 401 JSON response.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get('Authorization')

        if not auth_header:
            return jsonify({'error': 'Token is missing'}), 401

        # Strip the scheme prefix. partition never raises, so a header with
        # no space produces an empty token and a 401 instead of an IndexError.
        _scheme, _sep, token = auth_header.partition(' ')
        token = token.strip()
        if not token:
            return jsonify({'error': 'Invalid token'}), 401

        try:
            # NOTE: the signing secret must match the one used by the token
            # generator; it should come from configuration, not source.
            payload = jwt.decode(token, 'your-secret-key', algorithms=['HS256'])
            current_user = payload['user_id']
        except jwt.ExpiredSignatureError:
            return jsonify({'error': 'Token has expired'}), 401
        except (jwt.InvalidTokenError, KeyError):
            # KeyError: a validly signed token without a user_id claim.
            return jsonify({'error': 'Invalid token'}), 401

        return f(current_user, *args, **kwargs)

    return decorated

# Generate a JWT token.
def generate_token(user_id):
    """Return an HS256-signed JWT for ``user_id`` that expires after 24 hours.

    The ``iat`` and ``exp`` claims are derived from a single timezone-aware
    UTC timestamp, so the lifetime is exactly 24 hours.
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated; use an aware UTC timestamp instead.
    issued_at = datetime.now(timezone.utc)
    payload = {
        'user_id': user_id,
        'exp': issued_at + timedelta(hours=24),
        'iat': issued_at
    }
    # NOTE: the secret must match the one used when verifying tokens; it
    # should come from configuration, not source.
    return jwt.encode(payload, 'your-secret-key', algorithm='HS256')

# Usage example.
@app.route('/api/protected', methods=['GET'])
@token_required
def protected_route(current_user):
    """Return a greeting for the authenticated user."""
    greeting = f'Hello user {current_user}'
    return jsonify({'message': greeting})

4. 常见问题解决方案

4.1 支付失败问题

问题描述:用户支付时出现“支付失败”错误。

解决方案

  1. 检查支付网关配置
def check_stripe_configuration():
    """Check that Stripe is configured and reachable.

    Returns:
        True when an API key is set and the account can be retrieved,
        False otherwise (the reason is printed).
    """
    try:
        # Check the API key first: without it the network call cannot work.
        if not stripe.api_key:
            raise ValueError("Stripe API key not configured")

        # Test the Stripe connection.
        account = stripe.Account.retrieve()
        # The business profile may be absent; that is not a configuration error.
        business_profile = getattr(account, 'business_profile', None) or {}
        print(f"Stripe账户状态: {business_profile.get('support_email')}")

        return True
    except Exception as e:
        print(f"Stripe配置错误: {e}")
        return False
  2. 处理支付失败重试逻辑
def handle_payment_failure(payment_id, max_retries=3):
    """Retry a failed payment by creating a new Stripe PaymentIntent.

    Args:
        payment_id: Primary key of the row in ``payments``.
        max_retries: Maximum number of retries allowed for one payment.

    Returns:
        True when a new intent was created and the payment was reset to
        ``pending``; False when the payment does not exist, is not
        ``failed``, has used up its retries, or the retry itself failed.
    """
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        # Load the payment. retry_count is selected explicitly because the
        # row is a tuple and cannot be queried by column name.
        cur.execute(
            """SELECT stripe_payment_intent_id, status, COALESCE(retry_count, 0)
               FROM payments WHERE id = %s""",
            (payment_id,)
        )
        payment = cur.fetchone()

        if not payment:
            return False

        stripe_intent_id, status, retry_count = payment

        # Check whether a retry is allowed.
        if status != 'failed' or retry_count >= max_retries:
            return False

        # Record the attempt first and commit it, so an attempt that fails
        # below still counts toward max_retries.
        cur.execute(
            """UPDATE payments 
               SET retry_count = COALESCE(retry_count, 0) + 1, 
                   updated_at = NOW() 
               WHERE id = %s""",
            (payment_id,)
        )
        conn.commit()

        # Re-create the payment intent from the failed one.
        try:
            payment_intent = stripe.PaymentIntent.retrieve(stripe_intent_id)
            new_intent = stripe.PaymentIntent.create(
                amount=payment_intent.amount,
                currency=payment_intent.currency,
                payment_method=payment_intent.payment_method,
                metadata=payment_intent.metadata
            )

            # Point the payment record at the new intent.
            cur.execute(
                """UPDATE payments 
                   SET stripe_payment_intent_id = %s, 
                       status = 'pending', 
                       updated_at = NOW() 
                   WHERE id = %s""",
                (new_intent.id, payment_id)
            )

            conn.commit()
            return True

        except Exception as e:
            conn.rollback()
            print(f"重试支付失败: {e}")
            return False
    finally:
        # Release resources on every exit path.
        cur.close()
        conn.close()

4.2 并发支付问题

问题描述:多个支付请求同时处理同一申请,导致重复支付。

解决方案:使用数据库锁或乐观锁。

import threading
from contextlib import contextmanager

# Use a database row-level lock.
@contextmanager
def application_lock(application_id):
    """Yield a cursor holding a ``FOR UPDATE`` lock on one visa application row.

    The transaction is committed when the ``with`` body finishes normally and
    rolled back (with the exception re-raised) when it raises. The cursor and
    connection are closed in both cases.
    """
    connection = get_db_connection()
    cursor = connection.cursor()
    lock_query = "SELECT * FROM visa_applications WHERE id = %s FOR UPDATE"

    try:
        # Acquire the exclusive row lock.
        cursor.execute(lock_query, (application_id,))
        yield cursor
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        cursor.close()
        connection.close()

# Use the lock during payment processing.
def process_payment_with_lock(application_id, amount):
    """Create a PaymentIntent for an application while holding its row lock.

    Args:
        application_id: Primary key of the row in ``visa_applications``.
        amount: Amount in major currency units (e.g. dollars).

    Returns:
        The created Stripe PaymentIntent.

    Raises:
        Exception: If the application does not exist or is not ``pending``.
    """
    from decimal import Decimal, ROUND_HALF_UP

    # Use the cursor yielded by the lock. Opening a second connection would
    # make the UPDATE below wait on the lock held by the first one, and its
    # transaction would never be committed.
    with application_lock(application_id) as cur:
        # Check the application status.
        cur.execute(
            "SELECT status FROM visa_applications WHERE id = %s",
            (application_id,)
        )
        row = cur.fetchone()

        if row is None or row[0] != 'pending':
            raise Exception("Application is not in pending state")

        # Convert through Decimal: int(19.99 * 100) would truncate to 1998.
        amount_in_cents = int(
            (Decimal(str(amount)) * 100).quantize(Decimal('1'), rounding=ROUND_HALF_UP)
        )

        # Create the payment.
        payment_intent = stripe.PaymentIntent.create(
            amount=amount_in_cents,
            currency='USD'
        )

        # Update the status; committed by application_lock on exit.
        cur.execute(
            """UPDATE visa_applications 
               SET status = 'processing' 
               WHERE id = %s""",
            (application_id,)
        )

        return payment_intent

4.3 支付网关回调延迟

问题描述:支付成功后,系统状态更新不及时。

解决方案:实现轮询机制和状态同步。

import time
from threading import Thread

def poll_payment_status(payment_id, max_wait=300):
    """Poll the ``payments`` table until the payment completes or fails.

    Args:
        payment_id: Primary key of the row in ``payments``.
        max_wait: Maximum time to keep polling, in seconds (default 5 minutes).

    Returns:
        True when the status becomes ``completed``; False when it becomes
        ``failed`` or ``max_wait`` elapses first.
    """
    start_time = time.time()

    while time.time() - start_time < max_wait:
        retry_delay = 2  # normal polling interval, in seconds
        conn = None
        try:
            # Query the payment status.
            conn = get_db_connection()
            cur = conn.cursor()
            try:
                cur.execute(
                    "SELECT status FROM payments WHERE id = %s",
                    (payment_id,)
                )
                row = cur.fetchone()
            finally:
                cur.close()

            # A missing row is treated as "not decided yet" and polled again.
            status = row[0] if row else None

            if status == 'completed':
                return True
            if status == 'failed':
                return False

        except Exception as e:
            print(f"轮询错误: {e}")
            retry_delay = 5  # back off a little after an error
        finally:
            # Close the connection every iteration; otherwise one connection
            # is leaked per poll.
            if conn is not None:
                conn.close()

        time.sleep(retry_delay)

    return False

# Asynchronous polling.
def async_poll_payment(payment_id):
    """Start ``poll_payment_status`` for ``payment_id`` on a new thread and return that thread."""
    worker = Thread(target=poll_payment_status, args=(payment_id,))
    worker.start()
    return worker

# Hybrid approach: webhook plus polling.
@app.route('/api/payment/check/<int:payment_id>', methods=['GET'])
def check_payment_status(payment_id):
    """Return the status of a payment, syncing it from Stripe when still pending.

    Returns:
        200 with ``payment_id`` and ``status``, or 404 when the payment does
        not exist. The cursor and connection are closed on every path.
    """
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        # Check the database state first.
        cur.execute(
            "SELECT status, stripe_payment_intent_id FROM payments WHERE id = %s",
            (payment_id,)
        )
        result = cur.fetchone()

        if not result:
            return jsonify({'error': 'Payment not found'}), 404

        status, stripe_id = result

        # If still pending, try to fetch the latest state from Stripe.
        if status == 'pending' and stripe_id:
            try:
                intent = stripe.PaymentIntent.retrieve(stripe_id)
                if intent.status == 'succeeded':
                    # Bring the database in line with Stripe.
                    cur.execute(
                        """UPDATE payments 
                           SET status = 'completed', updated_at = NOW() 
                           WHERE id = %s""",
                        (payment_id,)
                    )
                    cur.execute(
                        """UPDATE visa_applications 
                           SET status = 'paid' 
                           WHERE id = (SELECT application_id FROM payments WHERE id = %s)""",
                        (payment_id,)
                    )
                    conn.commit()
                    status = 'completed'
            except Exception as e:
                # Best effort: report the stored status if the sync fails.
                conn.rollback()
                print(f"从Stripe获取状态失败: {e}")

        return jsonify({
            'payment_id': payment_id,
            'status': status
        })
    finally:
        cur.close()
        conn.close()

4.4 货币和汇率问题

问题描述:多币种支付时的汇率计算和显示问题。

解决方案:实现汇率缓存和实时更新。

import requests
from datetime import datetime, timedelta
import json

class CurrencyConverter:
    """Currency converter that caches exchange rates for one hour."""

    def __init__(self):
        self.cache = {}
        self.cache_expiry = {}  # cache key -> datetime at which the entry expires
        self.base_currency = 'USD'

    def get_exchange_rate(self, from_currency, to_currency):
        """Return the exchange rate from ``from_currency`` to ``to_currency``.

        A cached rate is returned while it is fresh. Otherwise the rate is
        fetched from the API and cached for one hour. If the fetch fails, a
        previously cached (stale) rate is returned when one exists.

        Raises:
            RuntimeError: If the rate cannot be fetched and no cached value
                exists. A made-up default rate is never returned, because it
                would silently produce wrong payment amounts.
        """
        cache_key = f"{from_currency}_{to_currency}"

        # Serve from the cache while the entry is still valid.
        expires_at = self.cache_expiry.get(cache_key)
        if cache_key in self.cache and expires_at is not None:
            if datetime.now() < expires_at:
                return self.cache[cache_key]

        # Fetch the latest rate from the API.
        try:
            # Free exchange-rate API (example). The timeout keeps a slow
            # upstream from blocking the caller indefinitely.
            response = requests.get(
                f"https://api.exchangerate-api.com/v4/latest/{from_currency}",
                timeout=10
            )
            response.raise_for_status()
            data = response.json()

            if to_currency not in data['rates']:
                raise ValueError(f"Currency {to_currency} not supported")
            rate = data['rates'][to_currency]
        except Exception as e:
            print(f"获取汇率失败: {e}")
            # Fall back to the stale cached value when there is one.
            if cache_key in self.cache:
                return self.cache[cache_key]
            raise RuntimeError(
                f"Exchange rate {from_currency}->{to_currency} is unavailable"
            ) from e

        # Update the cache (entries live for one hour).
        self.cache[cache_key] = rate
        self.cache_expiry[cache_key] = datetime.now() + timedelta(hours=1)
        return rate

    def convert_amount(self, amount, from_currency, to_currency):
        """Convert ``amount`` from ``from_currency`` to ``to_currency``."""
        if from_currency == to_currency:
            return amount

        rate = self.get_exchange_rate(from_currency, to_currency)
        return amount * rate

# Usage example: convert 100 USD to EUR and print the result.
# Note that convert_amount is called here at module level.
converter = CurrencyConverter()
usd_amount = 100
eur_amount = converter.convert_amount(usd_amount, 'USD', 'EUR')
print(f"{usd_amount} USD = {eur_amount:.2f} EUR")

# Integration into the payment system.
# One converter is shared by all requests so its rate cache is actually reused.
_shared_converter = CurrencyConverter()


@app.route('/api/payment/convert', methods=['POST'])
def convert_currency():
    """Convert an amount between two currencies.

    Expects a JSON object with a numeric ``amount`` and optional
    ``from_currency`` / ``to_currency`` (both default to ``'USD'``).

    Returns:
        200 with the original and converted amounts and the rate used, or
        400 when the body is not a JSON object or ``amount`` is not a number.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    amount = data.get('amount')
    # bool is a subclass of int, so it is excluded explicitly.
    if isinstance(amount, bool) or not isinstance(amount, (int, float)):
        return jsonify({'error': 'amount must be a number'}), 400

    from_curr = data.get('from_currency', 'USD')
    to_curr = data.get('to_currency', 'USD')

    # Look the rate up once so the reported rate is the one actually applied.
    if from_curr == to_curr:
        rate = 1.0
        converted = amount
    else:
        rate = _shared_converter.get_exchange_rate(from_curr, to_curr)
        converted = amount * rate

    return jsonify({
        'original_amount': amount,
        'original_currency': from_curr,
        'converted_amount': round(converted, 2),
        'converted_currency': to_curr,
        'exchange_rate': rate
    })

4.5 高并发下的性能问题

问题描述:在签证申请高峰期,系统响应缓慢。

解决方案:实现缓存和异步处理。

import redis
from rq import Queue
from rq.job import Job
import json

# Redis configuration: db 0 backs the task queue, db 1 is the response cache.
redis_conn = redis.Redis(host='localhost', port=6379, db=0)
cache = redis.Redis(host='localhost', port=6379, db=1)

# Asynchronous task queue (RQ) on top of the db 0 connection.
queue = Queue(connection=redis_conn)

# Caching decorator.
def cache_response(timeout=300):
    """Return a decorator that caches a function's JSON-serializable result.

    Args:
        timeout: Lifetime of a cache entry, in seconds.

    The cache key is built from the function name and the string form of its
    positional and keyword arguments. The wrapper keeps the wrapped
    function's name and docstring, so decorated functions stay
    distinguishable from one another.
    """
    from functools import wraps

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Build the cache key.
            cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"

            # Try the cache first.
            cached = cache.get(cache_key)
            if cached is not None:
                return json.loads(cached)

            # Cache miss: run the function.
            result = func(*args, **kwargs)

            # Store the result with an expiry.
            cache.setex(cache_key, timeout, json.dumps(result))

            return result
        return wrapper
    return decorator

# Asynchronous task processing.
def process_visa_application_async(application_id):
    """Process a visa application in the background and mark it ``processed``.

    Returns:
        A human-readable message naming the processed application.
    """
    # Simulate a long-running operation.
    import time
    time.sleep(5)  # simulated processing time

    # Update the database; resources are released even if the update fails.
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute(
                """UPDATE visa_applications 
                   SET status = 'processed' 
                   WHERE id = %s""",
                (application_id,)
            )
            conn.commit()
        finally:
            cur.close()
    finally:
        conn.close()

    return f"Application {application_id} processed"

# Trigger asynchronous processing after a successful payment.
@app.route('/api/payment/success/<int:payment_id>', methods=['POST'])
def handle_payment_success(payment_id):
    """Mark a payment completed and queue its application for processing.

    Returns:
        200 with a message, the queued ``job_id`` and the ``application_id``,
        or 404 when the payment does not exist.
    """
    # NOTE(review): this endpoint marks a payment as completed without any
    # visible authentication or verification against Stripe — confirm it is
    # protected elsewhere.
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        # Update the payment status.
        cur.execute(
            """UPDATE payments 
               SET status = 'completed' 
               WHERE id = %s 
               RETURNING application_id""",
            (payment_id,)
        )
        result = cur.fetchone()

        if not result:
            return jsonify({'error': 'Payment not found'}), 404

        application_id = result[0]

        # Set the application status to processing.
        cur.execute(
            """UPDATE visa_applications 
               SET status = 'processing' 
               WHERE id = %s""",
            (application_id,)
        )
        conn.commit()

        # Enqueue only after the commit, so the background job can never run
        # against state that was not (or never gets) committed.
        job = queue.enqueue(process_visa_application_async, application_id)

        return jsonify({
            'message': 'Payment successful, application is being processed',
            'job_id': job.id,
            'application_id': application_id
        })
    except Exception:
        conn.rollback()
        raise
    finally:
        # Release resources on every path, including the success path.
        cur.close()
        conn.close()

5. 测试策略

5.1 单元测试

import unittest
from unittest.mock import patch, MagicMock
from your_app import app, get_db_connection

class _FakeStripeObject(dict):
    """Minimal stand-in for a Stripe object: a dict that also allows attribute access."""

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None


class TestPaymentSystem(unittest.TestCase):
    """Unit tests for the payment endpoints, with Stripe and the database mocked."""

    def setUp(self):
        # Testing mode is a property of the application, not of the test client.
        app.testing = True
        self.app = app.test_client()

    @patch('your_app.stripe.PaymentIntent.create')
    @patch('your_app.get_db_connection')
    def test_create_payment_success(self, mock_db, mock_stripe):
        # Mock the database connection.
        mock_conn = MagicMock()
        mock_cur = MagicMock()
        mock_db.return_value = mock_conn
        mock_conn.cursor.return_value = mock_cur
        mock_cur.execute.return_value = None
        # fetchone is called twice: first for the application status, then
        # for the id of the inserted payment. side_effect yields them in
        # order; a second return_value assignment would overwrite the first.
        mock_cur.fetchone.side_effect = [['pending'], [123]]

        # Mock the Stripe call.
        mock_stripe.return_value = MagicMock(
            id='pi_test_123',
            client_secret='secret_test_123'
        )

        # Call the API.
        response = self.app.post('/api/payment/create', json={
            'application_id': 1,
            'amount': 100.00,
            'currency': 'USD',
            'payment_method': 'pm_card_visa'
        })

        # Verify the response.
        self.assertEqual(response.status_code, 200)
        data = response.get_json()
        self.assertIn('payment_id', data)
        self.assertIn('client_secret', data)
        self.assertEqual(data['payment_id'], 123)

    @patch('your_app.stripe.Webhook.construct_event')
    def test_webhook_success(self, mock_webhook):
        # Mock the webhook event. The payment intent supports both item and
        # attribute access, like a real Stripe object.
        mock_event = {
            'type': 'payment_intent.succeeded',
            'data': {
                'object': _FakeStripeObject(
                    id='pi_test_123',
                    metadata={'application_id': '1', 'user_id': '1'}
                )
            }
        }
        mock_webhook.return_value = mock_event

        # Mock the database update and the email helper (create=True because
        # the helper may not be defined in the module under test).
        with patch('your_app.get_db_connection') as mock_db, \
                patch('your_app.send_confirmation_email', create=True):
            mock_conn = MagicMock()
            mock_cur = MagicMock()
            mock_db.return_value = mock_conn
            mock_conn.cursor.return_value = mock_cur

            # Exercise the webhook.
            response = self.app.post('/api/payment/webhook', 
                                   data='test_payload',
                                   headers={'Stripe-Signature': 'test_signature'})

            self.assertEqual(response.status_code, 200)
            # Verify that the database was updated and committed.
            self.assertTrue(mock_cur.execute.called)
            self.assertTrue(mock_conn.commit.called)

# Run the unit tests when this file is executed directly.
if __name__ == '__main__':
    unittest.main()

5.2 集成测试

import pytest
from your_app import app, get_db_connection
import stripe

@pytest.fixture
def client():
    """Yield a Flask test client with the application in testing mode."""
    app.config.update(TESTING=True)
    with app.test_client() as test_client:
        yield test_client

def test_full_payment_flow(client):
    """Exercise the full payment flow: user, application, payment, webhook, status."""
    import hashlib
    import hmac
    import json
    import os
    import time

    # 1. Create a user.
    user_response = client.post('/api/users', json={
        'email': 'test@example.com',
        'full_name': 'Test User',
        'passport_number': '123456789'
    })
    user_id = user_response.get_json()['id']

    # 2. Create a visa application.
    app_response = client.post('/api/applications', json={
        'user_id': user_id,
        'application_type': 'tourist',
        'entry_date': '2024-01-01',
        'exit_date': '2024-01-15'
    })
    application_id = app_response.get_json()['id']

    # 3. Create a payment.
    payment_response = client.post('/api/payment/create', json={
        'application_id': application_id,
        'amount': 100.00,
        'currency': 'USD',
        'payment_method': 'pm_card_visa'
    })
    assert payment_response.status_code == 200
    payment_data = payment_response.get_json()

    # 4. Simulate a successful payment (Stripe test card).
    stripe.api_key = os.environ.get("STRIPE_API_KEY", "sk_test_4eC39HqLyjWDarjtT1zdp7dc")
    # PaymentIntent.retrieve takes the intent id, not the client secret. The
    # client secret has the form "<intent id>_secret_<random>".
    payment_intent_id = payment_data['client_secret'].split('_secret_')[0]
    payment_intent = stripe.PaymentIntent.retrieve(payment_intent_id)

    # Complete the payment with the test card unless it already succeeded
    # (the create endpoint confirms the intent on creation).
    if payment_intent.status != 'succeeded':
        stripe.PaymentIntent.confirm(
            payment_intent.id,
            payment_method='pm_card_visa'
        )

    # 5. Simulate the webhook callback with a correctly signed payload.
    # Stripe signs "<timestamp>.<payload>" with HMAC-SHA256; the secret must
    # match the one the webhook handler verifies with.
    webhook_secret = 'whsec_your_webhook_secret'
    webhook_payload = json.dumps({
        'id': 'evt_test_full_flow',
        'object': 'event',
        'type': 'payment_intent.succeeded',
        'data': {
            'object': {
                'id': payment_intent.id,
                'object': 'payment_intent',
                'metadata': {
                    'application_id': str(application_id),
                    'user_id': str(user_id)
                }
            }
        }
    })
    timestamp = int(time.time())
    signature = hmac.new(
        webhook_secret.encode(),
        f"{timestamp}.{webhook_payload}".encode(),
        hashlib.sha256
    ).hexdigest()
    webhook_response = client.post('/api/payment/webhook', 
                                 data=webhook_payload,
                                 headers={'Stripe-Signature': f't={timestamp},v1={signature}'})
    assert webhook_response.status_code == 200

    # 6. Verify the status update.
    status_response = client.get(f'/api/applications/{application_id}/status')
    status_data = status_response.get_json()

    assert status_data['status'] == 'paid'

6. 部署和监控

6.1 Docker部署配置

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies; the apt package lists are removed afterwards
# in the same layer.
RUN apt-get update && apt-get install -y \
    gcc \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency manifest first, separately from the application code.
COPY requirements.txt .

# Install Python dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code.
COPY . .

# Expose the application port.
EXPOSE 5000

# Start command: gunicorn serving app:app on port 5000 with 4 workers.
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "app:app"]
# docker-compose.yml
version: '3.8'

services:
  # Flask application built from the Dockerfile in this directory.
  web:
    build: .
    ports:
      - "5000:5000"
    environment:
      # NOTE(review): the database password is written inline here and in
      # the db service below — confirm this file is for local development only.
      - DATABASE_URL=postgresql://postgres:password@db:5432/evisa
      # Taken from the environment of the shell that runs docker-compose.
      - STRIPE_API_KEY=${STRIPE_API_KEY}
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    volumes:
      - ./logs:/app/logs

  # PostgreSQL database with a named volume for its data directory.
  db:
    image: postgres:13
    environment:
      - POSTGRES_DB=evisa
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      # Publishes the database port on the host.
      - "5432:5432"

  # Redis instance referenced by REDIS_URL above.
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"

  # Nginx with a mounted configuration file and SSL directory.
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - web

volumes:
  postgres_data:

6.2 监控和日志

import logging
from logging.handlers import RotatingFileHandler
import time
from prometheus_client import Counter, Histogram, generate_latest
from flask import Response

# Configure logging.
def setup_logging():
    """Return the ``evisa`` logger, configuring it on first use.

    The logger writes to ``app.log`` through a rotating file handler (10 MB
    per file, 5 backups) at INFO level. Calling this function repeatedly is
    safe: the handler is attached only once, so records are not duplicated.
    """
    logger = logging.getLogger('evisa')
    logger.setLevel(logging.INFO)

    # Attach the handler only once; each extra handler would write every
    # record again.
    if not logger.handlers:
        handler = RotatingFileHandler(
            'app.log',
            maxBytes=10485760,  # 10MB
            backupCount=5
        )
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    return logger

# Prometheus metrics: request count and latency, plus payment request and
# payment failure counters.
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests')
REQUEST_LATENCY = Histogram('http_request_latency_seconds', 'Request latency')
PAYMENT_COUNT = Counter('payment_requests_total', 'Total payment requests')
PAYMENT_FAILURES = Counter('payment_failures_total', 'Total payment failures')

# Middleware hooks that record request metrics.
@app.before_request
def before_request():
    """Store the request start time on the request object for latency measurement."""
    request.start_time = time.time()

@app.after_request
def after_request(response):
    """Count the request and record its latency, then return the response unchanged."""
    REQUEST_COUNT.inc()
    # The start time can be missing when the before_request hook did not run
    # for this request; skip the latency sample instead of raising.
    start_time = getattr(request, 'start_time', None)
    if start_time is not None:
        REQUEST_LATENCY.observe(time.time() - start_time)
    return response

# Payment-related metrics.
def record_payment_success():
    """Increment the total payment request counter."""
    PAYMENT_COUNT.inc()

def record_payment_failure():
    """Increment the payment failure counter."""
    PAYMENT_FAILURES.inc()

# Monitoring endpoint.
@app.route('/metrics')
def metrics():
    """Return the current Prometheus metrics as a plain-text response."""
    return Response(generate_latest(), mimetype='text/plain')

# Error handling and alerting.
@app.errorhandler(Exception)
def handle_exception(e):
    """Log and alert on unhandled exceptions and return a generic 500 response.

    HTTP errors (for example a 404 for an unknown URL or an ``abort(400)``
    from input validation) are passed through unchanged, so they keep their
    own status code and do not trigger alerts.
    """
    # HTTP exceptions carry an integer status code and can build their own
    # response; returning them as-is lets Flask render them normally.
    if isinstance(getattr(e, 'code', None), int) and hasattr(e, 'get_response'):
        return e

    logger = setup_logging()
    logger.error(f"Unhandled exception: {str(e)}", exc_info=True)

    # Send an alert (example: Slack notification).
    send_slack_alert(f"Payment system error: {str(e)}")

    return jsonify({'error': 'Internal server error'}), 500

def send_slack_alert(message):
    """Send ``message`` to Slack through the webhook in ``SLACK_WEBHOOK_URL``.

    Does nothing when the variable is not set. Delivery is best effort: a
    failure is printed and never raised, and the request is bounded by a
    timeout so alerting cannot stall the caller.
    """
    webhook_url = os.environ.get('SLACK_WEBHOOK_URL')
    if not webhook_url:
        return

    # Imported only when an alert is actually going to be sent.
    import requests

    payload = {
        'text': f'🚨 eVisa Payment System Alert\n{message}',
        'username': 'eVisa Monitor',
        'icon_emoji': ':warning:'
    }

    try:
        requests.post(webhook_url, json=payload, timeout=5)
    except Exception as e:
        print(f"Failed to send Slack alert: {e}")

7. 合规性和数据保护

7.1 GDPR合规

from datetime import datetime, timedelta

class GDPRCompliance:
    """Data-retention helpers: anonymize users and purge old applications."""

    def __init__(self):
        self.data_retention_days = 365  # retention period, in days

    def anonymize_user_data(self, user_id):
        """Replace a user's personal fields with anonymized placeholders.

        Also detaches the user's visa applications by setting their
        ``user_id`` to NULL. Does nothing when the user does not exist. The
        cursor and connection are always closed.
        """
        import hashlib

        conn = get_db_connection()
        cur = conn.cursor()
        try:
            # Load the user to confirm the row exists.
            cur.execute(
                "SELECT email, full_name, passport_number FROM users WHERE id = %s",
                (user_id,)
            )
            user_data = cur.fetchone()

            if user_data:
                # Derive the anonymous identifier.
                # NOTE(review): this is an unsalted hash of the numeric user
                # id — confirm that it is acceptable for the anonymization
                # requirement, since the id can be recovered by enumeration.
                anonymized_id = hashlib.sha256(str(user_id).encode()).hexdigest()[:16]

                # Overwrite the user record.
                cur.execute(
                    """UPDATE users 
                       SET email = %s, 
                           full_name = 'Anonymized', 
                           passport_number = 'Anonymized',
                           anonymized_at = NOW() 
                       WHERE id = %s""",
                    (f"anon_{anonymized_id}@anonymized.com", user_id)
                )

                # Detach related records.
                cur.execute(
                    """UPDATE visa_applications 
                       SET user_id = NULL 
                       WHERE user_id = %s""",
                    (user_id,)
                )

                conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            cur.close()
            conn.close()

    def schedule_data_retention(self):
        """Anonymize users older than the retention period and delete old applications.

        Only applications whose status is ``rejected`` or ``expired`` are
        deleted. The cursor and connection are always closed.
        """
        conn = get_db_connection()
        cur = conn.cursor()
        try:
            # Everything created before this moment is past retention.
            cutoff_date = datetime.now() - timedelta(days=self.data_retention_days)

            # Anonymize old users that have not been anonymized yet.
            cur.execute(
                """SELECT id FROM users 
                   WHERE created_at < %s 
                   AND anonymized_at IS NULL""",
                (cutoff_date,)
            )
            users_to_anonymize = cur.fetchall()

            for (old_user_id,) in users_to_anonymize:
                # Each call uses and commits its own connection.
                self.anonymize_user_data(old_user_id)

            # Delete old applications that are rejected or expired.
            cur.execute(
                """DELETE FROM visa_applications 
                   WHERE created_at < %s 
                   AND status IN ('rejected', 'expired')""",
                (cutoff_date,)
            )

            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            cur.close()
            conn.close()

7.2 PCI DSS合规

class PCIDSSCompliance:
    """Helpers for masking card data and checking records for PCI DSS problems."""

    def __init__(self):
        self.sensitive_fields = ['credit_card_number', 'cvv', 'expiry_date']

    def mask_sensitive_data(self, data):
        """Return a shallow copy of ``data`` with the sensitive fields masked.

        Values longer than ten characters keep their first six and last four
        characters with everything in between replaced by ``*``. Shorter
        values are masked completely. The input mapping is not modified.
        """
        masked = data.copy()

        for field in self.sensitive_fields:
            if field not in masked:
                continue

            value = str(masked[field])
            # Strictly longer than 10: with exactly 10 characters the "first
            # six + last four" rule would leave the whole value visible.
            if len(value) > 10:
                masked[field] = value[:6] + '*' * (len(value) - 10) + value[-4:]
            else:
                masked[field] = '*' * len(value)

        return masked

    def validate_pci_compliance(self, data):
        """Return a list of PCI compliance error messages for ``data``.

        An empty list means none of the checks found a problem.
        """
        errors = []

        # Check whether a full card number is being stored. The value is
        # normalized first so integers and numbers written with spaces or
        # dashes are detected as well.
        if 'credit_card_number' in data:
            card_digits = ''.join(
                ch for ch in str(data['credit_card_number']) if ch not in ' -'
            )
            if len(card_digits) >= 13 and card_digits.isascii() and card_digits.isdigit():
                errors.append("Full credit card number should not be stored")

        # The CVV must never be stored.
        if 'cvv' in data:
            errors.append("CVV should never be stored")

        # Sensitive data must be present in encrypted form.
        if 'encrypted_data' not in data:
            errors.append("Sensitive data must be encrypted")

        return errors

8. 总结

电子签证支付系统的开发需要综合考虑技术架构、安全性、性能和合规性。通过本文提供的指南和代码示例,开发者可以:

  1. 构建稳健的系统架构:采用微服务设计,确保可扩展性和可维护性
  2. 实现安全的支付流程:遵循最佳安全实践,防止常见攻击
  3. 处理常见问题:提供详细的解决方案,应对支付失败、并发问题等挑战
  4. 确保合规性:满足GDPR、PCI DSS等法规要求
  5. 实施监控和日志:建立完善的监控体系,及时发现和解决问题

记住,支付系统涉及敏感的金融数据,必须始终将安全性放在首位。定期进行安全审计、代码审查和性能测试,确保系统稳定可靠地运行。

9. 附录:常用工具和资源

  • 支付网关:Stripe, PayPal, Adyen
  • 数据库:PostgreSQL, MySQL
  • 缓存:Redis
  • 消息队列:RabbitMQ, Redis Queue
  • 监控:Prometheus, Grafana, ELK Stack
  • 测试:pytest, unittest, Postman
  • 部署:Docker, Kubernetes, AWS/Azure/GCP

通过遵循本指南,您将能够开发出一个专业、安全、高效的电子签证支付系统,为用户提供流畅的支付体验。