Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ stream.latestTradeDetail$.subscribe((v) => {})
- [ ] Delete Listen Key
* Socket API
* Market Data
- [ ] Subscribe Market Depth Data
- [x] Subscribe Market Depth Data
- [x] Subscribe the Latest Trade Detail
- [ ] Subscribe K-Line Data
- [x] Subscribe K-Line Data
* Account Data
- [x] Listen Key expired push
- [x] Account balance and position update push
Expand Down
27 changes: 27 additions & 0 deletions src/bingx-socket/bingx-market-socket-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@ import {
} from 'rxjs';
import { HeartbeatInterface } from 'bingx-api/bingx-socket/interfaces/heartbeat.interface';
import {
KlineEvent,
KlineInterval,
LatestTradeEvent,
MarkerSubscription,
MarketDepthEvent,
MarketDepthLevel,
MarketWebsocketEvents,
SubscriptionType,
TradingPair,
} from 'bingx-api/bingx-socket/events/market-websocket-events';

export class BingxMarketSocketStream {
Expand All @@ -28,6 +33,8 @@ export class BingxMarketSocketStream {
public readonly onDisconnect$ = new Subject<CloseEvent>();
public readonly heartbeat$ = new ReplaySubject<HeartbeatInterface>(1);
public readonly latestTradeDetail$ = new Subject<LatestTradeEvent>();
public readonly marketDepth$ = new Subject<MarketDepthEvent>();
public readonly kline$ = new Subject<KlineEvent>();

constructor(
url: URL = new URL('/swap-market', 'wss://open-api-swap.bingx.com'),
Expand All @@ -53,6 +60,15 @@ export class BingxMarketSocketStream {
event.dataType.includes('trade'),
this.latestTradeDetail$,
),
filterAndEmitToSubject(
(event): event is MarketDepthEvent =>
event.dataType.includes('@depth'),
this.marketDepth$,
),
filterAndEmitToSubject(
(event): event is KlineEvent => event.dataType.includes('@kline_'),
this.kline$,
),
)
.subscribe();

Expand Down Expand Up @@ -92,4 +108,15 @@ export class BingxMarketSocketStream {
public subscribe(dataType: SubscriptionType) {
this.dataTypes$.next(dataType);
}

public subscribeMarketDepth(
symbol: TradingPair,
level: MarketDepthLevel = 20,
) {
this.subscribe(`${symbol}@depth${level}`);
}

public subscribeKline(symbol: TradingPair, interval: KlineInterval) {
this.subscribe(`${symbol}@kline_${interval}`);
}
}
126 changes: 126 additions & 0 deletions src/bingx-socket/bingx-market-socket-subscriptions.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import { Server, WebSocket } from 'ws';
import { getPortFree } from 'bingx-api/get-port';
import * as zlib from 'zlib';
import { BingxMarketSocketStream } from 'bingx-api/bingx-socket/bingx-market-socket-stream';
import { KlineEvent, MarketDepthEvent } from 'bingx-api/bingx-socket/events';

describe('bingx market socket subscriptions', () => {
let wss: Server;
let port: number;
let stream: BingxMarketSocketStream | undefined;
const sockets: WebSocket[] = [];

const sendToSocket = (socket: WebSocket, msg: string) => {
zlib.gzip(msg, (err, result) => {
socket.send(result);
});
};

beforeEach(async () => {
port = await getPortFree();
wss = new Server({ port });
await new Promise<void>((resolve) => {
wss.on('listening', resolve);
wss.on('connection', (ws) => {
sockets[0] = ws;
ws.on('close', () => {
sockets.splice(0, 1);
});
});
});
});

afterEach((done) => {
stream?.disconnect();
wss.close(() => {
sockets.splice(0, sockets.length);
done();
});
});

it('subscribes to market depth and emits depth events', (done) => {
wss.once('connection', (socket) => {
socket.once('message', (message) => {
expect(JSON.parse(message.toString())).toStrictEqual({
id: 'listen-for-BTC-USDT@depth5',
reqType: 'sub',
dataType: 'BTC-USDT@depth5',
});

sendToSocket(
socket,
JSON.stringify({
code: 0,
dataType: 'BTC-USDT@depth5',
data: {
asks: [{ p: 30100.1, v: 2.5 }],
bids: [{ p: 30099.9, v: 1.25 }],
},
} as MarketDepthEvent),
);
});
});

stream = new BingxMarketSocketStream(new URL('', `ws://0.0.0.0:${port}`));
stream.marketDepth$.subscribe((event) => {
expect(event).toStrictEqual({
code: '0',
dataType: 'BTC-USDT@depth5',
data: {
asks: [{ p: '30100.1', v: '2.5' }],
bids: [{ p: '30099.9', v: '1.25' }],
},
});
done();
});

stream.subscribeMarketDepth('BTC-USDT', 5);
});

it('subscribes to kline and emits kline events', (done) => {
wss.once('connection', (socket) => {
socket.once('message', (message) => {
expect(JSON.parse(message.toString())).toStrictEqual({
id: 'listen-for-BTC-USDT@kline_1m',
reqType: 'sub',
dataType: 'BTC-USDT@kline_1m',
});

sendToSocket(
socket,
JSON.stringify({
code: 0,
dataType: 'BTC-USDT@kline_1m',
data: {
c: 30105,
h: 30120,
l: 30080,
o: 30090,
v: 42.5,
s: 'BTC-USDT',
},
} as KlineEvent),
);
});
});

stream = new BingxMarketSocketStream(new URL('', `ws://0.0.0.0:${port}`));
stream.kline$.subscribe((event) => {
expect(event).toStrictEqual({
code: '0',
dataType: 'BTC-USDT@kline_1m',
data: {
c: '30105',
h: '30120',
l: '30080',
o: '30090',
v: '42.5',
s: 'BTC-USDT',
},
});
done();
});

stream.subscribeKline('BTC-USDT', '1m');
});
});
9 changes: 9 additions & 0 deletions src/bingx-socket/events/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ export {
SubscriptionType,
LatestTradeEvent,
TradeDataType,
MarketDepthDataType,
MarketDepthEvent,
MarketDepthData,
MarketDepthEntry,
MarketDepthLevel,
KlineDataType,
KlineEvent,
KlineData,
KlineInterval,
MarketWebsocketEvents,
Price,
Volume,
Expand Down
59 changes: 56 additions & 3 deletions src/bingx-socket/events/market-websocket-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,34 @@ export enum MarkerWebsocketEventCode {
export type TransactionTimeInMillis = number;
export type TradingPair = `${string}-${string}`;
export type IsMarketMaker = boolean;
export type Price = number;
export type Volume = number;
export type Price = string | number;
export type Volume = string | number;

export type TradeDataType = `${TradingPair}@trade`;
export type MarketDepthLevel = 5 | 10 | 20 | 50 | 100;
export type MarketDepthDataType = `${TradingPair}@depth${MarketDepthLevel}`;
export type KlineInterval =
| '1m'
| '3m'
| '5m'
| '15m'
| '30m'
| '1h'
| '2h'
| '4h'
| '6h'
| '8h'
| '12h'
| '1d'
| '3d'
| '1w'
| '1M';
export type KlineDataType = `${TradingPair}@kline_${KlineInterval}`;

export type SubscriptionType = TradeDataType;
export type SubscriptionType =
| TradeDataType
| MarketDepthDataType
| KlineDataType;

export interface MarketWebsocketEvents {
code: MarkerWebsocketEventCode;
Expand All @@ -37,3 +59,34 @@ export interface LatestTradeEvent extends MarketWebsocketEvents {
dataType: TradeDataType;
data: TradeDetail[];
}

export interface MarketDepthEntry {
p: Price;
v: Volume;
}

export interface MarketDepthData {
asks: MarketDepthEntry[];
bids: MarketDepthEntry[];
}

export interface MarketDepthEvent extends MarketWebsocketEvents {
code: MarkerWebsocketEventCode;
dataType: MarketDepthDataType;
data: MarketDepthData;
}

export interface KlineData {
c: Price;
h: Price;
l: Price;
o: Price;
v: Volume;
s: TradingPair;
}

export interface KlineEvent extends MarketWebsocketEvents {
code: MarkerWebsocketEventCode;
dataType: KlineDataType;
data: KlineData;
}