Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix datetime index on resample #333

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion alpaca_backtrader_api/alpacadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
unicode_literals)

from datetime import timedelta
import logging
import pandas as pd
from backtrader.feed import DataBase
from backtrader import date2num, num2date
Expand Down Expand Up @@ -159,6 +160,7 @@ def __init__(self, **kwargs):
self._candleFormat = 'bidask' if self.p.bidask else 'midpoint'
self._timeframe = self.p.timeframe
self.do_qcheck(True, 0)
self.logger = logging.getLogger(self.__class__.__name__)
if self._timeframe not in [bt.TimeFrame.Ticks,
bt.TimeFrame.Minutes,
bt.TimeFrame.Days]:
Expand All @@ -179,6 +181,7 @@ def start(self):
contractdetails if it exists
"""
super(AlpacaData, self).start()
self.logger.info("Starting data feed: %s" % self.p.dataname)

# Create attributes as soon as possible
self._statelivereconn = False # if reconnecting in live state
Expand Down Expand Up @@ -268,9 +271,11 @@ def _load(self):
if self._state == self._ST_LIVE:
try:
msg = (self._storedmsg.pop(None, None) or
self.qlive.get(timeout=self._qcheck))
self.qlive.get(timeout=self.p.qcheck))
except queue.Empty:
return None # indicate timeout situation

self.logger.debug("Got msg: %s" % msg)
if msg is None: # Conn broken during historical/backfilling
self.put_notification(self.CONNBROKEN)
# Try to reconnect
Expand Down
23 changes: 20 additions & 3 deletions alpaca_backtrader_api/alpacastore.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import logging
import os
import collections
import time
Expand Down Expand Up @@ -125,6 +126,7 @@ def __init__(
except RuntimeError:
asyncio.set_event_loop(asyncio.new_event_loop())

self.logger = logging.getLogger(self.__class__.__name__)
self.conn = Stream(api_key,
api_secret,
base_url,
Expand All @@ -135,6 +137,7 @@ def __init__(
self.q = q

def run(self):
self.logger.info("Starting streamer for: %s %s" % (self.instrument, self.method.name))
if self.method == StreamingMethod.AccountUpdate:
self.conn.subscribe_trade_updates(self.on_trade)
elif self.method == StreamingMethod.MinuteAgg:
Expand All @@ -152,16 +155,20 @@ async def on_listen(self, conn, stream, msg):

async def on_quotes(self, msg):
    """Quote-stream callback: stamp the raw payload with the event
    timestamp and hand it to the consumer queue.

    :param msg: quote entity from the Alpaca stream; must expose
        ``_raw`` (dict) and ``timestamp``.
    """
    # Mutate before logging so the debug line shows the payload exactly
    # as it is queued (including the 'time' key).
    msg._raw['time'] = msg.timestamp
    # Lazy %-args: msg is only stringified when DEBUG is enabled.
    self.logger.debug("Got: %s", msg)
    self.q.put(msg._raw)

async def on_agg_min(self, msg):
    """Minute-aggregate callback: stamp the raw payload with the event
    timestamp and hand it to the consumer queue.

    :param msg: minute-bar entity from the Alpaca stream; must expose
        ``_raw`` (dict) and ``timestamp``.
    """
    # Mutate before logging so the debug line shows the payload exactly
    # as it is queued (including the 'time' key).
    msg._raw['time'] = msg.timestamp
    # Lazy %-args: msg is only stringified when DEBUG is enabled.
    self.logger.debug("Got: %s", msg)
    self.q.put(msg._raw)

async def on_account(self, msg):
    """Account-update callback: forward the message to the consumer
    queue unchanged.

    :param msg: account-update entity from the Alpaca stream.
    """
    # Lazy %-args: msg is only stringified when DEBUG is enabled.
    self.logger.debug("Got: %s", msg)
    self.q.put(msg)

async def on_trade(self, msg):
    """Trade-update callback: forward the message to the consumer
    queue unchanged.

    :param msg: trade-update entity from the Alpaca stream.
    """
    # Lazy %-args: msg is only stringified when DEBUG is enabled.
    self.logger.debug("Got: %s", msg)
    self.q.put(msg)


Expand Down Expand Up @@ -193,6 +200,9 @@ class AlpacaStore(with_metaclass(MetaSingleton, object)):

- ``account_tmout`` (default: ``10.0``): refresh period for account
value/cash refresh

- ``order_tmout`` (default: ``0.05``): how often the order creation queue
is checked within _t_create_order
'''

BrokerCls = None # broker class will autoregister
Expand All @@ -203,6 +213,7 @@ class AlpacaStore(with_metaclass(MetaSingleton, object)):
('secret_key', ''),
('paper', False),
('account_tmout', 10.0), # account balance refresh timeout
('order_tmout', 0.05),
('api_version', None)
)

Expand All @@ -224,6 +235,7 @@ def getbroker(cls, *args, **kwargs):

def __init__(self):
super(AlpacaStore, self).__init__()
self.logger = logging.getLogger(self.__class__.__name__)

self.notifs = collections.deque() # store notifications for cerebro

Expand Down Expand Up @@ -492,6 +504,7 @@ def get_aggs_from_alpaca(self,
but we need to manipulate it to be able to work with it
smoothly
"""
self.logger.debug(f"Getting aggs for: {dataname} from: {start} to {end} by {compression} {granularity}")

def _granularity_to_timeframe(granularity):
if granularity in [Granularity.Minute, Granularity.Ticks]:
Expand Down Expand Up @@ -554,6 +567,8 @@ def _drop_early_samples(df):
return df[i:]

def _resample(df):
if df.empty:
return df
"""
samples returned with certain window size (1 day, 1 minute) user
may want to work with different window size (5min)
Expand Down Expand Up @@ -581,10 +596,11 @@ def _resample(df):
timeframe = _granularity_to_timeframe(granularity)
start = end - timedelta(days=1)
response = self.oapi.get_bars(dataname,
timeframe, start, end)._raw
timeframe, start, end).df
else:
response = _iterate_api_calls()
cdl = response
self.logger.debug(f"Got: {response}")
if granularity == Granularity.Minute:
cdl = _clear_out_of_market_hours(cdl)
cdl = _drop_early_samples(cdl)
Expand Down Expand Up @@ -753,9 +769,10 @@ def _check_if_transaction_occurred(order_id):

while True:
try:
if self.q_ordercreate.empty():
try:
msg = self.q_ordercreate.get(timeout=self.p.order_tmout)
except queue.Empty:
continue
msg = self.q_ordercreate.get()
if msg is None:
continue
oref, okwargs = msg
Expand Down