Skip to content

Commit e2ca93e

Browse files
authored
🦙 feat(data): alpaca 🦙 (#241)
* add alpaca get_ohlc fx * add test * update hist script * update script and workflow #minor * use tokens in run * fix feed * try parenthesis * provider dir * test fixed
1 parent d94c6f6 commit e2ca93e

File tree

7 files changed

+212
-61
lines changed

7 files changed

+212
-61
lines changed

‎.github/workflows/ohlc.yml‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,6 @@ jobs:
4444
AWS_DEFAULT_REGION: ${{ secrets.AWS_DEFAULT_REGION }}
4545
S3_BUCKET: ${{ secrets.S3_BUCKET }}
4646
POLYGON: ${{ secrets.POLYGON }}
47+
ALPACA: ${{ secrets.ALPACA }}
48+
ALPACA_SECRET: ${{ secrets.ALPACA_SECRET }}
4749
run: python scripts/update_ohlc.py

‎.github/workflows/sandbox.yml‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ env:
2727
PREF_EXCHANGE: ${{ secrets.PREF_EXCHANGE }}
2828
TEST: true
2929
GLASSNODE_PASS: ${{ secrets.GLASSNODE_PASS }}
30+
ALPACA_PAPER: ${{ secrets.ALPACA_PAPER }}
31+
ALPACA_PAPER_SECRET: ${{ secrets.ALPACA_PAPER_SECRET}}
3032

3133
jobs:
3234
build:

‎hyperdrive/Constants.py‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ def get_env_bool(var_name):
3434
IDX_DIR = 'indices'
3535
# providers
3636
POLY_DIR = 'polygon'
37+
ALPACA_DIR = 'alpaca'
3738
# models
3839
MODELS_DIR = 'models'
3940

4041
folders = {
4142
'polygon': POLY_DIR,
43+
'alpaca': ALPACA_DIR
4244
}
4345

4446
# Column Names
@@ -109,6 +111,8 @@ def get_env_bool(var_name):
109111
'X%3ALTCUSD', 'X%3AXMRUSD', 'X%3AIOTUSD'
110112
]
111113

114+
ALPC_CRYPTO_SYMBOLS = ['BTC/USD', 'ETH/USD', 'LTC/USD']
115+
112116
SENTIMENT_SYMBOLS_IGNORE = {
113117
'SPYD', 'VWDRY', 'BPMP',
114118
'FOX', 'YYY', 'SDIV',
@@ -124,6 +128,8 @@ def get_env_bool(var_name):
124128
FEW_DAYS = str(FEW) + 'd'
125129
SCRIPT_FAILURE_THRESHOLD = 0.95
126130

131+
ALPACA_FREE_DELAY = 0.5
132+
127133
# Exchanges
128134
BINANCE = 'BINANCE'
129135
KRAKEN = 'KRAKEN'

‎hyperdrive/DataSource.py‎

Lines changed: 110 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,16 @@ def save_ndx(self, **kwargs):
434434
if os.path.exists(filename):
435435
return filename
436436

437+
def log_api_call_time(self):
438+
self.last_api_call_time = time()
439+
440+
def obey_free_limit(self, free_delay):
441+
if self.free and hasattr(self, 'last_api_call_time'):
442+
time_since_last_call = time() - self.last_api_call_time
443+
delay = free_delay - time_since_last_call
444+
if delay > 0:
445+
sleep(delay)
446+
437447

438448
class Indices(MarketData):
439449
def __init__(self):
@@ -448,6 +458,102 @@ def get_ndx(self, date=datetime.now()):
448458
return self.standardize_ndx(df)
449459

450460

461+
class Alpaca(MarketData):
462+
# AlpacaData
463+
def __init__(
464+
self,
465+
token=os.environ.get('ALPACA'),
466+
secret=os.environ.get('ALPACA_SECRET'),
467+
free=True,
468+
paper=False
469+
):
470+
super().__init__()
471+
self.base = 'https://data.alpaca.markets'
472+
self.token = os.environ.get(
473+
'ALPACA_PAPER') if paper or C.TEST else token
474+
self.secret = os.environ.get(
475+
'ALPACA_PAPER_SECRET') if paper or C.TEST else secret
476+
if not (self.token and self.secret):
477+
raise Exception('missing Alpaca credentials')
478+
self.provider = 'alpaca'
479+
self.free = free
480+
481+
# def get_dividends(self, **kwargs):
482+
# pass
483+
# def get_splits(self, **kwargs):
484+
# pass
485+
486+
def get_ohlc(self, **kwargs):
487+
def _get_ohlc(symbol, timeframe='max'):
488+
is_crypto = symbol in C.ALPC_CRYPTO_SYMBOLS
489+
version = 'v1beta3' if is_crypto else 'v2'
490+
page_token = None
491+
start, _ = self.traveller.convert_dates(timeframe)
492+
parts = [
493+
self.base,
494+
version,
495+
'crypto/us' if is_crypto else 'stocks',
496+
'bars',
497+
]
498+
url = '/'.join(parts)
499+
pre_params = {
500+
'symbols': symbol,
501+
'timeframe': '1D',
502+
'start': start,
503+
'limit': 10000,
504+
} | ({} if is_crypto else {'adjustment': 'all', 'feed': 'iex'})
505+
headers = {
506+
'APCA-API-KEY-ID': self.token,
507+
'APCA-API-SECRET-KEY': self.secret
508+
}
509+
results = []
510+
while True:
511+
self.obey_free_limit(C.ALPACA_FREE_DELAY)
512+
try:
513+
post_params = {
514+
'page_token': page_token} if page_token else {}
515+
params = pre_params | post_params
516+
response = requests.get(url, params, headers=headers)
517+
if not response.ok:
518+
raise Exception(
519+
'Invalid response from Alpaca for OHLC',
520+
response.status_code,
521+
response.text
522+
)
523+
data = response.json()
524+
if data.get('bars') and data['bars'].get(symbol):
525+
results += data['bars'][symbol]
526+
finally:
527+
self.log_api_call_time()
528+
if data.get('next_page_token'):
529+
page_token = data['next_page_token']
530+
else:
531+
break
532+
df = pd.DataFrame(results)
533+
columns = {
534+
't': 'date',
535+
'o': 'open',
536+
'h': 'high',
537+
'l': 'low',
538+
'c': 'close',
539+
'v': 'volume',
540+
'vw': 'average',
541+
'n': 'trades'
542+
}
543+
df = df.rename(columns=columns)
544+
df['date'] = pd.to_datetime(df['date']).dt.tz_convert(
545+
C.TZ).dt.tz_localize(None)
546+
df = self.standardize_ohlc(symbol, df)
547+
return self.reader.data_in_timeframe(df, C.TIME, timeframe)
548+
return self.try_again(func=_get_ohlc, **kwargs)
549+
550+
# def get_intraday(self, **kwargs):
551+
# pass
552+
553+
# def get_news(self, **kwargs):
554+
# pass
555+
556+
451557
class Polygon(MarketData):
452558
def __init__(self, token=os.environ.get('POLYGON'), free=True):
453559
super().__init__()
@@ -465,19 +571,9 @@ def paginate(self, gen, apply):
465571
results.append(apply(item))
466572
return results
467573

468-
def obey_free_limit(self):
469-
if self.free and hasattr(self, 'last_api_call_time'):
470-
time_since_last_call = time() - self.last_api_call_time
471-
delay = C.POLY_FREE_DELAY - time_since_last_call
472-
if delay > 0:
473-
sleep(delay)
474-
475-
def log_api_call_time(self):
476-
self.last_api_call_time = time()
477-
478574
def get_dividends(self, **kwargs):
479575
def _get_dividends(symbol, timeframe='max'):
480-
self.obey_free_limit()
576+
self.obey_free_limit(C.POLY_FREE_DELAY)
481577
try:
482578
start, _ = self.traveller.convert_dates(timeframe)
483579
response = self.paginate(
@@ -495,8 +591,6 @@ def _get_dividends(symbol, timeframe='max'):
495591
'amount': div.cash_amount
496592
}
497593
)
498-
except Exception as e:
499-
raise e
500594
finally:
501595
self.log_api_call_time()
502596
raw = pd.DataFrame(response)
@@ -506,7 +600,7 @@ def _get_dividends(symbol, timeframe='max'):
506600

507601
def get_splits(self, **kwargs):
508602
def _get_splits(symbol, timeframe='max'):
509-
self.obey_free_limit()
603+
self.obey_free_limit(C.POLY_FREE_DELAY)
510604
try:
511605
start, _ = self.traveller.convert_dates(timeframe)
512606
response = self.paginate(
@@ -522,8 +616,6 @@ def _get_splits(symbol, timeframe='max'):
522616
'ratio': split.split_from / split.split_to
523617
}
524618
)
525-
except Exception as e:
526-
raise e
527619
finally:
528620
self.log_api_call_time()
529621
raw = pd.DataFrame(response)
@@ -536,14 +628,12 @@ def _get_ohlc(symbol, timeframe='max'):
536628
is_crypto = symbol.find('X%3A') == 0
537629
formatted_start, formatted_end = self.traveller.convert_dates(
538630
timeframe)
539-
self.obey_free_limit()
631+
self.obey_free_limit(C.POLY_FREE_DELAY)
540632
try:
541633
response = self.client.get_aggs(
542634
symbol, 1, 'day',
543635
from_=formatted_start, to=formatted_end, adjusted=True, limit=C.POLY_MAX_AGGS_LIMIT
544636
)
545-
except Exception as e:
546-
raise e
547637
finally:
548638
self.log_api_call_time()
549639

@@ -573,7 +663,7 @@ def _get_intraday(symbol, min=1, timeframe='max', extra_hrs=True):
573663
raise Exception(f'No dates in timeframe: {timeframe}.')
574664

575665
for _, date in enumerate(dates):
576-
self.obey_free_limit()
666+
self.obey_free_limit(C.POLY_FREE_DELAY)
577667
try:
578668
response = self.client.get_aggs(
579669
symbol, min, 'minute', from_=date, to=date,
@@ -582,8 +672,6 @@ def _get_intraday(symbol, min=1, timeframe='max', extra_hrs=True):
582672
except exceptions.NoResultsError:
583673
# This is to prevent breaking the loop over weekends
584674
continue
585-
except Exception as e:
586-
raise e
587675
finally:
588676
self.log_api_call_time()
589677

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
import os
2-
import sys
32
from multiprocessing import Process
4-
sys.path.append('hyperdrive')
5-
from DataSource import Polygon # noqa autopep8
6-
from Constants import CI, PathFinder, POLY_CRYPTO_SYMBOLS # noqa autopep8
7-
3+
from hyperdrive.DataSource import Polygon, Alpaca
4+
from hyperdrive.Constants import PathFinder
5+
import hyperdrive.Constants as C
86

7+
alpc = Alpaca(paper=C.TEST)
98
poly = Polygon(os.environ['POLYGON'])
109
stock_symbols = poly.get_symbols()
11-
crypto_symbols = POLY_CRYPTO_SYMBOLS
12-
all_symbols = stock_symbols + crypto_symbols
10+
poly_symbols = stock_symbols + C.POLY_CRYPTO_SYMBOLS
11+
alpc_symbols = set(alpc.get_ndx()[C.SYMBOL]).union(stock_symbols)
1312
timeframe = '2m'
1413

14+
# Double redundancy
15+
# 1st pass
16+
1517

1618
def update_poly_ohlc():
17-
for symbol in all_symbols:
19+
for symbol in poly_symbols:
1820
filename = PathFinder().get_ohlc_path(
1921
symbol=symbol, provider=poly.provider)
2022
try:
@@ -23,10 +25,29 @@ def update_poly_ohlc():
2325
print(f'Polygon.io OHLC update failed for {symbol}.')
2426
print(e)
2527
finally:
26-
if CI and os.path.exists(filename):
28+
if C.CI and os.path.exists(filename):
29+
os.remove(filename)
30+
31+
# 2nd pass
32+
33+
34+
def update_alpc_ohlc():
35+
for symbol in alpc_symbols:
36+
filename = PathFinder().get_ohlc_path(
37+
symbol=symbol, provider=alpc.provider)
38+
try:
39+
alpc.save_ohlc(symbol=symbol, timeframe=timeframe)
40+
except Exception as e:
41+
print(f'Alpaca OHLC update failed for {symbol}.')
42+
print(e)
43+
finally:
44+
if C.CI and os.path.exists(filename):
2745
os.remove(filename)
2846

2947

3048
p1 = Process(target=update_poly_ohlc)
49+
p2 = Process(target=update_alpc_ohlc)
3150
p1.start()
51+
p2.start()
3252
p1.join()
53+
p2.join()

‎scripts/update_ohlc.py‎

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,22 @@
11
import os
2-
import sys
32
from multiprocessing import Process, Value
4-
sys.path.append('hyperdrive')
5-
from DataSource import Polygon # noqa autopep8
6-
from Constants import PathFinder, POLY_CRYPTO_SYMBOLS, FEW_DAYS # noqa autopep8
7-
import Constants as C # noqa autopep8
3+
from hyperdrive.DataSource import Polygon, Alpaca
4+
from hyperdrive.Constants import PathFinder
5+
import hyperdrive.Constants as C
86

97
counter = Value('i', 0)
8+
alpc = Alpaca(paper=C.TEST)
109
poly = Polygon(os.environ['POLYGON'])
1110
stock_symbols = poly.get_symbols()
12-
crypto_symbols = POLY_CRYPTO_SYMBOLS
13-
all_symbols = stock_symbols + crypto_symbols
11+
poly_symbols = stock_symbols + C.POLY_CRYPTO_SYMBOLS
12+
alpc_symbols = set(alpc.get_ndx()[C.SYMBOL]).union(stock_symbols)
1413

1514

1615
def update_poly_ohlc():
17-
for symbol in all_symbols:
16+
for symbol in poly_symbols:
1817
try:
1918
filename = poly.save_ohlc(
20-
symbol=symbol, timeframe=FEW_DAYS, retries=1)
19+
symbol=symbol, timeframe=C.FEW_DAYS, retries=1)
2120
with counter.get_lock():
2221
counter.value += 1
2322
except Exception as e:
@@ -30,9 +29,29 @@ def update_poly_ohlc():
3029
os.remove(filename)
3130

3231

32+
def update_alpc_ohlc():
33+
for symbol in alpc_symbols:
34+
try:
35+
filename = alpc.save_ohlc(
36+
symbol=symbol, timeframe=C.FEW_DAYS, retries=1)
37+
with counter.get_lock():
38+
counter.value += 1
39+
except Exception as e:
40+
print(f'Alpaca OHLC update failed for {symbol}.')
41+
print(e)
42+
finally:
43+
filename = PathFinder().get_ohlc_path(
44+
symbol=symbol, provider=alpc.provider)
45+
if C.CI and os.path.exists(filename):
46+
os.remove(filename)
47+
48+
3349
p1 = Process(target=update_poly_ohlc)
50+
p2 = Process(target=update_alpc_ohlc)
3451
p1.start()
52+
p2.start()
3553
p1.join()
54+
p2.join()
3655

37-
if counter.value / len(all_symbols) < 0.95:
56+
if counter.value / (len(poly_symbols) + len(alpc_symbols)) < 0.95:
3857
exit(1)

0 commit comments

Comments
 (0)