1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
|
# Copyright (C) 2016 EDF
# All Rights Reserved
# This code is published under the GNU Lesser General Public License (GNU LGPL)
import numpy as np
# Defines a fictitious swing on Ndim stocks : the valorisation will be equal to the valorisation of ndim swing
# where ndim is the number of stocks
class OptimizeFictitiousSwing:
# Constructor
# p_payoff pay off used
# p_nPointStock number of stock points
# p_ndim dimension of the problem (stock number)
# p_actu actualization factor
def __init__(self, p_payoff, p_nPointStock, p_ndim, p_actu):
self.m_payoff = p_payoff
self.m_nPointStock = p_nPointStock
self.m_ndim = p_ndim
self.m_actu = p_actu
self.m_actuStep = 1.
# define the diffusion cone for parallelism
# p_regionByProcessor region (min max) treated by the processor for the different regimes treated
# return returns in each dimension the min max values in the stock that can be reached from the grid p_gridByProcessor for each regime
def getCone(self, p_regionByProcessor):
extrGrid = np.zeros(p_regionByProcessor)
# only a single exercise
for id in range(self.m_ndim):
extrGrid[id][1] += 1.
return extrGrid
# defines a step in optimization
# Notice that this implementation is not optimal. In fact no interpolation is necessary for this asset.
# This implementation is for test and example purpose
# p_grid grid at arrival step after command
# p_stock coordinate of the stock point to treat
# p_condEsp conditional expectation operator
# p_phiIn for each regime gives the solution calculated at the previous step ( next time step by Dynamic Programming resolution)
# return for each regimes (column) gives the solution for each particle (row)
def stepOptimize(self, p_grid, p_stock, p_condEsp, p_phiIn):
nbSimul = p_condEsp[0].getNbSimul()
solutionAndControl = list(range(2))
solutionAndControl[0] = np.zeros((nbSimul, 1))
solutionAndControl[1] = np.zeros((nbSimul, 1))
payOffVal = self.m_payoff.applyVec(p_condEsp[0].getParticles())
bConstraint = False
# calculation detailed for clarity
# create interpolator at current stock point
interpolatorCurrentStock = p_grid.createInterpolator(p_stock)
# cash flow at current stock and previous step
cashSameStock = interpolatorCurrentStock.applyVec(p_phiIn[0])
# conditional expectation at current stock point
condExpSameStock = self.m_actu * p_condEsp[0].getAllSimulations(interpolatorCurrentStock)
# number of possibilities for arbitrage with m_ndim stocks
nbArb = (0x01 << self.m_ndim)
# stock to add
stockToAdd = np.zeros(self.m_ndim)
# store optimal conditional expectation
condExpOpt = np.zeros(nbSimul) - 1e6
for j in range(nbArb):
nbArb += 1
ires = j
for id in range(1, self.m_ndim + 1)[::-1]:
id -= 1
idec = (ires >> id)
stockToAdd[id] = idec
ires -= (idec << id)
p_stock = p_stock.reshape(2)
# calculation detailed for clarity
# create interpolator at next stock point accessible
nextStock = p_stock + stockToAdd
# test that stock is possible
if np.amax(nextStock) <= self.m_nPointStock:
interpolatorNextStock = p_grid.createInterpolator(nextStock)
# cash flow at next stock previous step
cashNextStock = interpolatorNextStock.applyVec(p_phiIn[0]).squeeze()
# conditional expectation at next stock
condExpNextStock = self.m_actu * p_condEsp[0].getAllSimulations(interpolatorNextStock).squeeze()
# quantity exercised
qExercized = stockToAdd.sum()
# arbitrage
condExp = payOffVal * qExercized + condExpNextStock
solutionAndControl[0][:,0] = np.where(condExp > condExpOpt, payOffVal * qExercized + self.m_actu * cashNextStock, solutionAndControl[0][:,0])
condExpOpt = np.where(condExp > condExpOpt, condExp, condExpOpt)
solutionAndControl[1][:,0] = np.where(condExp > condExpOpt, qExercized, solutionAndControl[1][:,0])
return solutionAndControl
# defines a step in simulation
# Notice that this implementation is not optimal. In fact no interpolation is necessary for this asset.
# This implementation is for test and example purpose
# p_grid grid at arrival step after command
# p_continuation defines the continuation operator for each regime
# p_state defines the state value (modified)
# p_phiInOut defines the value function (modified)
def stepSimulate(self, p_grid, p_continuation, p_state, p_phiInOut):
# number of possibilities for arbitrage with m_ndim stocks
nbArb = (0x01 << self.m_ndim)
# stock to add
stockToAdd = np.zeros(self.m_ndim)
# initialize
phiAdd = -1e6
# max of conditional expectation
expectationMax = -1e6
ptStockMax = np.zeros(len(p_state.getPtStock()))
bStock = (p_state.getPtStock()[0] == p_state.getPtStock()[1])
payOffVal = self.m_payoff.apply(p_state.getStochasticRealization())
for j in range(nbArb):
ires = j
for id in range(self.m_ndim - 1)[::-1]:
idec = (ires >> id)
stockToAdd[id] = idec
ires -= (idec << id)
nextStock = p_state.getPtStock() + stockToAdd
# test that stock is possible
if nextStock.maxCoeff() <= self.m_nPointStock:
# quantity exercised
qExercized = stockToAdd.sum()
continuationValueNext = self.m_actu * p_continuation[0].getValue(nextStock, p_state.getStochasticRealization())
expectation = payOffVal * qExercized + continuationValueNext
if (expectation > expectationMax):
ptStockMax = nextStock
phiAdd = payOffVal * qExercized * self.m_actuStep
expectationMax = expectation
p_state.setPtStock(ptStockMax)
p_phiInOut += phiAdd
# get number of regimes
def getNbRegime(self):
return 1
def getNbControl(self):
return 1
# get actualization factor
def getActuStep(self):
return self.m_actuStep
# store the simulator
def setSimulator(self, p_simulator):
self.m_simulator = p_simulator
# get the simulator back
def getSimulator(self):
return self.m_simulator
# get size of the function to follow in simulation
def getSimuFuncSize(self):
return 1
|