-
Notifications
You must be signed in to change notification settings - Fork 7
/
ott.py
102 lines (83 loc) · 3.29 KB
/
ott.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from collections import namedtuple
import numpy as np
import numba
import talib
from jesse.helpers import get_candle_source, slice_candles
import jessetk.indicators as cta
OTT = namedtuple('OTT', ['ott', 'mavg', 'long_stop', 'short_stop'])
def ott(candles: np.ndarray, length: int = 2, percent: float = 1.4, ma_type="kama", source_type="close",
sequential=False) -> OTT:
# created by: @Anil_Ozeksi
# pinescript developer: @kivancozbilgic
# https://www.tradingview.com/script/zVhoDQME/
#
# python port: github.com/ysdede
"""
:param candles: np.ndarray
:param length: int - default: 2
:param percent: int - default: 1.4
:param ma_type: str - default: var
:param source_type: str - default: close
:param sequential: bool - default: False
:return: Union[float, np.ndarray]
"""
if length < 1 or percent <= 0:
raise ValueError('Bad parameters.')
# Accept normal array too.
if len(candles.shape) == 1:
source = candles
else:
candles = slice_candles(candles, sequential)
source = get_candle_source(candles, source_type=source_type)
if ma_type == 'kamaf':
MAvg = cta.kamaf(source, length, sequential=True)
elif ma_type == 'kama':
MAvg = talib.KAMA(source, length)
elif ma_type == 'wma':
MAvg = talib.WMA(source, length)
else:
MAvg = cta.var(source, length, sequential=True)
ott_series, longStop, shortStop = ott_fast(MAvg, percent, length)
if sequential:
return OTT(ott_series, MAvg, longStop, shortStop)
else:
return OTT(ott_series[-1], MAvg[-1], longStop[-1], shortStop[-1])
@numba.njit
def ott_fast(MAvg, percent, length):
fark = np.multiply(np.multiply(MAvg, percent), 0.01)
# >>>>>>>
longStop = np.subtract(MAvg, fark)
longStopPrev = np.copy(longStop)
for i in range(length, longStop.size):
if MAvg[i] > longStop[i - 1]:
longStop[i] = max(longStop[i], longStop[i - 1])
longStopPrev[i] = longStop[i - 1]
for i in range(length, longStop.size):
if MAvg[i] > longStopPrev[i]:
longStop[i] = max(longStop[i], longStopPrev[i])
longStopPrev[i] = longStop[i - 1]
# <<<<<>>>>>>>
shortStop = np.add(MAvg, fark)
shortStopPrev = np.copy(shortStop)
for i in range(length, shortStop.size):
if MAvg[i] < shortStop[i - 1]:
shortStop[i] = min(shortStop[i], shortStop[i - 1])
shortStopPrev[i] = shortStop[i - 1]
for i in range(length, shortStop.size):
if MAvg[i] < shortStopPrev[i]:
shortStop[i] = min(shortStop[i], shortStopPrev[i])
shortStopPrev[i] = shortStop[i - 1]
# <<<<<>>>>>>>
dir = np.full_like(longStop, 1)
temp_dir = dir[length - 1]
for i in range(length, dir.size):
if temp_dir < 0 and MAvg[i] > shortStopPrev[i]:
temp_dir = dir[i] = 1
elif temp_dir < 0 and MAvg[i] < shortStopPrev[i]:
temp_dir = dir[i] = -1
elif temp_dir > 0 and MAvg[i] < longStopPrev[i]:
temp_dir = dir[i] = -1
# <<<<<>>>>>>>
MT = np.where(dir > 0, longStop, shortStop)
OTT = np.where(MAvg > MT, MT * (200 + percent) / 200, MT * (200 - percent) / 200)
return np.concatenate((np.full(2, 0), OTT[:-2])), longStop, shortStop