# JMemQueue **Repository Path**: Sunleader/jshared-memory-ipc ## Basic Information - **Project Name**: JMemQueue - **Description**: shared memory for ipc 一个基于共享内存的ipc通信队列 - **Primary Language**: Unknown - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: http://sunleader.top:18080/ - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-01-09 - **Last Updated**: 2026-01-28 ## Categories & Tags **Categories**: message-server **Tags**: None ## README
#
JMemQueue
###
高性能跨进程共享内存队列系统
# JMemQueue - 高性能跨进程共享内存队列系统 JMemQueue 是一个基于 Java NIO 和共享内存技术构建的高性能跨进程通信(IPC)解决方案,利用内存映射文件(MappedByteBuffer)实现低延迟、高吞吐量的数据传输。 # 适用场景 (简而言之,在单机场景下替代 Kafka,要比kafka更快!传输数据不能局限于字符串) - 单机服务器 - 多进程多服务需要高效低延迟通信 ## 🚀 特性 - **零拷贝机制**:基于内存映射文件,避免不必要的内存复制 - **无锁并发**:使用 CAS(Compare-And-Swap)操作确保线程安全 - **高吞吐量**:单线程每秒可处理数百万条消息 - **负载均衡**:同一个GROUP不会消费到重复数据 - **跨进程通信**:支持不同 JVM 进程间的高效数据交换 - **持久化存储**:数据在操作系统页面缓存中,断电不丢失(如启用持久化) - **可扩展架构**:支持动态扩容和负载均衡 ## 📊 架构设计 ### 数据结构 - SMG (Shared Memory Grid) 每个数据单元(SMG)占用 1024 字节,结构如下: | 偏移量 | 大小 | 描述 | |-----------|-----------|------------------------------------------------------------------------| | 0-3 | 4 字节 | 状态字段(STATE_IDLE=0, STATE_WRITING=1, STATE_READABLE=2, STATE_READING=3) | | 4-7 | 4 字节 | 数据大小(实际内容长度) | | 8-Int.MAX | Int.MAX-8 | 实际数据内容(<2G) | ### 核心组件 #### 1. JSharedMemQueue - 主队列接口,提供 `enqueue()` 和 `createReader()` 方法 - 管理全局偏移量和车厢分配 #### 2. JSharedMemBaseInfo - 维护队列的基础信息(总偏移量、车厢容量等) - 使用 `VarHandle` 提供原子操作支持 #### 3. JSharedMemCarriage - 逻辑车厢,管理多个 SMG 段 - 按需加载和卸载内存映射文件 #### 4. JSharedMemSegment - 单个内存段,对应一个 SMG - 提供状态管理和数据读写功能 #### 5. JSharedMemReader - 消费者端读取器 - 支持多线程并发消费 ## 🛠️ 快速开始 ### 环境要求 - Java 17 或更高版本 - Maven 3.6+ (或使用 Maven Wrapper) ### 安装与构建 ```bash io.github.sunleader1997 JMemQueue 1.0.2 ``` ### 生产者示例 ```java import io.github.sunleader1997.jmemqueue.JSharedMemQueue; // 创建共享内存队列 JSharedMemQueue queue = new JSharedMemQueue("my-topic"); JSharedMemProducer producer = queue.createProducer(); // 如果需要定义数据清理机制 // producer.setTimeToLive(1, TimeUnit.DAYS); // 写入数据 String message = "Hello, Shared Memory Queue!"; byte[] data = message.getBytes(StandardCharsets.UTF_8); boolean success = producer.enqueue(data); ``` ### 消费者示例 ```java import io.github.sunleader1997.jmemqueue.JSharedMemQueue; import io.github.sunleader1997.jmemqueue.JSharedMemReader; // 创建队列实例 JSharedMemQueue queue = new JSharedMemQueue("my-topic"); // 创建读取器 JSharedMemReader reader = queue.createReader(); // 消费消息 byte[] data = reader.dequeue(); if(data !=null){ String message = new String(data, StandardCharsets.UTF_8); System.out.println("收到消息: "+message); } ``` ## ⚡ 性能基准 在典型硬件环境下(Intel i7, 16GB RAM),JMemQueue 达到以下性能指标: - **windows 吞吐量**: 单线程 200万+ 条消息/秒 4线程(多进程消费) 600万+ 条消息/秒 - **linux 吞吐量**: 4线程(多进程消费) 1800万+ 条消息/秒 - **延迟**: 平均 < 1 微秒 - **内存使用**: 每个 SMG 固定 1KB,支持动态扩展 - **并发**: 支持数百个生产者和消费者线程 ```shell windows ========== 消费者测试统计 ========== 车厢容量: 1024000 总消息容量: 1024000 成功消费消息数: 4096000 dequeue返回null次数: 0 总耗时: 686 ms 消费吞吐量: 5970845 msg/s 业务线程数: 4 =================================== ubuntu ========== 消费者测试统计 ========== 车厢容量: 1024000 总消息容量: 1024000 成功消费消息数: 4096000 dequeue返回null次数: 0 总耗时: 226 ms 消费吞吐量: 18123893 msg/s 业务线程数: 4 =================================== ``` ## 🔧 配置参数 ### 队列参数 - `topic`: 队列主题名称,用于区分不同的队列实例 - `capacity`: 队列容量(SMG 数量),影响内存使用和性能 - `overwrite`: 是否覆盖现有队列数据 ### 系统参数 - `Dictionary.PARENT_DIR`: 共享内存文件存储目录(默认 `${tmp}/JSMQ/`) ## 📁 文件结构 JMemQueue 在系统中创建以下文件: - `${tmp}/JSMQ/${topic}/${topic}.base` - 队列基础信息文件 - `${tmp}/JSMQ/${topic}/${n}.carriage` - 数据车厢文件 - `${tmp}/JSMQ/${topic}/${group}.reader` - 读取器状态文件 ## 🧪 测试套件 项目包含完整的性能测试用例,可验证吞吐量和正确性: ```bash # 运行测试 ./mvn test # 运行特定测试 ./mvn test -Dtest=ConsumerTest ``` ## 🔒 线程安全 - 所有状态变更使用 CAS 操作保证原子性 - 读写分离,避免锁竞争 - 支持多生产者多消费者模式 ## 📈 扩展性 - 动态车厢管理,支持 TB 级别数据存储 - 可配置的内存段大小 - 支持多种数据格式序列化 ## 🤝 贡献 欢迎提交 Issue 和 Pull Request 来改进 JMemQueue! ## 📄 许可证 Apache License 2.0