Skip to content

Commit

Permalink
[OKEX] Add websocket support
Browse files Browse the repository at this point in the history
  • Loading branch information
f0cii committed Apr 13, 2020
1 parent 5d115e4 commit b803007
Show file tree
Hide file tree
Showing 14 changed files with 679 additions and 12 deletions.
24 changes: 24 additions & 0 deletions brokers/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,30 @@ func NewWS(brokerName string, accessKey string, secret string, testnet bool, par
wsURL = v
}
return hbdm_swap.NewWS(wsURL, accessKey, secret)
case OKEXFutures:
wsURL := "wss://real.okex.com:8443/ws/v3"
if v, ok := params["wsURL"]; ok {
wsURL = v
}
var passphrase string
if v, ok := params["passphrase"]; ok {
passphrase = v
} else {
log.Fatalf("passphrase missing")
}
return okex_futures.NewWS(wsURL, accessKey, secret, passphrase)
case OKEXSwap:
wsURL := "wss://real.okex.com:8443/ws/v3"
if v, ok := params["wsURL"]; ok {
wsURL = v
}
var passphrase string
if v, ok := params["passphrase"]; ok {
passphrase = v
} else {
log.Fatalf("passphrase missing")
}
return okex_swap.NewWS(wsURL, accessKey, secret, passphrase)
default:
panic(fmt.Sprintf("broker error [%v]", brokerName))
}
Expand Down
6 changes: 4 additions & 2 deletions brokers/hbdm-swap/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *WS) tradeCallback(trade *hbdmswap.WSTrade) {
direction = Sell
}
t := Trade{
ID: v.ID,
ID: fmt.Sprint(v.ID),
Direction: direction,
Price: v.Price,
Amount: float64(v.Amount),
Expand Down Expand Up @@ -144,6 +144,7 @@ func (s *WS) ordersCallback(order *hbdmswap.WSOrder) {

func (s *WS) positionsCallback(positions *hbdmswap.WSPositions) {
//log.Printf("positionsCallback")
var eventData []Position
for _, v := range positions.Data {
var o Position
o.Symbol = v.Symbol
Expand All @@ -156,8 +157,9 @@ func (s *WS) positionsCallback(positions *hbdmswap.WSPositions) {
o.Size = -v.Volume
}
o.AvgPrice = v.CostHold
s.emitter.Emit(WSEventPosition, &o)
eventData = append(eventData, o)
}
s.emitter.Emit(WSEventPosition, eventData)
}

func NewWS(wsURL string, accessKey string, secretKey string) *WS {
Expand Down
6 changes: 4 additions & 2 deletions brokers/hbdm/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *WS) tradeCallback(trade *hbdm.WSTrade) {
direction = Sell
}
t := Trade{
ID: v.ID,
ID: fmt.Sprint(v.ID),
Direction: direction,
Price: v.Price,
Amount: float64(v.Amount),
Expand Down Expand Up @@ -160,6 +160,7 @@ func (s *WS) ordersCallback(order *hbdm.WSOrder) {

func (s *WS) positionsCallback(positions *hbdm.WSPositions) {
//log.Printf("positionsCallback")
var eventData []Position
for _, v := range positions.Data {
var o Position
o.Symbol = v.Symbol
Expand All @@ -172,8 +173,9 @@ func (s *WS) positionsCallback(positions *hbdm.WSPositions) {
o.Size = -v.Volume
}
o.AvgPrice = v.CostHold
s.emitter.Emit(WSEventPosition, &o)
eventData = append(eventData, o)
}
s.emitter.Emit(WSEventPosition, eventData)
}

func NewWS(wsURL string, accessKey string, secretKey string) *WS {
Expand Down
201 changes: 201 additions & 0 deletions brokers/okex-futures/ws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package okex_futures

import (
"github.com/chuckpreslar/emission"
. "github.com/coinrust/crex"
"github.com/coinrust/crex/util"
"github.com/frankrap/okex-api"
"time"
)

type WS struct {
ws *okex.FuturesWS
emitter *emission.Emitter
}

func (s *WS) On(event WSEvent, listener interface{}) {
s.emitter.On(event, listener)
}

func (s *WS) SubscribeTrades(market Market) {
s.ws.SubscribeTrade("trade_1", market.ID)
}

func (s *WS) SubscribeLevel2Snapshots(market Market) {
s.ws.SubscribeDepthL2Tbt("depthL2_1", market.ID)
}

func (s *WS) SubscribeOrders(market Market) {
s.ws.SubscribeOrder("order_1", market.ID)
}

func (s *WS) SubscribePositions(market Market) {
s.ws.SubscribePosition("position_1", market.ID)
}

func (s *WS) depth20SnapshotCallback(obRaw *okex.OrderBook) {
// log.Printf("depthCallback %#v", *depth)
// ch: market.BTC_CQ.depth.step0
ob := &OrderBook{
Symbol: obRaw.InstrumentID,
Time: time.Now(),
Asks: nil,
Bids: nil,
}
for _, v := range obRaw.Asks {
ob.Asks = append(ob.Asks, Item{
Price: v.Price,
Amount: v.Amount,
})
}
for _, v := range obRaw.Bids {
ob.Bids = append(ob.Bids, Item{
Price: v.Price,
Amount: v.Amount,
})
}
s.emitter.Emit(WSEventL2Snapshot, ob)
}

func (s *WS) tradeCallback(_trades []okex.WSTrade) {
// log.Printf("tradeCallback")
var result []Trade
for _, v := range _trades {
var direction Direction
if v.Side == "buy" {
direction = Buy
} else if v.Side == "sell" {
direction = Sell
}
t := Trade{
ID: v.TradeID,
Direction: direction,
Price: util.ParseFloat64(v.Price),
Amount: util.ParseFloat64(v.Side),
Ts: v.Timestamp.UnixNano() / 1e6,
Symbol: v.InstrumentID,
}
result = append(result, t)
}
s.emitter.Emit(WSEventTrade, result)
}

func (s *WS) ordersCallback(orders []okex.WSOrder) {
//log.Printf("ordersCallback")
for _, v := range orders {
o := s.convertOrder(&v)
s.emitter.Emit(WSEventOrder, o)
}
}

func (s *WS) convertOrder(order *okex.WSOrder) *Order {
o := &Order{}
o.ID = order.OrderID
o.Symbol = order.InstrumentID
o.Price = util.ParseFloat64(order.Price)
o.AvgPrice = util.ParseFloat64(order.PriceAvg)
// o.StopPx = 0
o.Size = util.ParseFloat64(order.Size)
o.FilledAmount = util.ParseFloat64(order.FilledQty)
switch order.Type {
case "1":
o.Direction = Buy
case "2":
o.Direction = Sell
case "3":
o.Direction = Sell
o.ReduceOnly = true
case "4":
o.Direction = Buy
o.ReduceOnly = true
}
/*
0:普通委托
1:只做Maker(Post only)
2:全部成交或立即取消(FOK)
3:立即成交并取消剩余(IOC)
4:市价委托
*/
switch order.OrderType {
case "0":
o.Type = OrderTypeLimit
case "1":
o.Type = OrderTypeMarket
o.PostOnly = true
case "2":
o.Type = OrderTypeLimit
case "3":
o.Type = OrderTypeLimit
case "4":
o.Type = OrderTypeMarket
default:
o.Type = OrderTypeLimit
}
/*
-2:失败
-1:撤单成功
0:等待成交
1:部分成交
2:完全成交
3:下单中
4:撤单中
*/
switch order.State {
case "-2":
o.Status = OrderStatusRejected
case "-1":
o.Status = OrderStatusCancelled
case "0":
o.Status = OrderStatusNew
case "1":
o.Status = OrderStatusPartiallyFilled
case "2":
o.Status = OrderStatusFilled
case "3":
o.Status = OrderStatusCreated
case "4":
o.Status = OrderStatusCancelPending
}
return o
}

func (s *WS) positionsCallback(positions []okex.WSFuturesPosition) {
//log.Printf("positionsCallback")
var eventData []Position
for _, v := range positions {
longQty := util.ParseFloat64(v.LongQty)
shortQty := util.ParseFloat64(v.ShortQty)
if longQty > 0 {
var o Position
o.Symbol = v.InstrumentID
o.OpenTime = v.Timestamp
o.Size = longQty
o.OpenPrice = util.ParseFloat64(v.LongAvgCost)
o.AvgPrice = o.OpenPrice
eventData = append(eventData, o)
} else if shortQty > 0 {
var o Position
o.Symbol = v.InstrumentID
o.OpenTime = v.Timestamp
o.Size = -shortQty
o.OpenPrice = util.ParseFloat64(v.ShortAvgCost)
o.AvgPrice = o.OpenPrice
eventData = append(eventData, o)
}
}
s.emitter.Emit(WSEventPosition, eventData)
}

func NewWS(wsURL string, accessKey string, secretKey string, passphrase string) *WS {
s := &WS{
emitter: emission.NewEmitter(),
}
ws := okex.NewFuturesWS(wsURL, accessKey, secretKey, passphrase)
ws.SetDepth20SnapshotCallback(s.depth20SnapshotCallback)
ws.SetTradeCallback(s.tradeCallback)
ws.SetOrderCallback(s.ordersCallback)
ws.SetPositionCallback(s.positionsCallback)
ws.Start()
s.ws = ws
return s
}
62 changes: 62 additions & 0 deletions brokers/okex-futures/ws_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package okex_futures

import (
. "github.com/coinrust/crex"
"github.com/spf13/viper"
"log"
"testing"
)

func newTestWS() *WS {
viper.SetConfigName("test_config")
viper.AddConfigPath(".")
err := viper.ReadInConfig()
if err != nil {
log.Panic(err)
}

accessKey := viper.GetString("access_key")
secretKey := viper.GetString("secret_key")
passphrase := viper.GetString("passphrase")
wsURL := "wss://real.okex.com:8443/ws/v3"
ws := NewWS(wsURL,
accessKey, secretKey, passphrase)
return ws
}

func TestWS_AllInOne(t *testing.T) {
ws := newTestWS()

ws.On(WSEventL2Snapshot, func(ob *OrderBook) {
log.Printf("ob: %#v", ob)
})
ws.On(WSEventTrade, func(trades []Trade) {
log.Printf("trades: %#v", trades)
})

ws.SubscribeLevel2Snapshots(Market{
ID: "BTC-USD-200626",
Params: "",
})
ws.SubscribeTrades(Market{
ID: "BTC-USD-200626",
Params: "",
})

select {}
}

func TestWS_SubscribeOrders(t *testing.T) {
ws := newTestWS()

ws.On(WSEventOrder, func(order *Order) {
log.Printf("order: %#v", order)
})

ws.SubscribeOrders(Market{
ID: "BTC-USD-200626",
Params: "",
})

select {}
}
Loading

0 comments on commit b803007

Please sign in to comment.