# dsync **Repository Path**: edityworld-shann/dsync ## Basic Information - **Project Name**: dsync - **Description**: 数据同步 多个系统 多个数据库 强业务关联 - **Primary Language**: Python - **License**: MIT - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-02-02 - **Last Updated**: 2026-02-13 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README data_pusher/ ├── config/ │ ├── __init__.py │ ├── databases.py │ └── settings.py ├── extractors/ # 数据抽取(按业务) │ ├── device_info.py │ ├── device_alarm.py │ ├── device_event.py │ ├── device_abnormal.py │ ├── device_maintenance.py │ ├── device_inspection.py │ ├── device_maintenance_plan.py │ ├── device_performance.py │ ├── spare_part_info.py │ ├── spare_part_outbound.py │ └── spare_part_inbound.py ├── transformers/ # 数据转换(对接口规范) │ ├── device_info.py │ ├── device_alarm.py │ └── ...(同上) ├── services/ │ ├── __init__.py │ ├── token_manager.py │ ├── push_service.py │ └── sync_registry.py # ← 新增:同步任务注册中心 ├── tasks/ # ← 新增:每个任务是一个 callable │ ├── __init__.py │ ├── sync_device_info.py │ ├── sync_device_alarm.py │ └── ...(一一对应) ├── utils/ │ ├── __init__.py │ ├── logger.py │ ├── retry.py │ └── last_push_time.py # ← 新增:持久化最后推送时间 ├── test_window.py # ← 测试窗口:交互式测试工具 ├── quick_test.py # ← 快速测试:命令行测试工具 ├── scheduler.py # ← 精简:只负责调度 └── main.py SQLAlchemy==2.0.30 psycopg2-binary==2.9.9 pymssql==2.2.11 pandas==2.2.2 requests==2.32.3 APScheduler==3.10.4 # 设置环境变量(可选) export APP_ID=your_id export APP_SECRET=your_secret export PG_URL=postgresql://... export MSSQL_URL=mssql+... # 启动 # 数据同步服务 (Data Sync Service) 这是一个用于同步企业数据到目标系统的数据同步服务。该服务能够从源数据库提取数据,转换为API所需的格式,并推送到目标系统。 ## 功能特性 - **数据提取**: 从源数据库提取增量数据 - **数据转换**: 将数据转换为API所需格式 - **数据推送**: 将转换后的数据推送到目标API - **任务分类**: 支持按业务类别分类执行任务 - **增量同步**: 支持基于时间戳的增量数据同步 - **错误处理**: 完善的错误处理和日志记录机制 ## 任务分类 系统中的同步任务按照业务领域分为以下几类: ### 1. 设备相关任务 (Device Tasks) - `sync_device_info`: 设备信息同步 - `sync_device_alarm`: 设备报警信息同步 - `sync_device_event`: 设备事件信息同步 - `sync_device_abnormal`: 设备异常上报同步 - `sync_device_performance`: 设备绩效信息同步 ### 2. 维护相关任务 (Maintenance Tasks) - `sync_device_maintenance`: 设备保养信息同步 - `sync_device_inspection`: 设备巡检信息同步 - `sync_device_maintenance_record`: 设备维修信息同步 ### 3. 备件相关任务 (Parts Tasks) - `sync_spare_part_info`: 备件信息同步 - `sync_spare_part_outbound`: 备件出库信息同步 - `sync_spare_part_inbound`: 备件入库信息同步 ### 4. 通用任务 (General Tasks) - `sync_device_maintenance_plan`: 设备维保计划同步 - `sync_safety_risk_info`: 安全风险信息同步 - `sync_safety_accident_info`: 安全事故信息同步 - `sync_hidden_danger_info`: 隐患信息同步 ## 业务覆盖 ### 设备管理 (6.2) 根据接口文档,系统已实现以下设备管理功能: - 6.2.1 设备信息(每日增量) - 6.2.2 设备报警(每日增量) - 6.2.3 设备事件(每日增量) - 6.2.4 设备异常上报(每日增量) - 6.2.5 设备维修(每日增量) - 6.2.6 设备巡检(每日增量) - 6.2.7 设备保养(每日增量) - 6.2.8 设备绩效(每日增量) - 6.2.9 备件信息(每日增量) - 6.2.10 备件出库(每日增量) - 6.2.11 备件入库(每日增量) ### 双重预防 (3.3) 根据接口文档,系统已实现以下双重预防功能: - 3.3.1 安全风险信息(每日增量) - 3.3.2 安全事故信息(每日增量) - 3.3.3 隐患信息(每日增量) ## 架构设计 ### 核心组件 - `scheduler.py`: 任务调度器,支持按类别调度不同任务 - `services/token_manager.py`: 访问令牌管理 - `services/push_service.py`: 数据推送服务 - `services/sync_registry.py`: 同步任务注册表 - `extractors/`: 数据提取器模块 - `transformers/`: 数据转换器模块 - `utils/`: 工具模块 ### 数据流 1. **提取**: 从源数据库提取增量数据 2. **转换**: 将数据转换为API要求的格式 3. **推送**: 通过API将数据推送到目标系统 4. **记录**: 记录最后推送时间以支持增量同步 ## 配置 系统使用以下配置文件: - `config/settings.py`: 基础设置 - `config/databases.py`: 数据库配置 ## 调度策略 不同类别的任务有不同的调度频率: - **设备任务**: 每30分钟执行一次 - **维护任务**: 每小时执行一次 - **备件任务**: 每45分钟执行一次 - **通用任务**: 每2小时执行一次 ## 运行 ```bash cd /path/to/dsync/src python main.py ``` 主程序会启动分类调度器,根据预设的时间间隔执行不同类型的任务。 ## 扩展 要添加新的同步任务: 1. 创建新的提取器 (extractor) 2. 创建新的转换器 (transformer) 3. 创建新的任务文件并使用适当的注册装饰器: - `@register_device_sync_task`: 设备相关任务 - `@register_maintenance_sync_task`: 维护相关任务 - `@register_parts_sync_task`: 备件相关任务 - `@register_sync_task`: 通用任务 4. 在 `main.py` 中导入新任务模块 ## 依赖 - Python 3.7+ - APScheduler - SQLAlchemy - pandas - requests ## 安装 python -m pip install --upgrade pip pip install -r requirements.txt -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple (不需要) Package Version --------------- ----------- numpy 2.4.2 pandas 3.0.0 pip 26.0.1 python-dateutil 2.9.0.post0 six 1.17.0 tzdata 2025.3 ### 直接使用这个安装依赖 python -m pip install pymssql pymysql psycopg2 pandas requests apscheduler ### 从清华镜像安装依赖 python -m pip install -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple pymssql pymysql psycopg2 pandas requests apscheduler python -m pip install -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple pymssql pymysql psycopg2 pandas requests apscheduler sqlalchemy ## 许可证 [请根据实际情况填写许可证信息]