기본 콘텐츠로 건너뛰기

[matplotlib]quiver()함수

[stock] 지표계산을 위한 UDF

지표계산을 위한 UDF

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mpl_dates

x축형식 지정을 위한 UDF

def xaxis_form(n=20, format='%y/%m/%d', ax=False):
    locator = mpl_dates.DayLocator(interval=n)
    form = mpl_dates.DateFormatter(format)
    if ax==False:
        ax=plt.gca()
        ax.xaxis.set_major_locator(locator)
        ax.set_xticklabels([])
    else:
        ax=plt.gca()
        ax.xaxis.set_major_locator(locator)
        ax.xaxis.set_major_formatter(form)
        plt.xticks(rotation=45)   

A

ADL

def calculate_adl(df, short_period=3, long_period=10):
  high = df['High']
  low = df['Low']
  close = df['Close']
  volume = df['Volume']
  result=pd.DataFrame()
  result["MFM"] = ((close - low) - (high - close)) / (high - low)
  result["MFV"] = result["MFM"] * volume
  result["ADL"] = result["MFV"].cumsum()
  short_ema=result["ADL"].ewm(span=short_period, adjust=False).mean()
  long_ema=result["ADL"].ewm(span=long_period, adjust=False).mean()
  result["OSC"]=short_ema - long_ema
  return result  
  
#adl 그래프
def adl_plot(data , short_period=3, long_period=10):
  adl=calculate_adl(data, short_period, long_period)
  plt.plot(adl["ADL"], label='ADL', color='r')
  plt.legend(loc="upper left")
  plt.grid(True)
  
#adl oscillator 그래프   
def adlOSC_plot(data , short_period=3, long_period=10):
    adl=calculate_adl(data, short_period, long_period)
    plt.plot(adl["OSC"], label='OSC', color='r')
    plt.axhline(0, color="skyblue")
    plt.legend(loc="upper left")
    plt.grid(True)

ADX 계산

def calculate_adx(data, period=14):
    high = data['High']
    low = data['Low']
    close = data['Close']
    # True Range (TR) 계산
    high_prev = high.shift(1)
    low_prev = low.shift(1)
    tr = pd.concat([high - low, abs(high - close.shift(1)), abs(low - close.shift(1))], axis=1).max(axis=1)
    # Directional Movement (+DM, -DM) 계산
    dm_plus = (high - high_prev).fillna(0)
    dm_minus = (low_prev - low).fillna(0)
    dm_plus[(dm_plus <= 0) | (dm_plus <= dm_minus)] = 0
    dm_minus[(dm_minus <= 0) | (dm_minus <= dm_plus)] = 0
    # Smooth TR, +DM, -DM
    atr = tr.rolling(window=period).mean()
    di_plus_smooth = dm_plus.rolling(window=period).mean()
    di_minus_smooth = dm_minus.rolling(window=period).mean()
    # Directional Index (+DI, -DI) 계산
    di_plus = (di_plus_smooth / atr) * 100
    di_minus = (di_minus_smooth / atr) * 100
    # Directional Movement Index (DX) 계산
    dx = (abs(di_plus - di_minus) / (di_plus + di_minus)) * 100
    dx = dx.fillna(0) # 분모가 0인 경우 NaN 처리
    # Average Directional Index (ADX) 계산
    adx = dx.rolling(window=period).mean()
    return pd.DataFrame({'ADX': adx, '+DI': di_plus, '-DI': di_minus})
#그래프 작성
def adx_plot(data, period=14):
    adx=calculate_adx(data, period=period)
    for i, j in zip(adx.columns, ["-", "--", "--"]):
        plt.plot(adx[i], ls=j, label=i)
    plt.axhline(25, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)

ATR 계산

def calculate_atr(df, n=14):
    high_low = df['High'] - df['Low']
    high_close = abs(df['High'] - df['Close'].shift())
    low_close = abs(df['Low'] - df['Close'].shift())
    true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
    atr = true_range.rolling(window=n).mean()
    return atr

awesome oscillator 계산

def calculate_ao(data, short=5, long=34):
    high=data["High"]
    low=data["Low"]
    median_price=(high+low)/2
    sma_short=median_price.rolling(window=short).mean()
    sma_long=median_price.rolling(window=long).mean()
    ao=sma_short - sma_long
    return ao

def ao_color(data):
    awesome_color=[]
    #awesome_color.clear()
    for i in range(len(data)):
        if data.iloc[i]>=0 and data.iloc[i]>=data.iloc[i-1]:#양수, 직전보다 증가
            awesome_color.append('orange')
        elif data.iloc[i]>=0 and data.iloc[i]<data.iloc[i-1]: #양수, 직전보다 감소
            awesome_color.append('skyblue')
        elif data.iloc[i]<0 and data.iloc[i]<data.iloc[i-1]:#음수, 직전보다 감수
            awesome_color.append('skyblue')
        elif data.iloc[i]<0 and data.iloc[i]>=data.iloc[i-1]: #음수, 직전보다 증가 
            awesome_color.append('orange')
        else:
            awesome_color.append('k')
    return awesome_color

B

Bolinger Band
볼린져 밴드 계산, 기본적으로 종가를 사용합니다.

def calculate_bollinger_bands(da, period=20, std_dev=2):
    if type(da) == pd.core.series.Series:
        data = da
    else:
        data = da["Close"] 
    result=pd.DataFrame()
    # 중심선 (Moving Average) 계산
    result["ma"] = data.rolling(window=period, min_periods=1).mean()
    # 표준편차 계산
    std = data.rolling(window=period, min_periods=1).std()
    # 상한선 계산
    result["upper"] = result["ma"] + (std * std_dev)
    # 하한선 계산
    result["lower"] = result["ma"] - (std * std_dev)
    result["%B"]=(data-result["lower"])/(result["upper"]-result["lower"])*100
    return result

bband 그래프 작성

def bband_plot(data):
    bb=calculate_bollinger_bands(data)
    for i in bb.columns[:-1]:
        plt.plot(bb[i], label=i)
    plt.legend(loc="upper left")

%B 그래프 작성

def percentB_plot(data):
    bb=calculate_bollinger_bands(data)
    plt.plot(bb["%B"], label="%B")
    plt.axhline(80, ls="--", color="r")
    plt.axhline(20, ls="--", color="b")
    plt.legend(loc="upper left")

C

candle chart 작성

def candleChart(data):
    df=data
    df["Date"]=df.index
    for i in range(len(df)):
        open_val = df['Open'].iloc[i]
        high_val = df['High'].iloc[i]
        low_val = df['Low'].iloc[i]
        close_val = df['Close'].iloc[i]
        date_val = df['Date'].iloc[i]

        if close_val >= open_val:
            color = 'green'
        else:
            color = 'red'
        # 몸통 그리기
        plt.plot([date_val, date_val], [open_val, close_val], color=color, linewidth=3)
        # 위쪽 꼬리 그리기
        plt.plot([date_val, date_val], [high_val, max(open_val, close_val)], color=color, linewidth=0.7)
        # 아래쪽 꼬리 그리기
        plt.plot([date_val, date_val], [min(open_val, close_val), low_val], color=color, linewidth=0.7)

CCI

def calculate_cci(data, window= 14, constant= 0.015):
    tp = (data['High'] + data['Low'] + data['Close']) / 3
    sma_tp = tp.rolling(window=window, min_periods=1).mean()
    mad = np.abs(tp - sma_tp).rolling(window=window, min_periods=1).mean()
    cci = (tp - sma_tp) / (constant * mad)
    return cci

CCI 그래프 그리기

def cci_plot(cci):
    plt.plot(cci, label="cci")
    for i, j in zip([100, 0, -100], ['brown', 'k', 'b']):
        plt.axhline(i, ls="--", color=j)
    plt.legend(loc="upper left")

D

dpo

def calculate_dpo(data, period=12):
    if type(data) == pd.core.series.Series:
        df = data
    else:
        df = data["Close"]
    shift_price=df.shift(period//2 +1)
    dpo = shift_price - df.rolling(window=period).mean()
    return dpo 

def dpo_plot(data, period=12):
    dpo=calculate_dpo(data, period=period)
    plt.plot(dpo, label="DPO")
    plt.axhline(0, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)

E

EOM(Ease of Movement)

def calculate_eom(data, period=14, adjust_factor=100000):
    midpoint=(data["High"]+data["Low"])/2
    midpoint_change = midpoint.diff()
    box_ratio = (data['Volume'] / ((data["High"]-data["Low"]) * adjust_factor)).replace([np.inf, -np.inf], 0) # ZeroDivisionError 처리
    eom = midpoint_change / box_ratio
    if period>1:
        eom_ma=eom.rolling(window=period).mean()
        return eom_ma
    else:
        return eom

#그래프 작성 
def eom_plot(data, period=14, adjust_factor=100000):
    eom=calculate_eom(data, period=period, adjust_factor=adjust_factor)
    plt.plot(eom, label="EOM")
    plt.axhline(0, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)

I

#ichimoku(일목균형표)
def calculate_ichimoku(data, n1=9, n2=26, n3=52):
    df=data.copy()
    high = df['High']
    low = df['Low']
    close = df['Close']
    conversion_line = (high.rolling(window=n1).max() + low.rolling(window=n1).min()) / 2 # 전환선
    base_line = (high.rolling(window=n2).max() + low.rolling(window=n2).min()) / 2 #기준선
    leading_span_a = ((conversion_line + base_line) / 2).shift(n2) #선행스팬1
    leading_span_b = ((high.rolling(window=n3).max() + low.rolling(window=n3).min()) / 2).shift(n2) #선행스팬2
    lagging_span = close.shift(-n2) #후행스팬
    df['Conversion Line'] = conversion_line
    df['Base Line'] = base_line
    df['Leading Span A'] = leading_span_a
    df['Leading Span B'] = leading_span_b
    df['Lagging Span'] = lagging_span
    return df
#일목균형표 그래프 작성   
def ichmoku_plot(data, n1=9, n2=26, n3=52):
    df=calculate_ichimoku(data, n1=n1, n2=n2, n3=n3)
    plt.plot(df.index, df['Conversion Line'], label='전환선', color='blue', linewidth=1)
    plt.plot(df.index, df['Base Line'], label='기준선', color='red', linewidth=1)
    plt.plot(df.index, df['Leading Span A'], label='선행스팬 1', color='green', linewidth=1, alpha=0.5)
    plt.plot(df.index, df['Leading Span B'], label='선행스팬 2', color='brown', linewidth=1, alpha=0.5)
    plt.plot(df.index, df['Lagging Span'], label='후행스팬', color='purple', linewidth=1)
    plt.plot(df.index, df['Close'], label='종가', color='black', linewidth=0.7, alpha=0.7) # 종가도 함께 표시하면 좋음
    # 구름대 채우기
    plt.fill_between(df.index, df['Leading Span A'], df['Leading Span B'],
                    where=df['Leading Span A'] >= df['Leading Span B'],
                color='green', alpha=0.5, label='구름대 (상승)')
    plt.fill_between(df.index, df['Leading Span A'], df['Leading Span B'],
                    where=df['Leading Span A'] < df['Leading Span B'],
                color='red', alpha=0.5, label='구름대 (하락)')
    plt.grid(True, alpha=0.5)
    plt.legend(loc="upper left")

M

MACD 계산

def calculate_macd(data, short=12, long=26, signal_window=9):
    if type(data) == pd.core.series.Series:
        df = data
    else:
        df = data["Close"]        
    result=pd.DataFrame()
    short_ema=df.ewm(span=short, adjust=False).mean()
    long_ema =df.ewm(span=long, adjust=False).mean()
    result["MACD"] = short_ema - long_ema
    result["Signal"] = result["MACD"].ewm(span = signal_window, adjust=False).mean()
    result["Histogram"] = result["MACD"] - result["Signal"]
    return result   

MACD 그래프 작성

def macd_plot(data, short=12, long=26, signal_window=9):   
    macd=calculate_macd(data, short=short, long=long, signal_window=signal_window)
    plt.plot(macd["MACD"], label="macd")
    plt.plot(macd['Signal'], label="Signal")
    col=['orange' if i>0 else 'skyblue'  for i in macd["Histogram"]]
    plt.bar(macd.index, macd["Histogram"], color=col)
    plt.axhline(0, color="k", lw=0.7)
    plt.legend(loc="upper left")
    plt.grid(True)

MFI 계산

def calculate_mfi(df, period=14):
    typical_price = (df['High'] + df['Low'] + df['Close']) / 3
    money_flow = typical_price * df['Volume']
    positive_flow = pd.Series([mf if tp > tp_prev else 0 for tp, tp_prev, mf in zip(typical_price, typical_price.shift(1), money_flow)])
    negative_flow = pd.Series([mf if tp < tp_prev else 0 for tp, tp_prev, mf in zip(typical_price, typical_price.shift(1), money_flow)])
    positive_mf = positive_flow.rolling(period).sum()
    negative_mf = negative_flow.rolling(period).sum()
    money_ratio = positive_mf / negative_mf
    mfi = 100 - (100 / (1 + money_ratio))
    return mfi

Momentum

def calculate_momentum(data, period=12):
    if type(data) == pd.core.series.Series:
        df = data
    else:
        df = data["Close"]
    mom=df - df.shift(period)    
    mom.name='momentum'
    return mom   
    
   #그래프 작성
def momentum_plot(data, period=12):
    mom=calculate_momentum(data, period=period)
    plt.plot(mom, label="momentum")
    plt.axhline(0, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)

R

ROC

def calculate_ROC(data, period=12):
    if type(data) == pd.core.series.Series:
        df = data
    else:
        df = data["Close"]
    roc=df/df.shift(period) * 100
    roc.name="roc"
    return roc  
    
 #그래프 작성
def ROC_plot(data, period=12):
    mom=calculate_ROC(data, period=period)
    plt.plot(mom, label="ROC")
    plt.axhline(100, color="gray")
    plt.legend(loc="upper left")
    plt.grid(True)

RSI 계산

def calculate_rsi(data, period=14):
    if type(data) == pd.core.series.Series:
        df = data
    else:
        df = data["Close"] 
    delta = df.diff()
    up, down = delta.copy(), delta.copy()
    up[up < 0] = 0
    down[down > 0] = 0
    roll_up = up.ewm(span=period, adjust=False).mean()
    roll_down = abs(down.ewm(span=period, adjust=False).mean())
    rs = roll_up / roll_down
    rsi = 100.0 - (100.0 / (1.0 + rs))
    return rsi.fillna(0)

RSI 그림

def rsi_plot(data):
    rsi=calculate_rsi(data)
    plt.plot(rsi, label="rsi")
    plt.axhline(70, ls="--", color="orange")
    plt.axhline(50, ls="--", color="gray")
    plt.axhline(30, ls="--", color="skyblue")
    plt.legend(loc="upper left")

S

stdev 계산, data는 Series 형식.

def calculate_stdev(data, n=10):
    if type(data) == pd.core.series.Series:
        df = data
    else:
        df = data["Close"] 
    ma=df.rolling(window=n).mean()
    std=ma.rolling(window=n).std()
    return(std.fillna(0))

스토캐스틱 지표 계산

def calculate_stochastic(df, n=14, m=3):
    low_list = df['Low'].rolling(window=n, min_periods=n).min()
    high_list = df['High'].rolling(window=n, min_periods=n).max()
    d = pd.DataFrame()
    d['fast_k'] = (df['Close'] - low_list) / (high_list - low_list) * 100
    d['fast_d'] = d['fast_k'].rolling(window=m, min_periods=m).mean()
    d['slow_k'] = d['fast_d'].rolling(window=m, min_periods=m).mean()
    d['slow_d'] = d['slow_k'].rolling(window=m, min_periods=m).mean()
    return d
#그래프 작성 
def stochastic_plot(data, n=14, m=3, total=True):
    stch=calculate_stochastic(data, n=n, m=m)
    if total:
        for i, j in zip(stch.columns, [0.5, 0.5, 1, 1]):
            plt.plot(stch[i], alpha=j, label=i)
    else:
        for i, j in zip(stch.columns[2:], [1, 1]):
            plt.plot(stch[i], alpha=j, label=i)
    plt.axhline(80,  ls="--", color="r")
    plt.axhline(20,  ls="--", color="b")
    plt.legend(loc="upper left")
    plt.grid(True)

댓글

이 블로그의 인기 게시물

[Linear Algebra] 유사변환(Similarity transformation)

유사변환(Similarity transformation) n×n 차원의 정방 행렬 A, B 그리고 가역 행렬 P 사이에 식 1의 관계가 성립하면 행렬 A와 B는 유사행렬(similarity matrix)이 되며 행렬 A를 가역행렬 P와 B로 분해하는 것을 유사 변환(similarity transformation) 이라고 합니다. $$\tag{1} A = PBP^{-1} \Leftrightarrow P^{-1}AP = B $$ 식 2는 식 1의 양변에 B의 고유값을 고려한 것입니다. \begin{align}\tag{식 2} B - \lambda I &= P^{-1}AP – \lambda P^{-1}P\\ &= P^{-1}(AP – \lambda P)\\ &= P^{-1}(A - \lambda I)P \end{align} 식 2의 행렬식은 식 3과 같이 정리됩니다. \begin{align} &\begin{aligned}\textsf{det}(B - \lambda I ) & = \textsf{det}(P^{-1}(AP – \lambda P))\\ &= \textsf{det}(P^{-1}) \textsf{det}((A – \lambda I)) \textsf{det}(P)\\ &= \textsf{det}(P^{-1}) \textsf{det}(P) \textsf{det}((A – \lambda I))\\ &= \textsf{det}(A – \lambda I)\end{aligned}\\ &\begin{aligned}\because \; \textsf{det}(P^{-1}) \textsf{det}(P) &= \textsf{det}(P^{-1}P)\\ &= \textsf{det}(I)\end{aligned}\end{align} 유사행렬의 특성 유사행렬인 두 정방행렬 A와 B는 'A ~ B' 와 같...

[sympy] Sympy객체의 표현을 위한 함수들

Sympy객체의 표현을 위한 함수들 General simplify(x): 식 x(sympy 객체)를 간단히 정리 합니다. import numpy as np from sympy import * x=symbols("x") a=sin(x)**2+cos(x)**2 a $\sin^{2}{\left(x \right)} + \cos^{2}{\left(x \right)}$ simplify(a) 1 simplify(b) $\frac{x^{3} + x^{2} - x - 1}{x^{2} + 2 x + 1}$ simplify(b) x - 1 c=gamma(x)/gamma(x-2) c $\frac{\Gamma\left(x\right)}{\Gamma\left(x - 2\right)}$ simplify(c) $\displaystyle \left(x - 2\right) \left(x - 1\right)$ 위의 예들 중 객체 c의 감마함수(gamma(x))는 확률분포 등 여러 부분에서 사용되는 표현식으로 다음과 같이 정의 됩니다. 감마함수는 음이 아닌 정수를 제외한 모든 수에서 정의됩니다. 식 1과 같이 자연수에서 감마함수는 factorial(!), 부동소수(양의 실수)인 경우 적분을 적용하여 계산합니다. $$\tag{식 1}\Gamma(n) =\begin{cases}(n-1)!& n:\text{자연수}\\\int^\infty_0x^{n-1}e^{-x}\,dx& n:\text{부동소수}\end{cases}$$ x=symbols('x') gamma(x).subs(x,4) $\displaystyle 6$ factorial 계산은 math.factorial() 함수를 사용할 수 있습니다. import math math.factorial(3) 6 a=gamma(x).subs(x,4.5) a.evalf(3) 11.6 simpilfy() 함수의 알고리즘은 식에서 공통사항을 찾아 정리하...

sympy.solvers로 방정식해 구하기

sympy.solvers로 방정식해 구하기 대수 방정식을 해를 계산하기 위해 다음 함수를 사용합니다. sympy.solvers.solve(f, *symbols, **flags) f=0, 즉 동차방정식에 대해 지정한 변수의 해를 계산 f : 식 또는 함수 symbols: 식의 해를 계산하기 위한 변수, 변수가 하나인 경우는 생략가능(자동으로 인식) flags: 계산 또는 결과의 방식을 지정하기 위한 인수들 dict=True: {x:3, y:1}같이 사전형식, 기본값 = False set=True :{(x,3),(y,1)}같이 집합형식, 기본값 = False ratioal=True : 실수를 유리수로 반환, 기본값 = False positive=True: 해들 중에 양수만을 반환, 기본값 = False 예 $x^2=1$의 해를 결정합니다. solve() 함수에 적용하기 위해서는 다음과 같이 식의 한쪽이 0이 되는 형태인 동차식으로 구성되어야 합니다. $$x^2-1=0$$ import numpy as np from sympy import * x = symbols('x') solve(x**2-1, x) [-1, 1] 위 식은 계산 과정은 다음과 같습니다. $$\begin{aligned}x^2-1=0 \rightarrow (x+1)(x-1)=0 \\ x=1 \; \text{or}\; -1\end{aligned}$$ 예 $x^4=1$의 해를 결정합니다. solve() 함수의 인수 set=True를 지정하였으므로 결과는 집합(set)형으로 반환됩니다. eq=x**4-1 solve(eq, set=True) ([x], {(-1,), (-I,), (1,), (I,)}) 위의 경우 I는 복소수입니다.즉 위 결과의 과정은 다음과 같습니다. $$x^4-1=(x^2+1)(x+1)(x-1)=0 \rightarrow x=\pm \sqrt{-1}, \; \pm 1=\pm i,\; \pm1$$ 실수...