File: OptimizeSwing.py

package info (click to toggle)
stopt 5.12%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 8,860 kB
  • sloc: cpp: 70,456; python: 5,950; makefile: 72; sh: 57
file content (139 lines) | stat: -rw-r--r-- 5,757 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# Copyright (C) 2016 EDF
# All Rights Reserved
# This code is published under the GNU Lesser General Public License (GNU LGPL)
import numpy as np

# Example, for swing options, of an optimization object describing one step in optimization and in simulation


# classical swing option with a given number of exercises
class OptimizeSwing:
    """Optimization object for a classical swing option with a given number of exercises.

    The object describes one time step, both in optimization (stepOptimize)
    and in simulation (stepSimulate).  A single regime, a single control and a
    single followed function are handled.  Each exercise increases the first
    coordinate of the stock point by one, and exercising is only allowed
    while that coordinate is strictly below the number of stock points.
    """

    def __init__(self, p_payoff, p_nPointStock, p_actu):
        """Store the parameters of the option.

        p_payoff       pay off used; its applyVec method is called in
                       optimization and its apply method in simulation
        p_nPointStock  number of stock points
        p_actu         actualization factor
        """
        self.m_payoff = p_payoff
        self.m_nPointStock = p_nPointStock
        self.m_actu = p_actu
        # cumulated actualization factor, updated by incrementActuTimeStep
        self.m_actuStep = 1.
        # no simulator is known until setSimulator is called
        self.m_simulator = None

    def getCone(self, p_regionByProcessor):
        """Define the diffusion cone for parallelism.

        p_regionByProcessor  region (min max) treated by the processor for
                             the different regimes treated
        Returns a new float array equal to p_regionByProcessor except that
        the max value of the first dimension is increased by one.  The
        argument itself is not modified.
        """
        # copy the region (np.zeros would interpret it as a shape)
        extrGrid = np.array(p_regionByProcessor, dtype=float)
        # only a single exercise
        extrGrid[0][1] += 1
        return extrGrid

    def incrementActuTimeStep(self):
        """Update the cumulated actualization factor by one time step (needed for simulation)."""
        self.m_actuStep *= self.m_actu

    def stepOptimize(self, p_grid, p_stock, p_condEsp, p_phiIn, p_simulator):
        """Define a step in optimization.

        Notice that this implementation is not optimal: no interpolation is
        necessary for this asset.  It is for test and example purpose.

        p_grid       grid at arrival step after command
        p_stock      coordinate of the stock point to treat (not modified)
        p_condEsp    continuation values for each regime (only regime 0 is used)
        p_phiIn      for each regime gives the solution calculated at the
                     previous step (next time step by Dynamic Programming
                     resolution); only regime 0 is used
        p_simulator  not used by this implementation
        Returns a list of two arrays of shape (number of simulations, 1):
          - the solution for each particle (row),
          - the optimal control for each particle (row): 1 if exercised, 0 otherwise.
        """
        nbSimul = p_condEsp[0].getNbSimul()
        # build both result arrays directly (an empty list cannot be indexed)
        solutionAndControl = [np.zeros((nbSimul, 1)), np.zeros((nbSimul, 1))]
        payOffVal = self.m_payoff.applyVec(p_condEsp[0].getParticles())
        # create interpolator at current stock point
        interpolatorCurrentStock = p_grid.createInterpolator(p_stock)
        # cash flow at current stock and previous step
        cashSameStock = interpolatorCurrentStock.applyVec(p_phiIn[0])
        # conditional expectation at current stock point
        condExpSameStock = self.m_actu * p_condEsp[0].getAllSimulations(interpolatorCurrentStock)

        if p_stock[0] < self.m_nPointStock:
            # calculation detailed for clarity
            # next accessible stock point: copy of the current one with one more exercise
            nextStock = np.array(p_stock, dtype=float)
            nextStock[0] += 1
            interpolatorNextStock = p_grid.createInterpolator(nextStock)
            # cash flow at next stock previous step
            cashNextStock = interpolatorNextStock.applyVec(p_phiIn[0])
            # conditional expectation at next stock
            condExpNextStock = self.m_actu * p_condEsp[0].getAllSimulations(interpolatorNextStock)

            # arbitrage: exercise when pay off plus continuation after exercise
            # is strictly above the continuation without exercise
            exercise = payOffVal + condExpNextStock > condExpSameStock
            solutionAndControl[0][:, 0] = np.where(
                exercise,
                payOffVal + self.m_actu * cashNextStock,
                self.m_actu * cashSameStock)
            solutionAndControl[1][:, 0] = np.where(exercise, 1., 0.)
        else:
            # stock is maximal: solution and control are set to zero
            solutionAndControl[0][:, 0] = 0.
            solutionAndControl[1][:, 0] = 0.

        return solutionAndControl

    def stepSimulate(self, p_grid, p_continuation, p_simulator, p_state, p_phiInOut):
        """Define a step in simulation.

        Notice that this implementation is not optimal: no interpolation is
        necessary for this asset.  It is for test and example purpose.

        p_grid          grid at arrival step after command (not used here)
        p_continuation  defines the continuation operator for each regime
                        (only regime 0 is used)
        p_simulator     not used by this implementation
        p_state         defines the state value (modified: its stock point is
                        replaced through setPtStock when exercising)
        p_phiInOut      defines the value function (modified in place): size
                        number of functions to follow
        """
        ptStock = p_state.getPtStock()
        # only if stock not maximal
        if ptStock[0] < self.m_nPointStock:
            realization = p_state.getStochasticRealization()
            payOffVal = self.m_payoff.apply(realization)
            continuationValue = self.m_actu * p_continuation[0].getValue(ptStock, realization)

            # next stock point: copy of the current one with one more exercise
            nextStock = np.array(ptStock, dtype=float)
            nextStock[0] += 1
            continuationValueNext = self.m_actu * p_continuation[0].getValue(nextStock, realization)

            if payOffVal + continuationValueNext > continuationValue:
                p_state.setPtStock(nextStock)
                # in-place update so that the caller sees the gain
                p_phiInOut += payOffVal * self.m_actuStep

    def getNbRegime(self):
        """Get the number of regimes (always 1)."""
        return 1

    def getNbControl(self):
        """Get the number of controls (always 1)."""
        return 1

    def getActuStep(self):
        """Get the cumulated actualization factor."""
        return self.m_actuStep

    def setSimulator(self, p_simulator):
        """Store the simulator."""
        self.m_simulator = p_simulator

    def getSimulator(self):
        """Get the simulator back (None if setSimulator was never called)."""
        return self.m_simulator

    def getSimuFuncSize(self):
        """Get the size of the function to follow in simulation (always 1)."""
        return 1