지표계산을 위한 UDF
import numpy as np import pandas as pd import matplotlib.pyplot as plt import matplotlib.dates as mpl_dates
x축형식 지정을 위한 UDF
def xaxis_form(n=20, format='%y/%m/%d', ax=False):
    """Configure the major date ticks on the x-axis of the current axes.

    The axes returned by ``plt.gca()`` are always the ones modified.

    Args:
        n: Interval, in days, between major ticks.
        format: ``strftime`` pattern used for the tick labels.
        ax: Used as a flag, not as an Axes object. When it compares equal
            to ``False`` the tick labels are blanked; otherwise the labels
            are formatted with ``format`` and rotated by 45 degrees.
    """
    day_locator = mpl_dates.DayLocator(interval=n)
    date_formatter = mpl_dates.DateFormatter(format)

    hide_labels = (ax == False)  # noqa: E712 - equality test kept on purpose
    current_axes = plt.gca()
    current_axes.xaxis.set_major_locator(day_locator)

    if hide_labels:
        current_axes.set_xticklabels([])
        return

    current_axes.xaxis.set_major_formatter(date_formatter)
    plt.xticks(rotation=45)
A
ADL
def calculate_adl(df, short_period=3, long_period=10):
    """Compute the Accumulation/Distribution Line (ADL) and its oscillator.

    Args:
        df: DataFrame with 'High', 'Low', 'Close' and 'Volume' columns.
        short_period: Span of the fast EMA applied to the ADL.
        long_period: Span of the slow EMA applied to the ADL.

    Returns:
        DataFrame indexed like ``df`` with the columns 'MFM' (money flow
        multiplier), 'MFV' (multiplier times volume), 'ADL' (cumulative
        sum of MFV) and 'OSC' (fast EMA of ADL minus slow EMA of ADL).
    """
    high = df['High']
    low = df['Low']
    close = df['Close']
    volume = df['Volume']

    price_range = high - low
    multiplier = ((close - low) - (high - close)) / price_range
    # A bar with High == Low evaluates to 0/0. Treat it as neutral money
    # flow so it does not leave a NaN hole in the cumulative line.
    multiplier = multiplier.where(price_range != 0, 0.0)

    result = pd.DataFrame()
    result["MFM"] = multiplier
    result["MFV"] = result["MFM"] * volume
    result["ADL"] = result["MFV"].cumsum()

    short_ema = result["ADL"].ewm(span=short_period, adjust=False).mean()
    long_ema = result["ADL"].ewm(span=long_period, adjust=False).mean()
    result["OSC"] = short_ema - long_ema
    return result


def adl_plot(data, short_period=3, long_period=10):
    """Plot the ADL line of ``data`` on the current axes."""
    adl = calculate_adl(data, short_period, long_period)
    plt.plot(adl["ADL"], label='ADL', color='r')
    plt.legend(loc="upper left")
    plt.grid(True)


def adlOSC_plot(data, short_period=3, long_period=10):
    """Plot the ADL oscillator of ``data`` with a zero reference line."""
    adl = calculate_adl(data, short_period, long_period)
    plt.plot(adl["OSC"], label='OSC', color='r')
    plt.axhline(0, color="skyblue")
    plt.legend(loc="upper left")
    plt.grid(True)
ADX 계산
def calculate_adx(data, period=14):
    """Compute ADX together with the +DI / -DI directional indicators.

    True range and directional movement are smoothed with a simple rolling
    mean over ``period`` bars.

    Args:
        data: DataFrame with 'High', 'Low' and 'Close' columns.
        period: Window length used for every rolling mean.

    Returns:
        DataFrame indexed like ``data`` with columns 'ADX', '+DI', '-DI'.
        Rows inside the warm-up window are NaN.
    """
    high = data['High']
    low = data['Low']
    prev_close = data['Close'].shift(1)

    # True Range: the largest of the three candidate ranges per bar.
    tr = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    # Raw directional moves; the first bar has no predecessor, so use 0.
    up_move = (high - high.shift(1)).fillna(0)
    down_move = (low.shift(1) - low).fillna(0)

    # Both masks are evaluated against the raw moves. Masking one series in
    # place and then comparing the other against it would let -DM survive
    # when the two moves are equal.
    dm_plus = up_move.where((up_move > 0) & (up_move > down_move), 0.0)
    dm_minus = down_move.where((down_move > 0) & (down_move > up_move), 0.0)

    atr = tr.rolling(window=period).mean()
    di_plus = (dm_plus.rolling(window=period).mean() / atr) * 100
    di_minus = (dm_minus.rolling(window=period).mean() / atr) * 100

    dx = ((di_plus - di_minus).abs() / (di_plus + di_minus)) * 100
    # A zero denominator means no directional movement, so DX is 0. Rows
    # where ATR is not available yet stay NaN so that warm-up bars are not
    # averaged into ADX as fake zeros.
    dx = dx.mask(dx.isna() & atr.notna(), 0.0)

    adx = dx.rolling(window=period).mean()
    return pd.DataFrame({'ADX': adx, '+DI': di_plus, '-DI': di_minus})


def adx_plot(data, period=14):
    """Plot ADX (solid) and +DI / -DI (dashed) with a reference line at 25."""
    adx = calculate_adx(data, period=period)
    for column, line_style in zip(adx.columns, ["-", "--", "--"]):
        plt.plot(adx[column], ls=line_style, label=column)
    plt.axhline(25, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)
ATR 계산
def calculate_atr(df, n=14):
    """Return the Average True Range as a simple rolling mean.

    Args:
        df: DataFrame with 'High', 'Low' and 'Close' columns.
        n: Rolling window length.

    Returns:
        Series indexed like ``df``; the first ``n - 1`` rows are NaN.
    """
    previous_close = df['Close'].shift()
    candidates = [
        df['High'] - df['Low'],
        abs(df['High'] - previous_close),
        abs(df['Low'] - previous_close),
    ]
    # Row-wise maximum of the three candidate ranges is the true range.
    true_range = pd.concat(candidates, axis=1).max(axis=1)
    return true_range.rolling(window=n).mean()
awesome oscillator 계산
def calculate_ao(data, short=5, long=34):
    """Compute the Awesome Oscillator.

    Args:
        data: DataFrame with 'High' and 'Low' columns.
        short: Window of the fast simple moving average.
        long: Window of the slow simple moving average.

    Returns:
        Series: fast SMA minus slow SMA of the median price (High+Low)/2.
    """
    median_price = (data["High"] + data["Low"]) / 2
    sma_short = median_price.rolling(window=short).mean()
    sma_long = median_price.rolling(window=long).mean()
    return sma_short - sma_long


def ao_color(data):
    """Return one bar colour per oscillator value.

    A value that is greater than or equal to the previous one is 'orange',
    a smaller one is 'skyblue'. The sign of the value does not influence
    the colour. Bars that cannot be compared get 'k': the first bar, which
    has no predecessor, and any bar where the value or its predecessor is
    NaN.

    Args:
        data: Series of oscillator values.

    Returns:
        list of str with the same length as ``data``.
    """
    # shift(1) gives the first bar a NaN predecessor, so it is never
    # compared against the last element of the series.
    previous = data.shift(1)
    colors = np.where(
        data >= previous,
        'orange',
        np.where(data < previous, 'skyblue', 'k'),
    )
    return colors.tolist()
B
Bollinger Bands
볼린저 밴드 계산. 기본적으로 종가를 사용합니다.
def calculate_bollinger_bands(da, period=20, std_dev=2):
    """Compute Bollinger Bands and %B.

    Args:
        da: Price Series (or Series subclass), or a DataFrame whose 'Close'
            column is used.
        period: Rolling window for the mean and standard deviation. Partial
            windows are used from the first row (``min_periods=1``).
        std_dev: Number of standard deviations between the centre line and
            each band.

    Returns:
        DataFrame with columns 'ma', 'upper', 'lower' and '%B', where %B is
        the position of the price inside the band scaled to 0-100.
    """
    # isinstance also accepts Series subclasses, which an exact type
    # comparison would send down the DataFrame branch.
    prices = da if isinstance(da, pd.Series) else da["Close"]

    rolling = prices.rolling(window=period, min_periods=1)
    centre = rolling.mean()
    band_width = rolling.std() * std_dev

    result = pd.DataFrame()
    result["ma"] = centre
    result["upper"] = centre + band_width
    result["lower"] = centre - band_width
    result["%B"] = (prices - result["lower"]) / (result["upper"] - result["lower"]) * 100
    return result
bband 그래프 작성
def bband_plot(data):
    """Plot the Bollinger centre line and both bands on the current axes.

    Every column returned by ``calculate_bollinger_bands`` except the last
    one is drawn.
    """
    bands = calculate_bollinger_bands(data)
    columns_to_draw = list(bands.columns)[:-1]
    for column in columns_to_draw:
        plt.plot(bands[column], label=column)
    plt.legend(loc="upper left")
%B 그래프 작성
def percentB_plot(data):
    """Plot %B with dashed reference lines at 80 (red) and 20 (blue)."""
    percent_b = calculate_bollinger_bands(data)["%B"]
    plt.plot(percent_b, label="%B")
    for level, line_color in ((80, "r"), (20, "b")):
        plt.axhline(level, ls="--", color=line_color)
    plt.legend(loc="upper left")
C
candle chart 작성
def candleChart(data):
    """Draw a candlestick chart on the current axes.

    Each bar is drawn at its index value as a thick body line
    (open-close) plus two thin wick lines. Bars whose close is at or above
    the open are green, the others red.

    Args:
        data: DataFrame with 'Open', 'High', 'Low' and 'Close' columns; the
            index supplies the x positions. The frame is only read: no
            helper column is added to the caller's object.
    """
    bars = zip(data.index, data['Open'], data['High'], data['Low'], data['Close'])
    for date_val, open_val, high_val, low_val, close_val in bars:
        color = 'green' if close_val >= open_val else 'red'
        body_top = max(open_val, close_val)
        body_bottom = min(open_val, close_val)

        # Body
        plt.plot([date_val, date_val], [open_val, close_val], color=color, linewidth=3)
        # Upper wick
        plt.plot([date_val, date_val], [high_val, body_top], color=color, linewidth=0.7)
        # Lower wick
        plt.plot([date_val, date_val], [body_bottom, low_val], color=color, linewidth=0.7)
CCI
def _mean_abs_deviation(window_values):
    """Mean absolute deviation of one window around that window's own mean."""
    return np.nanmean(np.abs(window_values - np.nanmean(window_values)))


def calculate_cci(data, window=14, constant=0.015):
    """Compute the Commodity Channel Index.

    Args:
        data: DataFrame with 'High', 'Low' and 'Close' columns.
        window: Rolling window length. Partial windows are used from the
            first row (``min_periods=1``).
        constant: Scaling constant in the denominator.

    Returns:
        Series: (TP - SMA(TP)) / (constant * mean deviation), where TP is
        the typical price (High + Low + Close) / 3. Rows whose mean
        deviation is 0, such as the first row, are NaN.
    """
    tp = (data['High'] + data['Low'] + data['Close']) / 3
    rolling_tp = tp.rolling(window=window, min_periods=1)
    sma_tp = rolling_tp.mean()
    # The mean deviation measures every typical price in the window against
    # the current window's average, not against the average that was
    # current when each of those bars was the latest one.
    mad = rolling_tp.apply(_mean_abs_deviation, raw=True)
    return (tp - sma_tp) / (constant * mad)
CCI 그래프 그리기
def cci_plot(cci):
    """Plot a precomputed CCI series with dashed lines at +100, 0 and -100."""
    plt.plot(cci, label="cci")
    reference_lines = {100: 'brown', 0: 'k', -100: 'b'}
    for level, line_color in reference_lines.items():
        plt.axhline(level, ls="--", color=line_color)
    plt.legend(loc="upper left")
D
dpo
def calculate_dpo(data, period=12):
    """Compute the Detrended Price Oscillator.

    Args:
        data: Price Series (or Series subclass), or a DataFrame whose
            'Close' column is used.
        period: Window of the simple moving average.

    Returns:
        Series: the price from ``period // 2 + 1`` bars ago minus the
        ``period``-bar simple moving average.
    """
    # isinstance also accepts Series subclasses, which an exact type
    # comparison would send down the DataFrame branch.
    prices = data if isinstance(data, pd.Series) else data["Close"]
    shifted_price = prices.shift(period // 2 + 1)
    return shifted_price - prices.rolling(window=period).mean()


def dpo_plot(data, period=12):
    """Plot the DPO of ``data`` with a zero reference line."""
    dpo = calculate_dpo(data, period=period)
    plt.plot(dpo, label="DPO")
    plt.axhline(0, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)
E
EOM(Ease of Movement)
def calculate_eom(data, period=14, adjust_factor=100000):
    """Compute Ease of Movement (EOM).

    Args:
        data: DataFrame with 'High', 'Low' and 'Volume' columns.
        period: Window of the simple moving average applied to the raw
            values; when it is 1 or less the raw values are returned.
        adjust_factor: Divisor that scales the volume in the box ratio.

    Returns:
        Series: midpoint change divided by the box ratio
        ``Volume / ((High - Low) * adjust_factor)``, smoothed when
        ``period > 1``. The first row is NaN because it has no previous
        midpoint.
    """
    price_range = data["High"] - data["Low"]
    volume = data["Volume"]
    midpoint_change = ((data["High"] + data["Low"]) / 2).diff()

    box_ratio = volume / (price_range * adjust_factor)
    eom = midpoint_change / box_ratio

    # Bars with no range or no volume make the ratio infinite, zero or
    # undefined. Report 0 for them instead of letting inf/NaN leak into the
    # moving average. The first bar has no midpoint change and stays NaN.
    degenerate = (price_range == 0) | (volume == 0)
    eom = eom.mask(degenerate & midpoint_change.notna(), 0.0)

    if period > 1:
        return eom.rolling(window=period).mean()
    return eom


def eom_plot(data, period=14, adjust_factor=100000):
    """Plot the EOM of ``data`` with a zero reference line."""
    eom = calculate_eom(data, period=period, adjust_factor=adjust_factor)
    plt.plot(eom, label="EOM")
    plt.axhline(0, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)
I
# Ichimoku Kinko Hyo
def calculate_ichimoku(data, n1=9, n2=26, n3=52):
    """Return a copy of ``data`` extended with the five Ichimoku lines.

    Args:
        data: DataFrame with 'High', 'Low' and 'Close' columns.
        n1: Window of the conversion line.
        n2: Window of the base line; also the shift applied to the leading
            spans (forward) and the lagging span (backward).
        n3: Window of leading span B.

    Returns:
        New DataFrame with the added columns 'Conversion Line',
        'Base Line', 'Leading Span A', 'Leading Span B', 'Lagging Span'.
    """
    highs, lows = data['High'], data['Low']

    def channel_mid(window):
        # Midpoint between the rolling highest high and lowest low.
        return (highs.rolling(window=window).max() + lows.rolling(window=window).min()) / 2

    conversion = channel_mid(n1)
    base = channel_mid(n2)
    return data.assign(**{
        'Conversion Line': conversion,
        'Base Line': base,
        'Leading Span A': ((conversion + base) / 2).shift(n2),
        'Leading Span B': channel_mid(n3).shift(n2),
        'Lagging Span': data['Close'].shift(-n2),
    })


# Ichimoku chart
def ichmoku_plot(data, n1=9, n2=26, n3=52):
    """Plot the Ichimoku lines, the close price and the filled cloud."""
    df = calculate_ichimoku(data, n1=n1, n2=n2, n3=n3)
    x = df.index

    line_specs = [
        ('Conversion Line', dict(label='전환선', color='blue', linewidth=1)),
        ('Base Line', dict(label='기준선', color='red', linewidth=1)),
        ('Leading Span A', dict(label='선행스팬 1', color='green', linewidth=1, alpha=0.5)),
        ('Leading Span B', dict(label='선행스팬 2', color='brown', linewidth=1, alpha=0.5)),
        ('Lagging Span', dict(label='후행스팬', color='purple', linewidth=1)),
        # The close price is drawn as well for reference.
        ('Close', dict(label='종가', color='black', linewidth=0.7, alpha=0.7)),
    ]
    for column, style in line_specs:
        plt.plot(x, df[column], **style)

    # Fill the cloud: green where span A is on top, red otherwise.
    span_a, span_b = df['Leading Span A'], df['Leading Span B']
    plt.fill_between(x, span_a, span_b, where=span_a >= span_b,
                     color='green', alpha=0.5, label='구름대 (상승)')
    plt.fill_between(x, span_a, span_b, where=span_a < span_b,
                     color='red', alpha=0.5, label='구름대 (하락)')

    plt.grid(True, alpha=0.5)
    plt.legend(loc="upper left")
M
MACD 계산
def calculate_macd(data, short=12, long=26, signal_window=9):
    """Compute MACD, its signal line and the histogram.

    Args:
        data: Price Series (or Series subclass), or a DataFrame whose
            'Close' column is used.
        short: Span of the fast EMA.
        long: Span of the slow EMA.
        signal_window: Span of the EMA applied to the MACD line.

    Returns:
        DataFrame with columns 'MACD' (fast EMA minus slow EMA), 'Signal'
        (EMA of MACD) and 'Histogram' (MACD minus Signal).
    """
    # isinstance also accepts Series subclasses, which an exact type
    # comparison would send down the DataFrame branch.
    prices = data if isinstance(data, pd.Series) else data["Close"]

    short_ema = prices.ewm(span=short, adjust=False).mean()
    long_ema = prices.ewm(span=long, adjust=False).mean()

    result = pd.DataFrame()
    result["MACD"] = short_ema - long_ema
    result["Signal"] = result["MACD"].ewm(span=signal_window, adjust=False).mean()
    result["Histogram"] = result["MACD"] - result["Signal"]
    return result
MACD 그래프 작성
def macd_plot(data, short=12, long=26, signal_window=9):
    """Plot the MACD and signal lines plus a coloured histogram.

    Histogram bars above zero are orange; all other bars are sky blue.
    """
    macd = calculate_macd(data, short=short, long=long, signal_window=signal_window)
    histogram = macd["Histogram"]

    plt.plot(macd["MACD"], label="macd")
    plt.plot(macd['Signal'], label="Signal")

    bar_colors = []
    for value in histogram:
        bar_colors.append('orange' if value > 0 else 'skyblue')
    plt.bar(macd.index, histogram, color=bar_colors)

    plt.axhline(0, color="k", lw=0.7)
    plt.legend(loc="upper left")
    plt.grid(True)
MFI 계산
def calculate_mfi(df, period=14):
    """Compute the Money Flow Index.

    Args:
        df: DataFrame with 'High', 'Low', 'Close' and 'Volume' columns.
        period: Window over which positive and negative flows are summed.

    Returns:
        Series indexed like ``df`` (so it can be assigned straight back to
        the frame). The first ``period - 1`` rows are NaN; a window with no
        negative flow evaluates to 100.
    """
    typical_price = (df['High'] + df['Low'] + df['Close']) / 3
    money_flow = typical_price * df['Volume']
    previous_tp = typical_price.shift(1)

    # Vectorised selection keeps the original index. Building new Series
    # from plain lists would replace it with 0..n-1.
    positive_flow = money_flow.where(typical_price > previous_tp, 0)
    negative_flow = money_flow.where(typical_price < previous_tp, 0)

    positive_mf = positive_flow.rolling(period).sum()
    negative_mf = negative_flow.rolling(period).sum()

    money_ratio = positive_mf / negative_mf
    return 100 - (100 / (1 + money_ratio))
Momentum
def calculate_momentum(data, period=12):
    """Compute price momentum: price minus the price ``period`` bars ago.

    Args:
        data: Price Series (or Series subclass), or a DataFrame whose
            'Close' column is used.
        period: Look-back distance in bars.

    Returns:
        Series named 'momentum'; the first ``period`` rows are NaN.
    """
    # isinstance also accepts Series subclasses, which an exact type
    # comparison would send down the DataFrame branch.
    prices = data if isinstance(data, pd.Series) else data["Close"]
    mom = prices - prices.shift(period)
    mom.name = 'momentum'
    return mom


def momentum_plot(data, period=12):
    """Plot the momentum of ``data`` with a zero reference line."""
    mom = calculate_momentum(data, period=period)
    plt.plot(mom, label="momentum")
    plt.axhline(0, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)
R
ROC
def calculate_ROC(data, period=12):
    """Compute the price ratio to ``period`` bars ago, in percent.

    The result is centred on 100 (unchanged price), not on 0.

    Args:
        data: Price Series (or Series subclass), or a DataFrame whose
            'Close' column is used.
        period: Look-back distance in bars.

    Returns:
        Series named 'roc'; the first ``period`` rows are NaN.
    """
    # isinstance also accepts Series subclasses, which an exact type
    # comparison would send down the DataFrame branch.
    prices = data if isinstance(data, pd.Series) else data["Close"]
    roc = prices / prices.shift(period) * 100
    roc.name = "roc"
    return roc


def ROC_plot(data, period=12):
    """Plot the ROC of ``data`` with a reference line at 100."""
    roc = calculate_ROC(data, period=period)
    plt.plot(roc, label="ROC")
    plt.axhline(100, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)
RSI 계산
def calculate_rsi(data, period=14):
    """Compute the Relative Strength Index with EMA smoothing.

    Gains and losses are smoothed with ``ewm(span=period, adjust=False)``.

    Args:
        data: Price Series (or Series subclass), or a DataFrame whose
            'Close' column is used.
        period: Span of the exponential moving averages.

    Returns:
        Series of RSI values in 0-100. Undefined values are returned as 0:
        the first row and rows where both averages are 0.
    """
    # isinstance also accepts Series subclasses, which an exact type
    # comparison would send down the DataFrame branch.
    prices = data if isinstance(data, pd.Series) else data["Close"]

    delta = prices.diff()
    gains = delta.clip(lower=0)
    losses = delta.clip(upper=0)

    avg_gain = gains.ewm(span=period, adjust=False).mean()
    # Losses are non-positive; take the magnitude after smoothing.
    avg_loss = losses.ewm(span=period, adjust=False).mean().abs()

    rs = avg_gain / avg_loss
    rsi = 100.0 - (100.0 / (1.0 + rs))
    return rsi.fillna(0)
RSI 그림
def rsi_plot(data):
    """Plot RSI with dashed reference lines at 70, 50 and 30."""
    plt.plot(calculate_rsi(data), label="rsi")
    reference_lines = ((70, "orange"), (50, "gray"), (30, "skyblue"))
    for level, line_color in reference_lines:
        plt.axhline(level, ls="--", color=line_color)
    plt.legend(loc="upper left")
S
stdev 계산. data는 Series 또는 'Close' 열이 있는 DataFrame 형식.
def calculate_stdev(data, n=10):
    """Compute the rolling standard deviation of the price.

    Args:
        data: Price Series (or Series subclass), or a DataFrame whose
            'Close' column is used.
        n: Rolling window length.

    Returns:
        Series of rolling sample standard deviations (pandas default
        ``ddof=1``); rows inside the warm-up window are returned as 0.
    """
    # isinstance also accepts Series subclasses, which an exact type
    # comparison would send down the DataFrame branch.
    prices = data if isinstance(data, pd.Series) else data["Close"]
    # Measure the dispersion of the prices themselves. Taking the standard
    # deviation of a moving average would measure the smoothed series.
    std = prices.rolling(window=n).std()
    return std.fillna(0)
스토캐스틱 지표 계산
def calculate_stochastic(df, n=14, m=3):
    """Compute the stochastic oscillator and its successive smoothings.

    Args:
        df: DataFrame with 'High', 'Low' and 'Close' columns.
        n: Look-back window for the lowest low / highest high.
        m: Window of each simple-moving-average smoothing step.

    Returns:
        DataFrame with columns 'fast_k', 'fast_d', 'slow_k', 'slow_d'.
        Each column is the ``m``-bar mean of the one before it, and
        'fast_k' is the close's position in the ``n``-bar range (0-100).
    """
    def smooth(series):
        return series.rolling(window=m, min_periods=m).mean()

    lowest_low = df['Low'].rolling(window=n, min_periods=n).min()
    highest_high = df['High'].rolling(window=n, min_periods=n).max()

    fast_k = (df['Close'] - lowest_low) / (highest_high - lowest_low) * 100
    fast_d = smooth(fast_k)
    slow_k = smooth(fast_d)
    slow_d = smooth(slow_k)

    d = pd.DataFrame()
    for name, series in (('fast_k', fast_k), ('fast_d', fast_d),
                         ('slow_k', slow_k), ('slow_d', slow_d)):
        d[name] = series
    return d
# Chart
def stochastic_plot(data, n=14, m=3, total=True):
    """Plot the stochastic lines with dashed reference lines at 80 and 20.

    Args:
        data: DataFrame passed on to ``calculate_stochastic``.
        n: Look-back window passed on to ``calculate_stochastic``.
        m: Smoothing window passed on to ``calculate_stochastic``.
        total: When truthy all four columns are drawn, the first two at
            half opacity; otherwise only the columns from the third one
            onwards are drawn.
    """
    stch = calculate_stochastic(data, n=n, m=m)

    if total:
        columns, alphas = stch.columns, [0.5, 0.5, 1, 1]
    else:
        columns, alphas = stch.columns[2:], [1, 1]

    for column, alpha in zip(columns, alphas):
        plt.plot(stch[column], alpha=alpha, label=column)

    plt.axhline(80, ls="--", color="r")
    plt.axhline(20, ls="--", color="b")
    plt.legend(loc="upper left")
    plt.grid(True)
댓글
댓글 쓰기