Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Variance swap for Heston model #175

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file added Zhu_and_Lian_MathFin_1.pdf
Binary file not shown.
129 changes: 129 additions & 0 deletions pyfeng/heston.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,135 @@ def strike_var_swap_analytic(self, texp, dt):

return strike

def strike_var_swap_analytic_pctreturn(self, texp, dt):
"""
Analytic fair strike of variance swap. Proposition3.2 in Bernard & Cui (2014)

Args:
texp: time to expiry
dt: observation time step (e.g., dt=1/12 for monthly) For continuous monitoring, set dt=0
must in the form of 1/n for some integer n

Returns:
Fair strike

References:
- Bernard C, Cui Z (2014) Prices and Asymptotics for Discrete Variance Swaps. Applied Mathematical Finance 21:140–173. https://doi.org/10.1080/1350486X.2013.820524

"""
n = int(texp/dt)
delta = dt
kappa = self.mr
theta = self.theta
gamma = self.vov
rho = self.rho
V0 = self.sigma
r = self.intr
S0 = 1


alpha = 2*kappa*theta/gamma**2-1

def d(u):
res = np.sqrt((kappa-gamma*rho*u)**2 + (u - u*2)*gamma**2)
return res

def g(u):
res = (kappa - gamma*rho*u - d(u))/(kappa - gamma*rho*u + d(u))
return res

def q(u):
res = ((kappa - gamma*rho*u - d(u))/(gamma**2))*(1 - np.exp(-d(u)*delta))/(1 - g(u)*np.exp(-d(u)*delta))
return res

def eta(u):
res = (2*kappa)/((gamma**2)*(1 - np.exp(-u*kappa)))
return res

def M(u,t):
e1 = np.exp((kappa*theta/gamma**2)*((kappa - gamma*rho*u - d(u))*t - 2*np.log((1 - g(u)*np.exp(-d(u)*t))/(1 - g(u)))))
e2 = np.exp(V0*(kappa-gamma*rho*u-d(u))*(1 - np.exp(-d(u)*t))/(gamma**2*(1 - g(u)*np.exp(-d(u)*t))))
res = (S0**u)*e1*e2
return res

def ai(ti):
e1 = np.exp(q(2)*V0*(eta(ti)*np.exp(-kappa*ti)/(eta(ti)-q(2))-1))
e2 = (eta(ti)/(eta(ti)-q(2)))**(alpha+1)
res = np.exp(2*r*delta)*M(2, delta)*e1*e2/(S0**2)
return res

def K(n):
res = 0
a = np.exp(2*r*delta)*M(2, delta)/S0**2
res += a
for i in range(1,n):
ti = i*delta
res += ai(ti)
res = res/texp + (n-2*n*np.exp(r*delta))/texp
return res

return K(n)

def strike_var_swap_analytic_ZhuLian(self, texp, dt):
"""
Analytic fair strike of variance swap. eq (2.34) Lian & Zhu (2011)

Args:
texp: time to expiry
dt: observation time step (e.g., dt=1/12 for monthly) For continuous monitoring, set dt=0
must in the form of 1/n for some integer n

Returns:
Fair strike

References:
- Song-Ping Zhu, Guang-Hua Lian (2011) A Closed-form Exact Solution for Pricing Variance Swaps with Stochastic Volatility

"""
kappa = self.mr
theta = self.theta
rho = self.rho
sigma_V = self.vov
v0 = self.sigma
r = self.intr
N = int(texp/dt)
T = texp

def C_D_calculation():
tilde_a = kappa - 2 * rho * sigma_V
tilde_b = np.sqrt(tilde_a ** 2 - 2 * sigma_V ** 2)
tilde_g = (tilde_a / sigma_V) ** 2 - 1 + (tilde_a / sigma_V) * np.sqrt((tilde_a / sigma_V) ** 2 - 2)

term1 = r * dt
term2 = (kappa * theta) / (sigma_V ** 2)
term3 = (tilde_a + tilde_b) * dt
term4 = 2 * np.log((1 - tilde_g * np.exp(tilde_b * dt)) / (1 - tilde_g))

C = term1 + term2 * (term3 - term4)

D = ((tilde_a + tilde_b) / sigma_V ** 2) * ((1 - np.exp(tilde_b * dt)) / (1 - tilde_g * np.exp(tilde_b * dt)))
return C, D

def f():
C, D = C_D_calculation()
return np.exp(C + D * v0) + np.exp(-r * dt) - 2

def sum_fi():
C, D = C_D_calculation()
sum = 0
for i in range(2, N + 1):
c_i = 2 * kappa / (sigma_V ** 2 * (1 - np.exp(-kappa * (i - 1) * dt)))
term1 = np.exp(C + c_i * np.exp(-kappa * (i - 1) * dt) / (c_i - D) * D * v0)
term2 = (c_i / (c_i - D)) ** (2 * kappa * theta / (sigma_V ** 2))
sum += term1 * term2 + np.exp(-r * dt) - 2
return sum


f_v0 = f()
sum_fi_v0 = sum_fi()
K_var = (np.exp(r * dt) / T) * (f_v0 + sum_fi_v0)
return K_var


class HestonUncorrBallRoma1994(HestonABC):
"""
Expand Down
115 changes: 114 additions & 1 deletion pyfeng/heston_mc.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def cond_log_return_var(self, dt, var_0, var_t, avgvar):
mean_ln = self.rho / self.vov * ((var_t - var_0) + self.mr * dt * (avgvar - self.theta)) \
+ (self.intr - self.divr - 0.5 * avgvar) * dt
sigma_ln2 = (1.0 - self.rho**2) * dt * avgvar
return mean_ln**2 + sigma_ln2
return mean_ln**2 + sigma_ln2

def draw_log_return(self, dt, var_0, var_t, avgvar):
"""
Expand All @@ -199,6 +199,25 @@ def draw_log_return(self, dt, var_0, var_t, avgvar):
ln_sig = np.sqrt((1.0 - self.rho**2) * dt * avgvar)
zn = self.rv_normal(spawn=5)
return ln_m + ln_sig * zn

def draw_pct_return(self, dt, var_0, var_t, avgvar):
"""
Samples log return, (S_t/S_0)-1

Args:
dt: time step
var_0: initial variance
var_t: final variance
avgvar: average variance

Returns:
pct return
"""
ln_m = self.rho/self.vov * ((var_t - var_0) + self.mr * dt * (avgvar - self.theta)) \
+ (self.intr - self.divr - 0.5 * avgvar) * dt
ln_sig = np.sqrt((1.0 - self.rho**2) * dt * avgvar)
zn = self.rv_normal(spawn=5)
return np.exp(ln_m + ln_sig * zn)-1

def return_var_realized(self, texp, cond=False):
"""
Expand Down Expand Up @@ -239,6 +258,46 @@ def return_var_realized(self, texp, cond=False):
var_0 = var_t

return var_r / texp # annualized

def return_pctvar_realized(self, texp, cond=False):
"""
Annualized realized return variance

Args:
texp: time to expiry
cond: use conditional expectation without simulating price

Returns:

"""
tobs = self.tobs(texp)
n_dt = len(tobs)
dt = np.diff(tobs, prepend=0)

var_r = np.zeros(self.n_path)
var_0 = np.full(self.n_path, self.sigma)

tmp = self.rho/self.vov*self.mr - 0.5

for i in range(n_dt):
var_t, avgvar_inc, extra = self.cond_states_step(dt[i], var_0)

if self.correct_martingale:
pois_avgvar_v = extra.get('pois_avgvar_v', None)
if pois_avgvar_v is not None: # missing variance:
var_r += (tmp*dt[i])**2 * pois_avgvar_v
qe_m_corr = extra.get('qe_m_corr', 0.0)
else:
qe_m_corr = 0.0

if cond:
var_r += np.exp(self.cond_log_return_var(dt[i], var_0, var_t, avgvar_inc))-1
else:
var_r += (np.exp(self.draw_log_return(dt[i], var_0, var_t, avgvar_inc))-1 + qe_m_corr) ** 2

var_0 = var_t

return var_r / texp # annualized

def gamma_lambda(self, dt, kk=0):
"""
Expand Down Expand Up @@ -772,6 +831,60 @@ def cond_states_step(self, dt, var_0):

return var_t, avgvar, {}

class HestonMC:
"""
Heston model with Monte-Carlo simulation

Naive MC for Heston model based on Euler-Maruyama discretization scheme

Underlying price follows a geometric Brownian motion, and variance of the price follows a CIR process.

References:
- Song-Ping Zhu∗, Guang-Hua Lian(2011) SA Closed-form Exact Solution for Pricing Variance Swaps with Stochastic Volatility
"""
def __init__(self, **kwargs) -> None:
default_kwargs = {'sigma': 0.04, 'vov': 0.618, 'rho': -0.64,\
'mr': 11.35, 'theta': 0.022, 'intr': 0.1}
for key, value in default_kwargs.items():
setattr(self, key, kwargs.get(key, value))

def set_num_params(self,dt=1/4,n_path=200000):
self.dt = dt
self.n_path = n_path

def return_pctvar_realized(self, texp):
"""
Annualized realized return variance

Args:
texp: time to expiry
Returns:
"""
n_dt = int(texp / self.dt)
dt = texp / n_dt

var_r = np.zeros(self.n_path)
var_0 = np.full(self.n_path, self.sigma)

for i in range(n_dt):
var_t, z1 = self.cond_states_step(dt, var_0)
var_r += (self.draw_pct_return(dt, var_0, z1) ) ** 2
var_0 = var_t

return var_r / texp # annualized

def cond_states_step(self, dt, var_0):
z1 = np.random.normal(size=self.n_path)
z2 = np.random.normal(size=self.n_path)
zz = self.rho * z1 + np.sqrt(1 - self.rho**2) * z2
var_t = var_0 + self.mr * (self.theta - var_0) * dt + np.sqrt(abs(var_0)) * self.vov * zz * np.sqrt(dt)
var_t = np.maximum(var_t, 0)
return var_t, z1

def draw_pct_return(self, dt, var_0, z1):
res = self.intr*dt+np.sqrt(abs(var_0))*z1*np.sqrt(dt)
return res


class HestonMcAndersen2008(HestonMcABC):
"""
Expand Down
Loading