Skip to content

mmletgo/HFbacktest

Repository files navigation

HFbacktest

加密货币高频交易回测框架。基于事件驱动架构,通过统一的策略接口支持历史回测模拟盘实盘三种运行模式,策略代码无需修改即可在三种模式间切换。

演示

demo.mp4

架构

                        ┌─────────────────────┐
                        │   ExchangeFactory    │  享元工厂,根据配置创建交易所实例
                        └──────────┬──────────┘
                                   │ 创建
              ┌────────────────────┼────────────────────┐
              │                    │                     │
     ┌────────▼────────┐  ┌───────▼────────┐  ┌────────▼────────┐
     │ BacktestExchange │  │  MockExchange   │  │  RealExchange   │
     │ (数据库回放行情) │  │ (真实行情+模拟撮合)│  │ (真实交易所API) │
     └────────┬────────┘  └───────┬────────┘  └────────┬────────┘
              │                   │                     │
              └───────────────────┼─────────────────────┘
                                  │ 触发事件
                        ┌─────────▼──────────┐
                        │    EventEngine      │  事件总线,注册/触发模式
                        └─────────┬──────────┘
                                  │ 分发事件
              ┌───────────────────┼───────────────────┐
              │                   │                     │
     ┌────────▼────────┐  ┌───────▼────────┐  ┌────────▼────────┐
     │    Strategy      │  │    Recorder     │  │  BacktestIndicators │
     │ (策略逻辑处理)   │  │ (行情数据录制)  │  │ (回测指标计算)      │
     └─────────────────┘  └────────────────┘  └──────────────────┘

三种运行模式

模式 数据来源 撮合方式 用途
backtest InfluxDB 历史数据回放 模拟撮合 策略回测、参数优化
sim 交易所 WebSocket 实时行情 模拟撮合 模拟盘验证策略
real 交易所 WebSocket 实时行情 真实下单 实盘交易

数据流

录制和回测覆盖 orderbook(订单簿深度)trades(逐笔成交) 两种数据:

  • 录制Recorder 监听事件引擎的 onTradeonDepth 事件,将 trades 和 orderbook 数据写入 InfluxDB
  • 回放BacktestExchange 通过 query_combined 从 InfluxDB 读取数据,按时间戳归并两种数据流,通过字段判断('bids' in data and 'asks' in data)区分 orderbook 和 trade,分别触发 onDepthonTrade 事件

多品种执行模型

系统支持同时配置多个交易对(symbollist),但品种之间相互独立,没有联动关系

  • 策略基类 Strategyexchange → symbol 分层存储行情数据(exchange_symbol_dict)和账户持仓(exchange_dict),每个 symbol 的回调独立触发、独立处理
  • 下单操作按具体 symbol 独立执行,品种间不存在信息共享或联动逻辑
  • 本质上是同一个策略实例在多个品种上独立运行

如需跨品种联动(如品种间对冲、价差交易),需在策略层自行编写品种间的关联逻辑。

核心模块

模块 文件 说明
事件引擎 eventengine.py 事件总线,所有组件通过事件解耦通信,支持同步/异步回调,单个回调异常不阻断后续执行
交易所基类 exchange.py 抽象交易所接口(Exchange)+ 享元工厂(ExchangeFactory),封装 ccxt.pro WebSocket 订阅和下单
模拟交易所 simulator.py MockExchange:完整模拟撮合引擎,含保证金管理、手续费/滑点模拟、余额保护
回测引擎 backtest.py BacktestExchange(数据库回放)+ BacktestIndicators(夏普比率、最大回撤、胜率等指标计算)
策略基类 mystrategy/strategy.py 抽象策略类,封装行情状态管理、下单接口和风控检查
示例策略 mystrategy/simple_Market_strategy.py 基于成交量差的做市策略示例
数据客户端 influxdb.py InfluxDB 异步客户端,存储/查询 orderbook 和 trades,支持按时间戳归并排序
行情录制 recorder.py 监听事件引擎的行情事件写入数据库
数据模型 model.py ClientParams 配置类
交易所映射 mapping.py 交易所模块名到类名的映射注册表
日志工具 tool.py 日志初始化(文件滚动 + 控制台输出)

事件类型

所有交易所实例通过 EventEngine 触发以下事件:

事件 说明 数据格式
onTrade 市场逐笔成交 {exchange, symbol, trades}
onDepth 订单簿深度快照 {exchange, symbol, depth}
onOrder 订单状态更新 {exchange, symbol, orders}
onMyTrade 自身成交通知 {exchange, symbol, trades}
onAccount 账户余额更新 {exchange, account}
onPosition 持仓更新 {exchange, positions}
onCreateOrder 下单确认 {exchange, symbol, orders}
onCancelOrder 撤单确认 {exchange, symbol, orders}
money_pos 平仓盈亏(仅模拟/回测) {exchange, money_pos, total_equity}
onContractSize 合约面值 {symbol, contractSize}

快速开始

环境要求

  • Python 3.10+
  • InfluxDB 2.x(回测和行情录制需要)

安装依赖

pip install -r requirements.txt

核心依赖:

  • ccxt — 交易所 API 统一接口
  • influxdb_client — InfluxDB 异步客户端
  • numpy — 数值计算

配置文件

1. 交易所配置 ExchangeConfig.json

参考 ExchangeConfig.example.json 创建配置文件:

cp ExchangeConfig.example.json ExchangeConfig.json
[
    {
        "access_key": "YOUR_ACCESS_KEY",
        "secret_key": "YOUR_SECRET_KEY",
        "uid": "YOUR_UID",
        "password": "YOUR_PASSWORD",
        "exchangename": "gateio",
        "exchangeType": "swap",
        "defaultType": "swap",
        "symbollist": ["BTC/USDT:USDT"],
        "ping": "futures.ping",
        "mode": "backtest",
        "start_time": "2024-08-11T09:00:00+00:00",
        "end_time": "2024-08-11T10:00:00+00:00"
    }
]

配置项说明:

字段 说明 示例
access_key API Key
secret_key API Secret
uid 用户 ID(部分交易所需要)
password API 密码(部分交易所需要)
exchangename 交易所名称,对应 ccxt 中的名称 gateio
exchangeType 交易类型 swap(永续合约)
defaultType ccxt 默认类型 swap
symbollist 交易对列表 ["BTC/USDT:USDT"]
ping WebSocket 心跳方法名(可选) futures.ping
mode 运行模式 backtest / sim / real
start_time 回测开始时间(仅 backtest 模式) RFC3339 格式
end_time 回测结束时间(仅 backtest 模式) RFC3339 格式

多交易所配置: 数组中可添加多个配置条目,每个条目可以有不同的交易所和运行模式。

2. 数据库配置 DBConfig.json

参考 DBConfig.example.json 创建配置文件:

cp DBConfig.example.json DBConfig.json

然后编辑 DBConfig.json,填入你的 InfluxDB 连接信息:

{
    "url": "http://localhost:8086",
    "token": "YOUR_REMOTE_INFLUXDB_TOKEN",
    "local_token": "YOUR_LOCAL_INFLUXDB_TOKEN",
    "org": "YOUR_ORG",
    "bucket": "swap"
}

配置项说明:

字段 说明
url InfluxDB 服务器地址,默认本地
token 远程 InfluxDB 的访问令牌
local_token 本地 InfluxDB 的访问令牌(回测和行情录制优先使用)
org InfluxDB 组织名
bucket 数据存储的 bucket 名称

注意: DBConfig.json 包含数据库凭据,已在 .gitignore 中排除,不会被提交到版本控制。

运行

历史数据回测

python backtest_run.py
  1. ExchangeConfig.json 中的 mode 设为 "backtest"
  2. 设置 start_timeend_time 指定回测时间范围
  3. 确保 InfluxDB 中有对应时间范围的行情数据(通过行情录制获取)
  4. 运行后自动回放行情,回测结束输出指标报告

回测指标说明:

指标 说明
returns 总收益率(最终权益 / 初始权益)
sharpe 夏普比率(每笔交易维度,ddof=1,未年化)
max_drawdown 最大回撤
win_rate 胜率
profit_loss_ratio 盈亏比(平均盈利 / 平均亏损)
avg_returns 平均每笔收益率
std_returns 收益率标准差

注意: 夏普比率未年化,使用时需乘 sqrt(年交易笔数) 进行年化。初始资金默认 100,000 USDT。

模拟盘

python main_run.py
  1. ExchangeConfig.json 中的 mode 设为 "sim"
  2. 需要有效的 API Key(用于连接 WebSocket 获取行情,不会真实下单)
  3. 模拟撮合引擎会根据真实 trade 数据判断限价单是否成交
  4. 模拟参数:杠杆 20x、手续费率 0.05%(taker)、无滑点(默认值)

实盘交易

python main_run.py
  1. ExchangeConfig.json 中的 mode 设为 "real"
  2. 需要有效的 API Key 和 Secret
  3. 连接真实交易所 WebSocket 并通过 API 下单
  4. 请注意: 实盘模式会使用真实资金,请确保策略经过充分回测和模拟盘验证

行情数据录制

python record_data_run.py
  1. ExchangeConfig.json 中的 mode 设为 "real""sim"
  2. 录制器会监听所有行情事件并将 orderbook 和 trades 数据写入 InfluxDB
  3. 录制的数据可供回测模式使用
  4. 确保本地 InfluxDB 服务已启动

扩展指南

添加新交易所

  1. realexchange/ 目录下创建 Python 文件,继承 Exchange 基类:
from model import ClientParams
from exchange import Exchange

class MyExchange(Exchange):
    def __init__(self, cp: ClientParams, loggger, eventengine):
        super().__init__(cp, loggger, eventengine)

    # 按需覆写 watch_* 方法,添加交易所特定的数据处理逻辑
  1. mapping.py 中注册映射(文件名 → 类名):
modules_classes = {
    "gateio_swap": "GateioSwap",
    "myexchange_swap": "MyExchange",  # 新增
}
  1. ExchangeConfig.json 中添加对应配置,exchangenameexchangeType 拼接后应与映射的 key 一致。

添加新策略

  1. mystrategy/ 目录下创建 Python 文件,继承 Strategy 基类:
from mystrategy.strategy import Strategy

class MyStrategy(Strategy):
    async def On_trade(self, data):
        """处理市场成交数据"""
        await super().On_trade(data)  # 调用父类方法更新内部状态
        # 在此添加策略逻辑

    def On_depth(self, data):
        """处理订单簿数据"""
        super().On_depth(data)
        # 在此添加策略逻辑

    def On_order(self, data):
        """处理订单状态更新"""
        super().On_order(data)

    def On_mytrade(self, data):
        """处理自身成交通知"""
        super().On_mytrade(data)

    async def On_position(self, data):
        """处理持仓更新"""
        await super().On_position(data)

    def On_account(self, data):
        """处理账户余额更新"""
        super().On_account(data)

    def On_cancelorder(self, data):
        """处理撤单确认"""
        super().On_cancelorder(data)

    def On_createorder(self, data):
        """处理下单确认"""
        super().On_createorder(data)
  1. 策略可通过以下接口下单和撤单:
# 下单
await self.CreateOrder(exchange, symbol, 'buy', price, amount)
await self.CreateOrder(exchange, symbol, 'sell', price, amount, params={'cid': 'my-order-id'})

# 撤销所有挂单
await self.CancelAllOrders(exchange, symbol)
  1. 策略可访问以下状态数据:
# 行情数据
self.exchange_symbol_dict[exchange][symbol]['orderbook']  # 当前订单簿(含 mid 中间价)
self.exchange_symbol_dict[exchange][symbol]['trades']     # 最近 1000 笔成交
self.exchange_symbol_dict[exchange][symbol]['trade_diff'] # 成交价偏离中间价的百分比数组
self.exchange_symbol_dict[exchange][symbol]['orders']     # 当前挂单字典
self.exchange_symbol_dict[exchange][symbol]['mytrades']   # 自身最近 10000 笔成交

# 账户和持仓
self.exchange_dict[exchange]['account']  # {'free': float, 'total': float, 'intitial': float}
self.exchange_dict[exchange]['position'] # {symbol: position_dict}
  1. 在对应的 *_run.py 中替换策略类:
from mystrategy.my_strategy import MyStrategy

strategy = MyStrategy(logger, eventengine)

内置风控机制

  • 延迟监控: 自动监测公开/私有通道网络延迟,公开延迟超过 30 秒或私有延迟超过 3 秒时停止交易
  • 资金回撤风控: 总资金低于初始资金 95% 时自动停止交易,需手动重启恢复
  • 余额保护: 模拟撮合引擎在余额为负时停止后续成交,反向开仓余额不足时跳过
  • 连接监控: MonitorTasks 每 30 秒检查数据更新,超过 30 秒无更新自动重连(仅 sim/real 模式)

模拟撮合参数

MockExchange 支持以下可调参数:

参数 默认值 说明
leverage 20 杠杆倍数
fee_rate 0.0005 Taker 手续费率(0.05%)
slippage_bps 0.0 滑点(基点,1 bps = 0.01%)
初始资金 100,000 USDT 模拟账户初始余额

项目结构

HFbacktest/
├── backtest_run.py          # 回测模式入口
├── main_run.py              # 模拟盘/实盘入口
├── record_data_run.py       # 行情录制入口
├── eventengine.py           # 事件引擎
├── exchange.py              # 交易所基类 + 享元工厂
├── simulator.py             # 模拟交易所(撮合引擎)
├── backtest.py              # 回测交易所 + 指标计算
├── influxdb.py              # InfluxDB 异步客户端
├── recorder.py              # 行情录制器
├── model.py                 # 数据模型(ClientParams)
├── mapping.py               # 交易所映射注册表
├── tool.py                  # 日志工具
├── mystrategy/
│   ├── strategy.py          # 策略抽象基类
│   └── simple_Market_strategy.py  # 做市策略示例
├── realexchange/
│   └── gateio_swap.py       # Gate.io 合约交易所实现
├── ExchangeConfig.json      # 交易所配置(已在 .gitignore 中)
├── ExchangeConfig.example.json  # 交易所配置模板
├── DBConfig.json            # InfluxDB 配置(已在 .gitignore 中)
├── DBConfig.example.json    # 数据库配置模板
├── requirements.txt         # Python 依赖
└── CLAUDE.md                # 项目开发文档

技术栈

  • Python 3 + asyncio — 异步事件驱动架构
  • ccxt.pro — 统一交易所 WebSocket/REST API 接口
  • InfluxDB 2.x — 时序数据存储(行情数据)
  • NumPy — 数值计算(策略指标、收益率统计)

About

加密货币高频交易回测框架 | 基于事件驱动架构,支持实盘/模拟盘/历史回测三种模式

Resources

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages