加密货币高频交易回测框架。基于事件驱动架构,通过统一的策略接口支持历史回测、模拟盘和实盘三种运行模式,策略代码无需修改即可在三种模式间切换。
demo.mp4
┌─────────────────────┐
│ ExchangeFactory │ 享元工厂,根据配置创建交易所实例
└──────────┬──────────┘
│ 创建
┌────────────────────┼────────────────────┐
│ │ │
┌────────▼────────┐ ┌───────▼────────┐ ┌────────▼────────┐
│ BacktestExchange │ │ MockExchange │ │ RealExchange │
│ (数据库回放行情) │ │ (真实行情+模拟撮合)│ │ (真实交易所API) │
└────────┬────────┘ └───────┬────────┘ └────────┬────────┘
│ │ │
└───────────────────┼─────────────────────┘
│ 触发事件
┌─────────▼──────────┐
│ EventEngine │ 事件总线,注册/触发模式
└─────────┬──────────┘
│ 分发事件
┌───────────────────┼───────────────────┐
│ │ │
┌────────▼────────┐ ┌───────▼────────┐ ┌────────▼────────┐
│ Strategy │ │ Recorder │ │ BacktestIndicators │
│ (策略逻辑处理) │ │ (行情数据录制) │ │ (回测指标计算) │
└─────────────────┘ └────────────────┘ └──────────────────┘
| 模式 | 数据来源 | 撮合方式 | 用途 |
|---|---|---|---|
| backtest | InfluxDB 历史数据回放 | 模拟撮合 | 策略回测、参数优化 |
| sim | 交易所 WebSocket 实时行情 | 模拟撮合 | 模拟盘验证策略 |
| real | 交易所 WebSocket 实时行情 | 真实下单 | 实盘交易 |
录制和回测覆盖 orderbook(订单簿深度) 和 trades(逐笔成交) 两种数据:
- 录制:
Recorder监听事件引擎的onTrade和onDepth事件,将 trades 和 orderbook 数据写入 InfluxDB - 回放:
BacktestExchange通过query_combined从 InfluxDB 读取数据,按时间戳归并两种数据流,通过字段判断('bids' in data and 'asks' in data)区分 orderbook 和 trade,分别触发onDepth和onTrade事件
系统支持同时配置多个交易对(symbollist),但品种之间相互独立,没有联动关系:
- 策略基类
Strategy按exchange → symbol分层存储行情数据(exchange_symbol_dict)和账户持仓(exchange_dict),每个 symbol 的回调独立触发、独立处理 - 下单操作按具体 symbol 独立执行,品种间不存在信息共享或联动逻辑
- 本质上是同一个策略实例在多个品种上独立运行
如需跨品种联动(如品种间对冲、价差交易),需在策略层自行编写品种间的关联逻辑。
| 模块 | 文件 | 说明 |
|---|---|---|
| 事件引擎 | eventengine.py |
事件总线,所有组件通过事件解耦通信,支持同步/异步回调,单个回调异常不阻断后续执行 |
| 交易所基类 | exchange.py |
抽象交易所接口(Exchange)+ 享元工厂(ExchangeFactory),封装 ccxt.pro WebSocket 订阅和下单 |
| 模拟交易所 | simulator.py |
MockExchange:完整模拟撮合引擎,含保证金管理、手续费/滑点模拟、余额保护 |
| 回测引擎 | backtest.py |
BacktestExchange(数据库回放)+ BacktestIndicators(夏普比率、最大回撤、胜率等指标计算) |
| 策略基类 | mystrategy/strategy.py |
抽象策略类,封装行情状态管理、下单接口和风控检查 |
| 示例策略 | mystrategy/simple_Market_strategy.py |
基于成交量差的做市策略示例 |
| 数据客户端 | influxdb.py |
InfluxDB 异步客户端,存储/查询 orderbook 和 trades,支持按时间戳归并排序 |
| 行情录制 | recorder.py |
监听事件引擎的行情事件写入数据库 |
| 数据模型 | model.py |
ClientParams 配置类 |
| 交易所映射 | mapping.py |
交易所模块名到类名的映射注册表 |
| 日志工具 | tool.py |
日志初始化(文件滚动 + 控制台输出) |
所有交易所实例通过 EventEngine 触发以下事件:
| 事件 | 说明 | 数据格式 |
|---|---|---|
onTrade |
市场逐笔成交 | {exchange, symbol, trades} |
onDepth |
订单簿深度快照 | {exchange, symbol, depth} |
onOrder |
订单状态更新 | {exchange, symbol, orders} |
onMyTrade |
自身成交通知 | {exchange, symbol, trades} |
onAccount |
账户余额更新 | {exchange, account} |
onPosition |
持仓更新 | {exchange, positions} |
onCreateOrder |
下单确认 | {exchange, symbol, orders} |
onCancelOrder |
撤单确认 | {exchange, symbol, orders} |
money_pos |
平仓盈亏(仅模拟/回测) | {exchange, money_pos, total_equity} |
onContractSize |
合约面值 | {symbol, contractSize} |
- Python 3.10+
- InfluxDB 2.x(回测和行情录制需要)
pip install -r requirements.txt核心依赖:
ccxt— 交易所 API 统一接口influxdb_client— InfluxDB 异步客户端numpy— 数值计算
参考 ExchangeConfig.example.json 创建配置文件:
cp ExchangeConfig.example.json ExchangeConfig.json[
{
"access_key": "YOUR_ACCESS_KEY",
"secret_key": "YOUR_SECRET_KEY",
"uid": "YOUR_UID",
"password": "YOUR_PASSWORD",
"exchangename": "gateio",
"exchangeType": "swap",
"defaultType": "swap",
"symbollist": ["BTC/USDT:USDT"],
"ping": "futures.ping",
"mode": "backtest",
"start_time": "2024-08-11T09:00:00+00:00",
"end_time": "2024-08-11T10:00:00+00:00"
}
]配置项说明:
| 字段 | 说明 | 示例 |
|---|---|---|
access_key |
API Key | — |
secret_key |
API Secret | — |
uid |
用户 ID(部分交易所需要) | — |
password |
API 密码(部分交易所需要) | — |
exchangename |
交易所名称,对应 ccxt 中的名称 | gateio |
exchangeType |
交易类型 | swap(永续合约) |
defaultType |
ccxt 默认类型 | swap |
symbollist |
交易对列表 | ["BTC/USDT:USDT"] |
ping |
WebSocket 心跳方法名(可选) | futures.ping |
mode |
运行模式 | backtest / sim / real |
start_time |
回测开始时间(仅 backtest 模式) | RFC3339 格式 |
end_time |
回测结束时间(仅 backtest 模式) | RFC3339 格式 |
多交易所配置: 数组中可添加多个配置条目,每个条目可以有不同的交易所和运行模式。
参考 DBConfig.example.json 创建配置文件:
cp DBConfig.example.json DBConfig.json然后编辑 DBConfig.json,填入你的 InfluxDB 连接信息:
{
"url": "http://localhost:8086",
"token": "YOUR_REMOTE_INFLUXDB_TOKEN",
"local_token": "YOUR_LOCAL_INFLUXDB_TOKEN",
"org": "YOUR_ORG",
"bucket": "swap"
}配置项说明:
| 字段 | 说明 |
|---|---|
url |
InfluxDB 服务器地址,默认本地 |
token |
远程 InfluxDB 的访问令牌 |
local_token |
本地 InfluxDB 的访问令牌(回测和行情录制优先使用) |
org |
InfluxDB 组织名 |
bucket |
数据存储的 bucket 名称 |
注意:
DBConfig.json包含数据库凭据,已在.gitignore中排除,不会被提交到版本控制。
python backtest_run.py- 将
ExchangeConfig.json中的mode设为"backtest" - 设置
start_time和end_time指定回测时间范围 - 确保 InfluxDB 中有对应时间范围的行情数据(通过行情录制获取)
- 运行后自动回放行情,回测结束输出指标报告
回测指标说明:
| 指标 | 说明 |
|---|---|
returns |
总收益率(最终权益 / 初始权益) |
sharpe |
夏普比率(每笔交易维度,ddof=1,未年化) |
max_drawdown |
最大回撤 |
win_rate |
胜率 |
profit_loss_ratio |
盈亏比(平均盈利 / 平均亏损) |
avg_returns |
平均每笔收益率 |
std_returns |
收益率标准差 |
注意: 夏普比率未年化,使用时需乘
sqrt(年交易笔数)进行年化。初始资金默认 100,000 USDT。
python main_run.py- 将
ExchangeConfig.json中的mode设为"sim" - 需要有效的 API Key(用于连接 WebSocket 获取行情,不会真实下单)
- 模拟撮合引擎会根据真实 trade 数据判断限价单是否成交
- 模拟参数:杠杆 20x、手续费率 0.05%(taker)、无滑点(默认值)
python main_run.py- 将
ExchangeConfig.json中的mode设为"real" - 需要有效的 API Key 和 Secret
- 连接真实交易所 WebSocket 并通过 API 下单
- 请注意: 实盘模式会使用真实资金,请确保策略经过充分回测和模拟盘验证
python record_data_run.py- 将
ExchangeConfig.json中的mode设为"real"或"sim" - 录制器会监听所有行情事件并将 orderbook 和 trades 数据写入 InfluxDB
- 录制的数据可供回测模式使用
- 确保本地 InfluxDB 服务已启动
- 在
realexchange/目录下创建 Python 文件,继承Exchange基类:
from model import ClientParams
from exchange import Exchange
class MyExchange(Exchange):
def __init__(self, cp: ClientParams, loggger, eventengine):
super().__init__(cp, loggger, eventengine)
# 按需覆写 watch_* 方法,添加交易所特定的数据处理逻辑- 在
mapping.py中注册映射(文件名 → 类名):
modules_classes = {
"gateio_swap": "GateioSwap",
"myexchange_swap": "MyExchange", # 新增
}- 在
ExchangeConfig.json中添加对应配置,exchangename和exchangeType拼接后应与映射的 key 一致。
- 在
mystrategy/目录下创建 Python 文件,继承Strategy基类:
from mystrategy.strategy import Strategy
class MyStrategy(Strategy):
async def On_trade(self, data):
"""处理市场成交数据"""
await super().On_trade(data) # 调用父类方法更新内部状态
# 在此添加策略逻辑
def On_depth(self, data):
"""处理订单簿数据"""
super().On_depth(data)
# 在此添加策略逻辑
def On_order(self, data):
"""处理订单状态更新"""
super().On_order(data)
def On_mytrade(self, data):
"""处理自身成交通知"""
super().On_mytrade(data)
async def On_position(self, data):
"""处理持仓更新"""
await super().On_position(data)
def On_account(self, data):
"""处理账户余额更新"""
super().On_account(data)
def On_cancelorder(self, data):
"""处理撤单确认"""
super().On_cancelorder(data)
def On_createorder(self, data):
"""处理下单确认"""
super().On_createorder(data)- 策略可通过以下接口下单和撤单:
# 下单
await self.CreateOrder(exchange, symbol, 'buy', price, amount)
await self.CreateOrder(exchange, symbol, 'sell', price, amount, params={'cid': 'my-order-id'})
# 撤销所有挂单
await self.CancelAllOrders(exchange, symbol)- 策略可访问以下状态数据:
# 行情数据
self.exchange_symbol_dict[exchange][symbol]['orderbook'] # 当前订单簿(含 mid 中间价)
self.exchange_symbol_dict[exchange][symbol]['trades'] # 最近 1000 笔成交
self.exchange_symbol_dict[exchange][symbol]['trade_diff'] # 成交价偏离中间价的百分比数组
self.exchange_symbol_dict[exchange][symbol]['orders'] # 当前挂单字典
self.exchange_symbol_dict[exchange][symbol]['mytrades'] # 自身最近 10000 笔成交
# 账户和持仓
self.exchange_dict[exchange]['account'] # {'free': float, 'total': float, 'intitial': float}
self.exchange_dict[exchange]['position'] # {symbol: position_dict}- 在对应的
*_run.py中替换策略类:
from mystrategy.my_strategy import MyStrategy
strategy = MyStrategy(logger, eventengine)- 延迟监控: 自动监测公开/私有通道网络延迟,公开延迟超过 30 秒或私有延迟超过 3 秒时停止交易
- 资金回撤风控: 总资金低于初始资金 95% 时自动停止交易,需手动重启恢复
- 余额保护: 模拟撮合引擎在余额为负时停止后续成交,反向开仓余额不足时跳过
- 连接监控:
MonitorTasks每 30 秒检查数据更新,超过 30 秒无更新自动重连(仅 sim/real 模式)
MockExchange 支持以下可调参数:
| 参数 | 默认值 | 说明 |
|---|---|---|
leverage |
20 | 杠杆倍数 |
fee_rate |
0.0005 | Taker 手续费率(0.05%) |
slippage_bps |
0.0 | 滑点(基点,1 bps = 0.01%) |
| 初始资金 | 100,000 USDT | 模拟账户初始余额 |
HFbacktest/
├── backtest_run.py # 回测模式入口
├── main_run.py # 模拟盘/实盘入口
├── record_data_run.py # 行情录制入口
├── eventengine.py # 事件引擎
├── exchange.py # 交易所基类 + 享元工厂
├── simulator.py # 模拟交易所(撮合引擎)
├── backtest.py # 回测交易所 + 指标计算
├── influxdb.py # InfluxDB 异步客户端
├── recorder.py # 行情录制器
├── model.py # 数据模型(ClientParams)
├── mapping.py # 交易所映射注册表
├── tool.py # 日志工具
├── mystrategy/
│ ├── strategy.py # 策略抽象基类
│ └── simple_Market_strategy.py # 做市策略示例
├── realexchange/
│ └── gateio_swap.py # Gate.io 合约交易所实现
├── ExchangeConfig.json # 交易所配置(已在 .gitignore 中)
├── ExchangeConfig.example.json # 交易所配置模板
├── DBConfig.json # InfluxDB 配置(已在 .gitignore 中)
├── DBConfig.example.json # 数据库配置模板
├── requirements.txt # Python 依赖
└── CLAUDE.md # 项目开发文档
- Python 3 + asyncio — 异步事件驱动架构
- ccxt.pro — 统一交易所 WebSocket/REST API 接口
- InfluxDB 2.x — 时序数据存储(行情数据)
- NumPy — 数值计算(策略指标、收益率统计)