# JMemQueue
**Repository Path**: Sunleader/jshared-memory-ipc
## Basic Information
- **Project Name**: JMemQueue
- **Description**: shared memory for ipc
一个基于共享内存的ipc通信队列
- **Primary Language**: Unknown
- **License**: MulanPSL-2.0
- **Default Branch**: master
- **Homepage**: http://sunleader.top:18080/
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2026-01-09
- **Last Updated**: 2026-01-28
## Categories & Tags
**Categories**: message-server
**Tags**: None
## README
# JMemQueue
### 高性能跨进程共享内存队列系统
# JMemQueue - 高性能跨进程共享内存队列系统
JMemQueue 是一个基于 Java NIO 和共享内存技术构建的高性能跨进程通信(IPC)解决方案,利用内存映射文件(MappedByteBuffer)实现低延迟、高吞吐量的数据传输。
# 适用场景
(简而言之,在单机场景下替代 Kafka,要比kafka更快!传输数据不能局限于字符串)
- 单机服务器
- 多进程多服务需要高效低延迟通信
## 🚀 特性
- **零拷贝机制**:基于内存映射文件,避免不必要的内存复制
- **无锁并发**:使用 CAS(Compare-And-Swap)操作确保线程安全
- **高吞吐量**:单线程每秒可处理数百万条消息
- **负载均衡**:同一个GROUP不会消费到重复数据
- **跨进程通信**:支持不同 JVM 进程间的高效数据交换
- **持久化存储**:数据在操作系统页面缓存中,断电不丢失(如启用持久化)
- **可扩展架构**:支持动态扩容和负载均衡
## 📊 架构设计
### 数据结构 - SMG (Shared Memory Grid)
每个数据单元(SMG)占用 1024 字节,结构如下:
| 偏移量 | 大小 | 描述 |
|-----------|-----------|------------------------------------------------------------------------|
| 0-3 | 4 字节 | 状态字段(STATE_IDLE=0, STATE_WRITING=1, STATE_READABLE=2, STATE_READING=3) |
| 4-7 | 4 字节 | 数据大小(实际内容长度) |
| 8-Int.MAX | Int.MAX-8 | 实际数据内容(<2G) |
### 核心组件
#### 1. JSharedMemQueue
- 主队列接口,提供 `enqueue()` 和 `createReader()` 方法
- 管理全局偏移量和车厢分配
#### 2. JSharedMemBaseInfo
- 维护队列的基础信息(总偏移量、车厢容量等)
- 使用 `VarHandle` 提供原子操作支持
#### 3. JSharedMemCarriage
- 逻辑车厢,管理多个 SMG 段
- 按需加载和卸载内存映射文件
#### 4. JSharedMemSegment
- 单个内存段,对应一个 SMG
- 提供状态管理和数据读写功能
#### 5. JSharedMemReader
- 消费者端读取器
- 支持多线程并发消费
## 🛠️ 快速开始
### 环境要求
- Java 17 或更高版本
- Maven 3.6+ (或使用 Maven Wrapper)
### 安装与构建
```bash
io.github.sunleader1997
JMemQueue
1.0.2
```
### 生产者示例
```java
import io.github.sunleader1997.jmemqueue.JSharedMemQueue;
// 创建共享内存队列
JSharedMemQueue queue = new JSharedMemQueue("my-topic");
JSharedMemProducer producer = queue.createProducer();
// 如果需要定义数据清理机制
// producer.setTimeToLive(1, TimeUnit.DAYS);
// 写入数据
String message = "Hello, Shared Memory Queue!";
byte[] data = message.getBytes(StandardCharsets.UTF_8);
boolean success = producer.enqueue(data);
```
### 消费者示例
```java
import io.github.sunleader1997.jmemqueue.JSharedMemQueue;
import io.github.sunleader1997.jmemqueue.JSharedMemReader;
// 创建队列实例
JSharedMemQueue queue = new JSharedMemQueue("my-topic");
// 创建读取器
JSharedMemReader reader = queue.createReader();
// 消费消息
byte[] data = reader.dequeue();
if(data !=null){
String message = new String(data, StandardCharsets.UTF_8);
System.out.println("收到消息: "+message);
}
```
## ⚡ 性能基准
在典型硬件环境下(Intel i7, 16GB RAM),JMemQueue 达到以下性能指标:
- **windows 吞吐量**: 单线程 200万+ 条消息/秒 4线程(多进程消费) 600万+ 条消息/秒
- **linux 吞吐量**: 4线程(多进程消费) 1800万+ 条消息/秒
- **延迟**: 平均 < 1 微秒
- **内存使用**: 每个 SMG 固定 1KB,支持动态扩展
- **并发**: 支持数百个生产者和消费者线程
```shell
windows
========== 消费者测试统计 ==========
车厢容量: 1024000
总消息容量: 1024000
成功消费消息数: 4096000
dequeue返回null次数: 0
总耗时: 686 ms
消费吞吐量: 5970845 msg/s
业务线程数: 4
===================================
ubuntu
========== 消费者测试统计 ==========
车厢容量: 1024000
总消息容量: 1024000
成功消费消息数: 4096000
dequeue返回null次数: 0
总耗时: 226 ms
消费吞吐量: 18123893 msg/s
业务线程数: 4
===================================
```
## 🔧 配置参数
### 队列参数
- `topic`: 队列主题名称,用于区分不同的队列实例
- `capacity`: 队列容量(SMG 数量),影响内存使用和性能
- `overwrite`: 是否覆盖现有队列数据
### 系统参数
- `Dictionary.PARENT_DIR`: 共享内存文件存储目录(默认 `${tmp}/JSMQ/`)
## 📁 文件结构
JMemQueue 在系统中创建以下文件:
- `${tmp}/JSMQ/${topic}/${topic}.base` - 队列基础信息文件
- `${tmp}/JSMQ/${topic}/${n}.carriage` - 数据车厢文件
- `${tmp}/JSMQ/${topic}/${group}.reader` - 读取器状态文件
## 🧪 测试套件
项目包含完整的性能测试用例,可验证吞吐量和正确性:
```bash
# 运行测试
./mvn test
# 运行特定测试
./mvn test -Dtest=ConsumerTest
```
## 🔒 线程安全
- 所有状态变更使用 CAS 操作保证原子性
- 读写分离,避免锁竞争
- 支持多生产者多消费者模式
## 📈 扩展性
- 动态车厢管理,支持 TB 级别数据存储
- 可配置的内存段大小
- 支持多种数据格式序列化
## 🤝 贡献
欢迎提交 Issue 和 Pull Request 来改进 JMemQueue!
## 📄 许可证
Apache License 2.0