지표계산을 위한 UDF
import matplotlib.dates as mpl_dates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
x축형식 지정을 위한 UDF
def xaxis_form(n=20, format='%y/%m/%d', ax=False):
    """Configure the date ticks on the x-axis of the current matplotlib axes.

    A major tick is placed every ``n`` days on ``plt.gca()``.

    Args:
        n: Day interval between major ticks.
        format: ``strftime`` pattern for the tick labels; only used when
            ``ax`` does not compare equal to ``False``.
        ax: Flag (not an Axes object). When it equals ``False`` the tick
            labels are blanked; otherwise the labels are date-formatted and
            rotated by 45 degrees. The current axes are used in both cases.
    """
    current_axes = plt.gca()
    current_axes.xaxis.set_major_locator(mpl_dates.DayLocator(interval=n))

    if ax == False:  # noqa: E712 - equality (not identity) kept on purpose
        current_axes.set_xticklabels([])
        return

    current_axes.xaxis.set_major_formatter(mpl_dates.DateFormatter(format))
    # NOTE(review): the source lost its indentation; the rotation is assumed
    # to belong to the labelled branch only - confirm.
    plt.xticks(rotation=45)
A
ADL
def calculate_adl(df, short_period=3, long_period=10):
    """Compute the Accumulation/Distribution Line (ADL) and its EMA oscillator.

    Args:
        df: DataFrame with 'High', 'Low', 'Close' and 'Volume' columns.
        short_period: Span of the short EMA applied to the ADL.
        long_period: Span of the long EMA applied to the ADL.

    Returns:
        DataFrame indexed like ``df`` with the columns
        'MFM' (money flow multiplier), 'MFV' (money flow volume),
        'ADL' (cumulative MFV) and 'OSC' (short EMA minus long EMA of ADL).
    """
    high, low, close, volume = df['High'], df['Low'], df['Close'], df['Volume']

    bar_range = high - low
    mfm = ((close - low) - (high - close)) / bar_range
    # A bar with High == Low yields 0/0; treat it as neutral money flow so it
    # does not leave NaN holes in MFV, ADL and the oscillator.
    mfm = mfm.mask(bar_range == 0, 0.0)

    result = pd.DataFrame({"MFM": mfm})
    result["MFV"] = result["MFM"] * volume
    result["ADL"] = result["MFV"].cumsum()

    short_ema = result["ADL"].ewm(span=short_period, adjust=False).mean()
    long_ema = result["ADL"].ewm(span=long_period, adjust=False).mean()
    result["OSC"] = short_ema - long_ema
    return result
#adl 그래프
def adl_plot(data, short_period=3, long_period=10):
    """Plot the ADL column of ``calculate_adl`` on the current axes.

    Args:
        data: Price/volume DataFrame forwarded to ``calculate_adl``.
        short_period: Forwarded to ``calculate_adl``.
        long_period: Forwarded to ``calculate_adl``.
    """
    adl_line = calculate_adl(data, short_period, long_period)["ADL"]
    plt.plot(adl_line, label='ADL', color='r')
    plt.legend(loc="upper left")
    plt.grid(True)
#adl oscillator 그래프
def adlOSC_plot(data, short_period=3, long_period=10):
    """Plot the ADL oscillator ('OSC') with a zero reference line.

    Args:
        data: Price/volume DataFrame forwarded to ``calculate_adl``.
        short_period: Forwarded to ``calculate_adl``.
        long_period: Forwarded to ``calculate_adl``.
    """
    oscillator = calculate_adl(data, short_period, long_period)["OSC"]
    plt.plot(oscillator, label='OSC', color='r')
    plt.axhline(0, color="skyblue")
    plt.legend(loc="upper left")
    plt.grid(True)
ADX 계산
def calculate_adx(data, period=14):
    """Compute ADX together with +DI and -DI using simple rolling means.

    Args:
        data: DataFrame with 'High', 'Low' and 'Close' columns.
        period: Window length used for every smoothing step.

    Returns:
        DataFrame indexed like ``data`` with columns 'ADX', '+DI' and '-DI'.
        Rows inside the warm-up window are NaN.
    """
    high, low, close = data['High'], data['Low'], data['Close']
    prev_close = close.shift(1)

    # True Range: the largest of the three candidate ranges per bar.
    tr = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    # Raw up/down moves; the first bar has no predecessor and counts as 0.
    up_move = (high - high.shift(1)).fillna(0)
    down_move = (low.shift(1) - low).fillna(0)

    # Both masks are evaluated on the *raw* moves so that the result does not
    # depend on assignment order: a move counts only when it is positive and
    # strictly larger than the opposite move (a tie zeroes both sides).
    dm_plus = up_move.where((up_move > 0) & (up_move > down_move), 0.0)
    dm_minus = down_move.where((down_move > 0) & (down_move > up_move), 0.0)

    # Smooth TR, +DM and -DM.
    atr = tr.rolling(window=period).mean()
    di_plus = dm_plus.rolling(window=period).mean() / atr * 100
    di_minus = dm_minus.rolling(window=period).mean() / atr * 100

    di_sum = di_plus + di_minus
    dx = (di_plus - di_minus).abs() / di_sum * 100
    # Only the genuine zero-denominator cases become 0; warm-up NaNs are kept
    # so they are not averaged into the first ADX values as fake zeros.
    dx = dx.mask((di_sum == 0) | (atr == 0), 0.0)

    adx = dx.rolling(window=period).mean()
    return pd.DataFrame({'ADX': adx, '+DI': di_plus, '-DI': di_minus})
#그래프 작성
def adx_plot(data, period=14):
    """Plot ADX (solid) and the two DI lines (dashed) with a level-25 guide.

    Args:
        data: DataFrame forwarded to ``calculate_adx``.
        period: Forwarded to ``calculate_adx``.
    """
    adx_frame = calculate_adx(data, period=period)
    line_styles = ("-", "--", "--")
    for column, line_style in zip(adx_frame.columns, line_styles):
        plt.plot(adx_frame[column], ls=line_style, label=column)
    plt.axhline(25, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)
ATR 계산
def calculate_atr(df, n=14):
    """Return the Average True Range as an ``n``-bar simple rolling mean.

    Args:
        df: DataFrame with 'High', 'Low' and 'Close' columns.
        n: Rolling window length.

    Returns:
        Series indexed like ``df``; the first ``n - 1`` rows are NaN.
    """
    prev_close = df['Close'].shift()
    candidates = [
        df['High'] - df['Low'],
        abs(df['High'] - prev_close),
        abs(df['Low'] - prev_close),
    ]
    # Row-wise maximum of the three ranges (NaN candidates are skipped).
    true_range = pd.concat(candidates, axis=1).max(axis=1)
    return true_range.rolling(window=n).mean()
awesome oscillator 계산
def calculate_ao(data, short=5, long=34):
    """Return the Awesome Oscillator: short SMA minus long SMA of (High+Low)/2.

    Args:
        data: DataFrame with 'High' and 'Low' columns.
        short: Window of the fast simple moving average.
        long: Window of the slow simple moving average.

    Returns:
        Series indexed like ``data``; NaN until the long window is filled.
    """
    median_price = (data["High"] + data["Low"]) / 2
    fast_sma = median_price.rolling(window=short).mean()
    slow_sma = median_price.rolling(window=long).mean()
    return fast_sma - slow_sma
def ao_color(data):
    """Return one bar colour per value of an oscillator Series.

    Each value is compared with the value directly before it:
    'orange' when it is greater than or equal to the previous value,
    'skyblue' when it is lower, and 'k' when no comparison is possible
    (the first element, or either value is NaN).

    Args:
        data: pandas Series of oscillator values.

    Returns:
        List of colour strings with the same length as ``data``.
    """
    # shift(1) gives the first element a NaN predecessor, so it is no longer
    # compared against the *last* element through negative indexing.
    previous = data.shift(1)
    rising = (data >= previous).to_numpy()
    falling = (data < previous).to_numpy()
    return np.where(rising, 'orange', np.where(falling, 'skyblue', 'k')).tolist()
B
Bollinger Band
볼린저 밴드 계산. 기본적으로 종가를 사용합니다.
def calculate_bollinger_bands(da, period=20, std_dev=2):
    """Compute Bollinger Bands and %B.

    Args:
        da: Price Series, or a DataFrame whose 'Close' column is used.
        period: Rolling window for the mean and standard deviation.
        std_dev: Number of standard deviations between the mean and a band.

    Returns:
        DataFrame with columns 'ma', 'upper', 'lower' and '%B'
        (%B is expressed on a 0-100 scale). Because ``min_periods=1`` is
        used, 'ma' is defined from the first row while the bands start at
        the second row (the standard deviation of one sample is NaN).
    """
    # isinstance also accepts Series subclasses, unlike an exact type match.
    prices = da if isinstance(da, pd.Series) else da["Close"]

    window = prices.rolling(window=period, min_periods=1)
    band_width = window.std() * std_dev

    result = pd.DataFrame()
    result["ma"] = window.mean()
    result["upper"] = result["ma"] + band_width
    result["lower"] = result["ma"] - band_width
    result["%B"] = (prices - result["lower"]) / (result["upper"] - result["lower"]) * 100
    return result
bband 그래프 작성
def bband_plot(data):
    """Plot the Bollinger mean and both bands (every column except '%B').

    Args:
        data: Series or DataFrame forwarded to ``calculate_bollinger_bands``.
    """
    bands = calculate_bollinger_bands(data)
    band_columns = bands.columns[:-1]  # the last column is %B
    for column in band_columns:
        plt.plot(bands[column], label=column)
    plt.legend(loc="upper left")
%B 그래프 작성
def percentB_plot(data):
    """Plot %B with dashed guide lines at 80 (red) and 20 (blue).

    Args:
        data: Series or DataFrame forwarded to ``calculate_bollinger_bands``.
    """
    percent_b = calculate_bollinger_bands(data)["%B"]
    plt.plot(percent_b, label="%B")
    for level, colour in ((80, "r"), (20, "b")):
        plt.axhline(level, ls="--", color=colour)
    plt.legend(loc="upper left")
C
candle chart 작성
def candleChart(data):
    """Draw a candlestick chart on the current matplotlib axes.

    Each row becomes a thick body line (open to close) and two thin wick
    lines; rising bars (close >= open) are green, falling bars are red.
    The x position of a candle is the row's index value.

    Args:
        data: DataFrame with 'Open', 'High', 'Low' and 'Close' columns.
            It is only read; no column is added to it.
    """
    # Iterate over the index directly instead of writing a "Date" column
    # into the caller's DataFrame.
    rows = zip(data.index, data['Open'], data['High'], data['Low'], data['Close'])
    for date_val, open_val, high_val, low_val, close_val in rows:
        color = 'green' if close_val >= open_val else 'red'
        x_pair = [date_val, date_val]
        # Body
        plt.plot(x_pair, [open_val, close_val], color=color, linewidth=3)
        # Upper wick
        plt.plot(x_pair, [high_val, max(open_val, close_val)], color=color, linewidth=0.7)
        # Lower wick
        plt.plot(x_pair, [min(open_val, close_val), low_val], color=color, linewidth=0.7)
CCI
def calculate_cci(data, window=14, constant=0.015):
    """Compute the Commodity Channel Index (CCI).

    CCI = (TP - SMA(TP)) / (constant * mean deviation), where TP is the
    typical price (High + Low + Close) / 3 and the mean deviation is the
    average absolute distance of the window's TP values from that window's
    own mean.

    Args:
        data: DataFrame with 'High', 'Low' and 'Close' columns.
        window: Rolling window length (partial windows are allowed).
        constant: Scaling constant in the denominator.

    Returns:
        Series indexed like ``data``. Rows whose mean deviation is 0
        (e.g. the very first row) evaluate to NaN or +/-inf.
    """
    tp = (data['High'] + data['Low'] + data['Close']) / 3
    rolling_tp = tp.rolling(window=window, min_periods=1)
    sma_tp = rolling_tp.mean()

    def _mean_abs_deviation(values):
        # Deviations are measured against the mean of *this* window, not
        # against each row's own historical moving average.
        return np.nanmean(np.abs(values - np.nanmean(values)))

    mad = rolling_tp.apply(_mean_abs_deviation, raw=True)
    return (tp - sma_tp) / (constant * mad)
CCI 그래프 그리기
def cci_plot(cci):
    """Plot a CCI series with dashed guides at +100, 0 and -100.

    Args:
        cci: Already-computed CCI values (anything ``plt.plot`` accepts).
    """
    plt.plot(cci, label="cci")
    guide_lines = {100: 'brown', 0: 'k', -100: 'b'}
    for level, colour in guide_lines.items():
        plt.axhline(level, ls="--", color=colour)
    plt.legend(loc="upper left")
D
dpo
def calculate_dpo(data, period=12):
    """Compute the Detrended Price Oscillator (DPO).

    DPO = price ``period // 2 + 1`` bars ago minus the ``period``-bar SMA.

    Args:
        data: Price Series, or a DataFrame whose 'Close' column is used.
        period: Window of the simple moving average.

    Returns:
        Series indexed like the input prices; NaN during warm-up.
    """
    # isinstance also accepts Series subclasses, unlike an exact type match.
    prices = data if isinstance(data, pd.Series) else data["Close"]
    displaced_price = prices.shift(period // 2 + 1)
    return displaced_price - prices.rolling(window=period).mean()
def dpo_plot(data, period=12):
    """Plot the DPO with a grey zero line.

    Args:
        data: Series or DataFrame forwarded to ``calculate_dpo``.
        period: Forwarded to ``calculate_dpo``.
    """
    detrended = calculate_dpo(data, period=period)
    plt.plot(detrended, label="DPO")
    plt.axhline(0, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)
E
EOM(Ease of Movement)
def calculate_eom(data, period=14, adjust_factor=100000):
    """Compute Ease of Movement (EOM), optionally smoothed.

    EOM = midpoint change / box ratio, with
    box ratio = Volume / ((High - Low) * adjust_factor).

    Args:
        data: DataFrame with 'High', 'Low' and 'Volume' columns.
        period: When greater than 1, the ``period``-bar simple moving
            average of EOM is returned; otherwise the raw EOM.
        adjust_factor: Scale applied to the bar range in the box ratio.

    Returns:
        Series indexed like ``data``. The first row is NaN (no previous
        midpoint); bars with High == Low or Volume == 0 evaluate to 0.
    """
    high, low, volume = data["High"], data["Low"], data["Volume"]
    midpoint_change = ((high + low) / 2).diff()

    # Dividing by the box ratio is rewritten as a product so a zero-range
    # bar gives 0 directly, instead of dividing by a box ratio that had its
    # infinity replaced with 0.
    eom = midpoint_change * (high - low) * adjust_factor / volume
    # Bars without volume would divide by zero; report them as 0 but keep
    # the NaN of rows that have no previous midpoint.
    eom = eom.mask(midpoint_change.notna() & (volume == 0), 0.0)

    if period > 1:
        return eom.rolling(window=period).mean()
    return eom
#그래프 작성
def eom_plot(data, period=14, adjust_factor=100000):
    """Plot Ease of Movement with a grey zero line.

    Args:
        data: DataFrame forwarded to ``calculate_eom``.
        period: Forwarded to ``calculate_eom``.
        adjust_factor: Forwarded to ``calculate_eom``.
    """
    ease = calculate_eom(data, period=period, adjust_factor=adjust_factor)
    plt.plot(ease, label="EOM")
    plt.axhline(0, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)
I
#ichimoku(일목균형표)
def calculate_ichimoku(data, n1=9, n2=26, n3=52):
    """Return a copy of ``data`` extended with the five Ichimoku lines.

    Args:
        data: DataFrame with 'High', 'Low' and 'Close' columns.
        n1: Window of the conversion line.
        n2: Window of the base line; also the shift applied to the leading
            spans (forward) and the lagging span (backward).
        n3: Window of leading span B.

    Returns:
        New DataFrame with the added columns 'Conversion Line', 'Base Line',
        'Leading Span A', 'Leading Span B' and 'Lagging Span'. The index is
        not extended, so shifted values that fall outside it are dropped.
    """
    highs, lows = data['High'], data['Low']

    def channel_mid(span):
        # Midpoint between the highest high and lowest low of the window.
        return (highs.rolling(window=span).max() + lows.rolling(window=span).min()) / 2

    conversion = channel_mid(n1)
    base = channel_mid(n2)
    new_columns = {
        'Conversion Line': conversion,
        'Base Line': base,
        'Leading Span A': ((conversion + base) / 2).shift(n2),
        'Leading Span B': channel_mid(n3).shift(n2),
        'Lagging Span': data['Close'].shift(-n2),
    }

    enriched = data.copy()
    for label, series in new_columns.items():
        enriched[label] = series
    return enriched
#일목균형표 그래프 작성
def ichmoku_plot(data, n1=9, n2=26, n3=52):
    """Plot the Ichimoku lines, the close price and the shaded cloud.

    Args:
        data: DataFrame forwarded to ``calculate_ichimoku``.
        n1: Forwarded to ``calculate_ichimoku``.
        n2: Forwarded to ``calculate_ichimoku``.
        n3: Forwarded to ``calculate_ichimoku``.
    """
    df = calculate_ichimoku(data, n1=n1, n2=n2, n3=n3)
    x_values = df.index

    # (column, plot keyword arguments) in drawing order; the close price is
    # drawn last as a thin reference line.
    line_specs = [
        ('Conversion Line', dict(label='전환선', color='blue', linewidth=1)),
        ('Base Line', dict(label='기준선', color='red', linewidth=1)),
        ('Leading Span A', dict(label='선행스팬 1', color='green', linewidth=1, alpha=0.5)),
        ('Leading Span B', dict(label='선행스팬 2', color='brown', linewidth=1, alpha=0.5)),
        ('Lagging Span', dict(label='후행스팬', color='purple', linewidth=1)),
        ('Close', dict(label='종가', color='black', linewidth=0.7, alpha=0.7)),
    ]
    for column, plot_kwargs in line_specs:
        plt.plot(x_values, df[column], **plot_kwargs)

    # Shade the cloud: green where span A is on top, red where span B is.
    span_a, span_b = df['Leading Span A'], df['Leading Span B']
    plt.fill_between(x_values, span_a, span_b,
                     where=span_a >= span_b,
                     color='green', alpha=0.5, label='구름대 (상승)')
    plt.fill_between(x_values, span_a, span_b,
                     where=span_a < span_b,
                     color='red', alpha=0.5, label='구름대 (하락)')

    plt.grid(True, alpha=0.5)
    plt.legend(loc="upper left")
M
MACD 계산
def calculate_macd(data, short=12, long=26, signal_window=9):
    """Compute MACD, its signal line and the histogram.

    Args:
        data: Price Series, or a DataFrame whose 'Close' column is used.
        short: Span of the fast EMA.
        long: Span of the slow EMA.
        signal_window: Span of the EMA applied to the MACD line.

    Returns:
        DataFrame with columns 'MACD' (fast EMA - slow EMA), 'Signal'
        and 'Histogram' (MACD - Signal).
    """
    # isinstance also accepts Series subclasses, unlike an exact type match.
    prices = data if isinstance(data, pd.Series) else data["Close"]

    fast_ema = prices.ewm(span=short, adjust=False).mean()
    slow_ema = prices.ewm(span=long, adjust=False).mean()

    result = pd.DataFrame()
    result["MACD"] = fast_ema - slow_ema
    result["Signal"] = result["MACD"].ewm(span=signal_window, adjust=False).mean()
    result["Histogram"] = result["MACD"] - result["Signal"]
    return result
MACD 그래프 작성
def macd_plot(data, short=12, long=26, signal_window=9):
    """Plot MACD, the signal line and a two-colour histogram.

    Histogram bars above zero are orange, all others sky blue.

    Args:
        data: Series or DataFrame forwarded to ``calculate_macd``.
        short: Forwarded to ``calculate_macd``.
        long: Forwarded to ``calculate_macd``.
        signal_window: Forwarded to ``calculate_macd``.
    """
    frame = calculate_macd(data, short=short, long=long, signal_window=signal_window)
    histogram = frame["Histogram"]

    plt.plot(frame["MACD"], label="macd")
    plt.plot(frame['Signal'], label="Signal")

    bar_colours = []
    for bar in histogram:
        bar_colours.append('orange' if bar > 0 else 'skyblue')
    plt.bar(frame.index, histogram, color=bar_colours)

    plt.axhline(0, color="k", lw=0.7)
    plt.legend(loc="upper left")
    plt.grid(True)
MFI 계산
def calculate_mfi(df, period=14):
    """Compute the Money Flow Index (MFI).

    Args:
        df: DataFrame with 'High', 'Low', 'Close' and 'Volume' columns.
        period: Window over which positive/negative money flow is summed.

    Returns:
        Series indexed like ``df`` with values in 0-100. Rows inside the
        warm-up window are NaN, as are windows with no money flow in either
        direction; a window without negative flow evaluates to 100.
    """
    typical_price = (df['High'] + df['Low'] + df['Close']) / 3
    money_flow = typical_price * df['Volume']
    previous_tp = typical_price.shift(1)

    # Vectorised selection keeps the original index; rebuilding the flows
    # from Python lists would silently replace it with a RangeIndex.
    positive_flow = money_flow.where(typical_price > previous_tp, 0.0)
    negative_flow = money_flow.where(typical_price < previous_tp, 0.0)

    positive_mf = positive_flow.rolling(period).sum()
    negative_mf = negative_flow.rolling(period).sum()
    money_ratio = positive_mf / negative_mf
    return 100 - (100 / (1 + money_ratio))
Momentum
def calculate_momentum(data, period=12):
    """Return price momentum: current price minus the price ``period`` bars ago.

    Args:
        data: Price Series, or a DataFrame whose 'Close' column is used.
        period: Look-back distance in bars.

    Returns:
        Series named 'momentum'; the first ``period`` rows are NaN.
    """
    # isinstance also accepts Series subclasses, unlike an exact type match.
    prices = data if isinstance(data, pd.Series) else data["Close"]
    return (prices - prices.shift(period)).rename('momentum')
#그래프 작성
def momentum_plot(data, period=12):
    """Plot momentum with a grey zero line.

    Args:
        data: Series or DataFrame forwarded to ``calculate_momentum``.
        period: Forwarded to ``calculate_momentum``.
    """
    momentum = calculate_momentum(data, period=period)
    plt.plot(momentum, label="momentum")
    plt.axhline(0, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)
R
ROC
def calculate_ROC(data, period=12):
    """Return the rate of change as a ratio scaled to 100.

    The value is ``price / price_period_bars_ago * 100``, so 100 means
    "unchanged" (not the percent-change form centred on 0).

    Args:
        data: Price Series, or a DataFrame whose 'Close' column is used.
        period: Look-back distance in bars.

    Returns:
        Series named 'roc'; the first ``period`` rows are NaN.
    """
    # isinstance also accepts Series subclasses, unlike an exact type match.
    prices = data if isinstance(data, pd.Series) else data["Close"]
    return (prices / prices.shift(period) * 100).rename("roc")
#그래프 작성
def ROC_plot(data, period=12):
    """Plot the rate of change with a grey reference line at 100.

    Args:
        data: Series or DataFrame forwarded to ``calculate_ROC``.
        period: Forwarded to ``calculate_ROC``.
    """
    rate_of_change = calculate_ROC(data, period=period)
    plt.plot(rate_of_change, label="ROC")
    plt.axhline(100, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)
RSI 계산
def calculate_rsi(data, period=14):
    """Compute the Relative Strength Index with span-based EMAs.

    Gains and losses are smoothed with ``ewm(span=period, adjust=False)``.

    Args:
        data: Price Series, or a DataFrame whose 'Close' column is used.
        period: EMA span for the average gain and average loss.

    Returns:
        Series of RSI values in 0-100. Undefined values (the first row, or
        rows where both averages are 0) are returned as 0.
    """
    # isinstance also accepts Series subclasses, unlike an exact type match.
    prices = data if isinstance(data, pd.Series) else data["Close"]

    delta = prices.diff()
    gains = delta.clip(lower=0)   # negative changes become 0
    losses = delta.clip(upper=0)  # positive changes become 0

    avg_gain = gains.ewm(span=period, adjust=False).mean()
    avg_loss = losses.ewm(span=period, adjust=False).mean().abs()

    rs = avg_gain / avg_loss
    rsi = 100.0 - (100.0 / (1.0 + rs))
    return rsi.fillna(0)
RSI 그림
def rsi_plot(data):
    """Plot RSI with dashed guide lines at 70, 50 and 30.

    Args:
        data: Series or DataFrame forwarded to ``calculate_rsi``.
    """
    strength = calculate_rsi(data)
    plt.plot(strength, label="rsi")
    for level, colour in ((70, "orange"), (50, "gray"), (30, "skyblue")):
        plt.axhline(level, ls="--", color=colour)
    plt.legend(loc="upper left")
S
stdev 계산. data는 Series 또는 'Close' 열이 있는 DataFrame 형식.
def calculate_stdev(data, n=10):
    """Return the rolling sample standard deviation of the price.

    Args:
        data: Price Series, or a DataFrame whose 'Close' column is used.
        n: Rolling window length.

    Returns:
        Series of standard deviations (pandas default ``ddof=1``); rows
        inside the warm-up window are returned as 0.
    """
    # isinstance also accepts Series subclasses, unlike an exact type match.
    prices = data if isinstance(data, pd.Series) else data["Close"]
    # The deviation is taken over the prices themselves; taking it over
    # their moving average would measure the volatility of the smoothed
    # line and need 2n-1 bars before producing a value.
    return prices.rolling(window=n).std().fillna(0)
스토캐스틱 지표 계산
def calculate_stochastic(df, n=14, m=3):
    """Compute the stochastic oscillator columns.

    Args:
        df: DataFrame with 'High', 'Low' and 'Close' columns.
        n: Look-back window for the highest high / lowest low.
        m: Window of each successive simple moving average.

    Returns:
        DataFrame with columns 'fast_k', 'fast_d', 'slow_k' and 'slow_d',
        each one the ``m``-bar mean of the column before it. Full windows
        are required, so warm-up rows are NaN.
    """
    lowest_low = df['Low'].rolling(window=n, min_periods=n).min()
    highest_high = df['High'].rolling(window=n, min_periods=n).max()

    def smooth(series):
        return series.rolling(window=m, min_periods=m).mean()

    out = pd.DataFrame()
    out['fast_k'] = (df['Close'] - lowest_low) / (highest_high - lowest_low) * 100
    # NOTE(review): slow_k is the mean of fast_d here; the conventional
    # definition sets slow %K equal to fast %D - confirm this is intended.
    previous = 'fast_k'
    for column in ('fast_d', 'slow_k', 'slow_d'):
        out[column] = smooth(out[previous])
        previous = column
    return out
#그래프 작성
def stochastic_plot(data, n=14, m=3, total=True):
    """Plot stochastic lines with dashed guides at 80 and 20.

    Args:
        data: DataFrame forwarded to ``calculate_stochastic``.
        n: Forwarded to ``calculate_stochastic``.
        m: Forwarded to ``calculate_stochastic``.
        total: When truthy all four columns are drawn (the two fast lines
            at half opacity); otherwise only the two slow lines.
    """
    stch = calculate_stochastic(data, n=n, m=m)

    if total:
        columns, opacities = stch.columns, [0.5, 0.5, 1, 1]
    else:
        columns, opacities = stch.columns[2:], [1, 1]

    for column, opacity in zip(columns, opacities):
        plt.plot(stch[column], alpha=opacity, label=column)

    plt.axhline(80, ls="--", color="r")
    plt.axhline(20, ls="--", color="b")
    plt.legend(loc="upper left")
    plt.grid(True)
댓글
댓글 쓰기