:cn: MyDataHarbor是一个致力于解决异构数据源之间的分布式、高扩展性、高性能、至少一次保证,稳定可靠的数据同步中间件。 帮助用户可靠、快速、稳定的对海量数据进行准实时增量同步或者定时全量同步,轻量简单,主要定位是为实时交易系统服务,亦可用于大数据的数据同步(ETL领域)。
江苏南京
组织介绍


logo

2026-04-03 GitHub-CI Release Maven Central Downloads License 插件列表


📖 简介

MyDataHarbor 是一个分布式、高扩展、高性能的准实时数据同步中间件,致力于解决异构数据源之间的数据同步问题。

🎯 核心定位:为实时交易系统服务,支持海量数据的准实时增量同步和定时全量同步,亦可用于 ETL 领域。

典型应用场景:

  • 🔄 数据库 → 搜索引擎:MySQL → Elasticsearch
  • 🔄 数据库 → 缓存:MySQL → Redis
  • 🔄 消息队列 → 数据库:Kafka → MySQL
  • 🔄 跨部门数据同步:多系统间的数据共享与同步

✨ 核心特性

特性 说明
🚩 分布式设计 基于 ZooKeeper 构建,支持水平扩展,节点分组隔离,负载均衡与故障自动转移
🔌 插件化架构 高度抽象的接口设计,任何数据源/目标都可通过开发插件接入
🛡️ 数据不丢失 引入微事务机制,保障数据至少成功写入一次
📊 可视化监控 集成 JMX,实时查看任务运行状态、吞吐量、错误率等指标
🎨 自由组合 支持从不同插件复用组件,可视化配置 Pipeline 管道
高性能 支持批量提交、ForkJoin 并发处理,摩托变汽车,汽车变高铁
📝 插件自描述 自动识别插件能力,生成友好的可视化配置界面

🏗️ 架构设计

MyDataHarbor 唯一依赖的中间件是 ZooKeeper,共有两个核心组件:

集群架构设计

集群设计

节点任务设计

节点任务设计

系统架构(示意)

                    ┌─────────────────────────────────────────┐
                    │           MyDataHarbor Console          │
                    │  - 任务管理  - 插件管理  - 监控看板      │
                    └───────────────────┬─────────────────────┘

                              ┌─────────▼─────────┐
                              │    ZooKeeper      │
                              │  (配置中心/协调)   │
                              └─────────┬─────────┘

          ┌─────────────────────────────┼─────────────────────────────┐
          │                             │                             │
  ┌───────▼───────┐             ┌───────▼───────┐             ┌───────▼───────┐
  │   Server A    │             │   Server B    │             │   Server C    │
  │  (group: G1)  │             │  (group: G1)  │             │  (group: G2)  │
  │  - Task 1     │             │  - Task 2     │             │  - Task 3     │
  │  - Plugin     │             │  - Plugin     │             │  - Plugin     │
  └───────────────┘             └───────────────┘             └───────────────┘

核心组件

组件 说明
mydataharbor-console 管理控制台,提供任务管理、插件管理、监控看板等 Web 界面
mydataharbor-server 数据同步执行节点,负责运行任务 Pipeline
mydataharbor-core 核心接口与抽象实现,包含数据管道执行引擎
mydataharbor-plugin 插件体系,支持数据源、转换器、写入器等扩展

数据处理流程

┌──────────────┐    ┌───────────────┐    ┌───────────┐    ┌───────────────┐    ┌──────────────┐
│ DataSource   │ →  │ ProtocolData  │ →  │  Checker  │ →  │ DataConverter │ →  │  DataSink    │
│ (数据源)     │    │ (协议转换)    │    │ (数据校验) │    │ (数据转换)    │    │ (数据写入)   │
└──────────────┘    └───────────────┘    └───────────┘    └───────────────┘    └──────────────┘
     ↓                      ↓                                       ↓                    ↓
  MySQL                   JSON                                    POJO              Elasticsearch
  Kafka                  XML                                     Map               Redis
  RabbitMQ               ...                                     ...               ...

🚀 快速开始

环境要求

依赖 版本要求 说明
JDK 1.8+ 推荐使用 OpenJDK 8/11
ZooKeeper 3.4.x+ 必须,唯一外部依赖
Maven 3.6+ 如需源码编译

方式一:下载二进制包

1. 下载安装包

Releases 下载:

  • mydataharbor-console-{version}-bin.tar.gz
  • mydataharbor-server-{version}-bin.tar.gz
# Linux
wget https://github.com/mydataharbor/mydataharbor/releases/download/v2.0.1/mydataharbor-console-2.0.1-bin.tar.gz
wget https://github.com/mydataharbor/mydataharbor/releases/download/v2.0.1/mydataharbor-server-2.0.1-bin.tar.gz

# 解压
tar -xzf mydataharbor-console-2.0.1-bin.tar.gz
tar -xzf mydataharbor-server-2.0.1-bin.tar.gz

2. 配置 mydataharbor-console

编辑 config/application.yml

server:
  port: 8080                    # Console 服务端口
zk: 127.0.0.1:2181              # ZooKeeper 地址

3. 配置 mydataharbor-server

编辑 config/system.yml

zk: ["127.0.0.1:2181"]          # ZooKeeper 地址
port: 1299                      # Server 服务端口
group: biz001                   # 节点所属组名
pluginRepository: http://127.0.0.1:8080  # 插件仓库地址(Console)

4. 启动服务

# 启动 Console(管理台)
cd mydataharbor-console
./start.sh

# 启动 Server(数据同步节点)
cd mydataharbor-server
./start.sh

💡 start.sh 支持参数

  • ./start.sh jmx - 开启远程 JMX 监控
  • ./start.sh debug - 开启远程调试
  • ./start.sh status - 查看服务状态
  • ./stop.sh - 停止服务

5. 验证

访问管理控制台:http://127.0.0.1:8080


方式二:源码编译

# 克隆项目
git clone https://github.com/mydataharbor/mydataharbor.git
cd mydataharbor

# 编译打包
mvn clean package -DskipTests

# 产物位置
# mydataharbor-deploy/mydataharbor-console/target/mydataharbor-console-{version}-bin.tar.gz
# mydataharbor-deploy/mydataharbor-server/target/mydataharbor-server-{version}-bin.tar.gz

方式三:Docker 部署(推荐)

# 启动 ZooKeeper
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4

# 启动 Console
docker run -d --name mydataharbor-console \
  -e ZK_HOST=127.0.0.1:2181 \
  -p 8080:8080 \
  mydataharbor/mydataharbor-console:2.0.1

# 启动 Server
docker run -d --name mydataharbor-server \
  -e ZK_HOST=127.0.0.1:2181 \
  -e GROUP=biz001 \
  -e PLUGIN_REPO=http://127.0.0.1:8080 \
  -p 1299:1299 \
  mydataharbor/mydataharbor-server:2.0.1

📦 插件生态

已支持插件

插件名称 类型 说明
MySQL Plugin DataSource MySQL 数据库增量/全量同步
Elasticsearch Plugin DataSink 数据写入 ES 索引
Redis Plugin DataSink 数据写入 Redis
Kafka Plugin DataSource/Sink Kafka 消息消费与生产
Groovy Plugin Converter Groovy 脚本数据转换

完整插件清单:插件市场

开发自定义插件

1. 插件项目结构

myplugin/
├── pom.xml                          # Maven 配置
├── plugin.properties                # 插件描述文件
└── src/main/java/
    └── myplugin/
        ├── MyDataSource.java        # 数据源实现
        ├── MyDataSink.java          # 写入器实现
        ├── MyConverter.java         # 转换器实现
        └── MyPipelineCreator.java   # Pipeline 创建器

2. plugin.properties 配置

# 插件唯一标识
plugin.id=myplugin
# 插件版本
plugin.version=1.0.0
# 插件描述
plugin.desc=我的自定义插件
# 插件作者
plugin.author=yourname
# 依赖其他插件(可选)
plugin.dependencies=mydataharbor-base:1.0.0,mysql-plugin:2.0.0
# 插件依赖类加载器隔离
plugin.requiresIsolation=true

3. 数据源(DataSource)开发

@MyDataHarborMarker(title = "我的数据源")
@Extension
public class MyDataSource implements IDataSource<MyRecord, MySetting> {

    // 数据总数(用于进度展示)
    private long total;

    // 当前进度
    private long now = 0L;

    @Override
    public void init(MySetting setting) {
        // 初始化资源,如数据库连接
        this.total = queryTotalCount();
    }

    @Override
    public String dataSourceType() {
        return "MyDataSource";
    }

    @Override
    public Long total() {
        return total;
    }

    @Override
    public Iterable<MyRecord> poll(MySetting setting) throws TheEndException {
        // 当数据全部拉取完成时抛出 TheEndException
        if (now >= total) {
            throw new TheEndException("数据拉取完成");
        }

        List<MyRecord> batch = new ArrayList<>();
        // 批量拉取数据(建议每批 100-1000 条)
        while (now < total && batch.size() < 100) {
            MyRecord record = fetchData(now);
            batch.add(record);
            now++;
        }
        return batch;
    }

    @Override
    public void commit(MyRecord record, MySetting setting) {
        // 单条提交(记录成功位置,用于断点续传)
    }

    @Override
    public void commit(Iterable<MyRecord> records, MySetting setting) {
        // 批量提交(推荐实现此方法以提升性能)
        for (MyRecord record : records) {
            commit(record, setting);
        }
    }

    @Override
    public void rollback(MyRecord record, MySetting setting) {
        // 单条回滚(处理失败记录)
    }

    @Override
    public void close() throws IOException {
        // 释放资源
    }
}

4. 写入器(DataSink)开发

@MyDataHarborMarker(title = "我的写入器")
@Extension
@Slf4j
public class MyDataSink implements IDataSink<MyRecord, MySetting> {

    @Override
    public String name() {
        return "MyDataSink";
    }

    @Override
    public WriterResult write(MyRecord record, MySetting setting) throws ResetException {
        // 单条写入
        try {
            // 执行写入操作
            doWrite(record);
            return WriterResult.builder()
                    .commit(true)
                    .success(true)
                    .msg("success")
                    .build();
        } catch (Exception e) {
            log.error("写入失败", e);
            return WriterResult.builder()
                    .commit(false)
                    .success(false)
                    .msg(e.getMessage())
                    .build();
        }
    }

    @Override
    public WriterResult write(List<MyRecord> records, MySetting setting) throws ResetException {
        // 批量写入(推荐实现,性能更高)
        try {
            doBatchWrite(records);
            return WriterResult.builder()
                    .commit(true)
                    .success(true)
                    .msg("batch success")
                    .build();
        } catch (Exception e) {
            log.error("批量写入失败", e);
            return WriterResult.builder()
                    .commit(false)
                    .success(false)
                    .msg(e.getMessage())
                    .build();
        }
    }

    @Override
    public void close() throws IOException {
        // 释放资源
    }
}

5. 转换器(DataConverter)开发

@MyDataHarborMarker(title = "我的转换器")
@Extension
public class MyDataConverter implements IDataConverter<ProtocolData, MyRecord> {

    @Override
    public MyRecord convert(ProtocolData protocolData, BaseSettingContext settingContext) {
        // 将 ProtocolData 转换为业务 Record 对象
        MyRecord record = new MyRecord();
        record.setId(protocolData.getId());
        record.setData(protocolData.getContent());
        return record;
    }
}

6. 校验器(Checker)开发

@MyDataHarborMarker(title = "我的校验器")
@Extension
public class MyChecker implements IChecker<MyRecord> {

    @Override
    public CheckResult check(MyRecord record, BaseSettingContext settingContext) {
        // 数据校验逻辑
        if (record == null || record.getId() == null) {
            return CheckResult.fail("数据不完整");
        }
        return CheckResult.pass();
    }
}

7. Pipeline 创建器开发

@Extension
public class MyPipelineCreator extends AbstractAutoScanPipelineCreator<MyConfig, MySetting> {

    @Override
    public String type() {
        return "我的数据同步任务";
    }

    @Override
    public IDataPipeline createPipeline(MyConfig config, MySetting setting) throws Exception {
        return CommonDataPipeline.builder()
                .dataSource(new MyDataSource())
                .protocolDataConverter(new OriginalProtocolDataConverter())
                .checker(new MyChecker())
                .dataConverter(new MyDataConverter())
                .sink(new MyDataSink())
                .settingContext(setting)
                .build();
    }

    @Override
    public String scanPackage() {
        return "myplugin";
    }

    // 配置类(用于生成可视化配置界面)
    @Data
    public static class MyConfig {
        @MyDataHarborMarker(title = "数据源地址", defaultValue = "localhost:3306")
        private String dataSourceUrl;

        @MyDataHarborMarker(title = "用户名", defaultValue = "root")
        private String username;

        @MyDataHarborMarker(title = "密码", defaultValue = "", secret = true)
        private String password;

        @MyDataHarborMarker(title = "批处理大小", defaultValue = "1000")
        private Integer batchSize;
    }
}

8. 插件依赖声明

plugin.properties 中声明依赖:

# 依赖基础插件(版本 1.0.0 及以上)
plugin.dependencies=mydataharbor-base:1.0.0

# 依赖多个插件
plugin.dependencies=mysql-plugin:2.0.0,elasticsearch-plugin:1.5.0

# 可选依赖(不存在也不影响运行)
plugin.dependencies=mysql-plugin:2.0.0?optional=true

9. 插件打包与发布

# 编译打包
mvn clean package

# 产物位置
target/myplugin-1.0.0.zip

# 发布到插件仓库
# 方式 1:上传到 Console 管理台
curl -X POST http://localhost:8080/mydataharbor/plugin/upload \
  -F "file=@target/myplugin-1.0.0.zip"

# 方式 2:手动放置到插件仓库目录
cp target/myplugin-1.0.0.zip /path/to/plugin-repository/

10. 本地调试

# 方式 1:将插件 JAR 放到 Server 插件目录
cp target/myplugin-1.0.0.zip mydataharbor-server/plugins/
# 重启 Server

# 方式 2:Maven 依赖方式开发
# 在 Server 项目的 pom.xml 中添加:
<dependency>
    <groupId>com.mydataharbor</groupId>
    <artifactId>myplugin</artifactId>
    <version>1.0.0</version>
</dependency>

11. @MyDataHarborMarker 注解属性

属性 说明 示例
title 字段显示名称 @MyDataHarborMarker(title = "数据源地址")
defaultValue 默认值 defaultValue = "localhost:3306"
secret 是否为敏感信息(前端会隐藏显示) secret = true
required 是否必填 required = true
placeholder 输入框占位提示 placeholder = "请输入数据库地址"

💡 使用指南

创建数据同步任务

  1. 访问管理台http://localhost:8080

  2. 创建任务

    • 进入"任务管理" → "创建任务"
    • 选择 Pipeline 类型(如"MySQL 到 Elasticsearch")
    • 填写配置信息
  3. 配置调度策略

    • 增量同步:实时监听数据库变更
    • 全量同步:定时全量刷新(如每天凌晨 2 点)
    • Cron 表达式:0 0 2 * * ?
  4. 启动任务

    • 点击"启动"按钮
    • 选择执行节点组
    • 观察任务状态变为"运行中"

常见任务类型

MySQL → Elasticsearch

pipelineType: mysql-to-elasticsearch
settingContext:
  dataSource:
    jdbcUrl: jdbc:mysql://localhost:3306/mydb
    username: root
    password: "123456"
    tables: ["users", "orders"]
  sink:
    esHosts: ["localhost:9200"]
    indexPrefix: "myapp"
  batchCommit: true
  batchSize: 1000
  parallel: true
  threadNum: 4

Kafka → MySQL

pipelineType: kafka-to-mysql
settingContext:
  kafka:
    bootstrapServers: ["localhost:9092"]
    topic: "user-events"
    groupId: "mysql-sync-group"
  mysql:
    jdbcUrl: jdbc:mysql://localhost:3306/target_db
    username: root
    password: "123456"
    table: "events"
  batchCommit: true
  batchSize: 500

任务运维

查看任务状态

# API 方式
curl http://localhost:8080/mydataharbor/task/list?group=biz001

# JMX 方式
# 连接 JMX 端口 9999,查看 MyDataHarbor MBean

暂停/恢复任务

# 暂停任务
curl -X POST http://localhost:8080/mydataharbor/task/pause?taskId=task-001

# 恢复任务
curl -X POST http://localhost:8080/mydataharbor/task/resume?taskId=task-001

🔧 配置说明

Console 配置 (application.yml)

server:
  port: 8080                        # Web 服务端口
  servlet:
    multipart:
      max-file-size: 500MB          # 插件上传大小限制
      max-request-size: 500MB

zk: 127.0.0.1:2181                  # ZooKeeper 连接地址

spring:
  devtools:
    livereload:
      enabled: true                 # 开发环境热加载

Server 配置 (system.yml)

zk: ["127.0.0.1:2181"]              # ZooKeeper 集群地址
port: 1299                          # RPC 服务端口
group: biz001                       # 节点分组名(同组节点负载均衡)

pluginRepository: http://127.0.0.1:8080  # 插件仓库地址

# 可选:JMX 监控配置
jmx:
  enabled: true
  port: 9999

📊 监控与运维

JMX 监控

启动时开启 JMX:

./start.sh jmx

连接 JMX 后可查看:

  • 任务执行状态(运行中/暂停/已结束)
  • 数据吞吐量(条数/秒)
  • 错误统计
  • 各阶段耗时分析

API 接口

接口 说明
GET /mydataharbor/node/nodeList 获取所有节点列表
GET /mydataharbor/task/listTask 获取任务列表
POST /mydataharbor/task/create 创建同步任务
POST /mydataharbor/plugin/listPlugins 获取插件列表

完整 API 文档:http://localhost:8080/swagger-ui/


🎯 最佳实践

1. 节点分组隔离

将不同业务的任务分配到不同组:

  • group: order-sync - 订单数据同步
  • group: user-sync - 用户数据同步
  • group: log-sync - 日志数据同步

2. 批量提交优化

对于写入型任务,开启批量提交可显著提升性能:

# 任务配置
settingContext:
  batchCommit: true      # 批量提交
  batchSize: 1000        # 每批 1000 条
  parallel: true         # 开启并行处理
  threadNum: 4           # 4 线程并行

3. 故障转移配置

# 任务配置
enableRebalance: true    # 开启故障转移
enableLoadBalance: true  # 开启负载均衡

🤝 参与贡献

我们欢迎以下类型的贡献:

  1. 功能开发:新的数据源/写入器插件
  2. 文档完善:使用教程、最佳实践
  3. Bug 修复:提交 Issue 或 Pull Request
  4. 前端优化:Vue 开发人员改进 UI 体验

开发环境搭建

# 克隆项目
git clone https://github.com/mydataharbor/mydataharbor.git
cd mydataharbor

# 导入 IDE(IntelliJ IDEA 推荐)
# File -> Open -> 选择项目根目录

# 编译
mvn clean install

# 运行单元测试
mvn test

提交规范

feat: 新增 XXX 功能
fix: 修复 XXX 问题
docs: 文档更新
refactor: 代码重构
test: 测试用例
chore: 构建/配置变更

📞 社区与支持

渠道 链接/方式
📧 联系作者 [email protected]
📚 官方文档 语雀文档
🌐 官网 www.mydataharbor.com
💬 Demo 环境 demo.mydataharbor.com
👥 QQ 交流群 QQ 群
(加群需验证 Star 数)

📝 更新日志

v2.0.1(当前版本)

  • ✨ 优化 pipeline 可视化创建,支持 UI 自由组合组件
  • 🔧 插件支持调整位置,解决依赖加载顺序问题
  • 🔄 支持插件重复安装和版本更新
  • ⚖️ 负载均衡和故障转移可分开配置

v2.0.0

  • 💾 新增 ITaskStorage 接口,支持任务状态持久化
  • 📊 管理台实时展示任务监控信息
  • 🔄 支持任务修改重建功能
  • 🎯 优化 rebalance 算法

v1.x

  • 初始版本,基础数据同步功能

📄 许可证

本项目采用 GNU GPL v3.0 开源协议。

Copyright (C) 2020-2026 xulang <[email protected]>

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

如果项目对你有帮助,欢迎 ⭐Star 支持!

成就
212
Star
67
Fork
成员(1)
会飞的猪

搜索帮助