1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
|
'''VAR and VARMA process
this doesn't actually do much, trying out a version for a time loop
alternative representation:
* textbook, different blocks in matrices
* Kalman filter
* VAR, VARX and ARX could be calculated with signal.lfilter
only tried some examples, not implemented
TODO: try minimizing sum of squares of (Y-Yhat)
Note: filter has smallest lag at end of array and largest lag at beginning,
be careful for asymmetric lags coefficients
check this again if it is consistently used
changes
2009-09-08 : separated from movstat.py
Author : josefpkt
License : BSD
'''
from __future__ import print_function
import numpy as np
from scipy import signal
#import matplotlib.pylab as plt
from numpy.testing import assert_array_equal, assert_array_almost_equal
#NOTE: this just returns that predicted values given the
#B matrix in polynomial form.
#TODO: make sure VAR class returns B/params in this form.
def VAR(x,B, const=0):
''' multivariate linear filter
Parameters
----------
x: (TxK) array
columns are variables, rows are observations for time period
B: (PxKxK) array
b_t-1 is bottom "row", b_t-P is top "row" when printing
B(:,:,0) is lag polynomial matrix for variable 1
B(:,:,k) is lag polynomial matrix for variable k
B(p,:,k) is pth lag for variable k
B[p,:,:].T corresponds to A_p in Wikipedia
const: float or array (not tested)
constant added to autoregression
Returns
-------
xhat: (TxK) array
filtered, predicted values of x array
Notes
-----
xhat(t,i) = sum{_p}sum{_k} { x(t-P:t,:) .* B(:,:,i) } for all i = 0,K-1, for all t=p..T
xhat does not include the forecasting observation, xhat(T+1),
xhat is 1 row shorter than signal.correlate
References
----------
http://en.wikipedia.org/wiki/Vector_Autoregression
http://en.wikipedia.org/wiki/General_matrix_notation_of_a_VAR(p)
'''
p = B.shape[0]
T = x.shape[0]
xhat = np.zeros(x.shape)
for t in range(p,T): #[p+2]:#
## print(p,T)
## print(x[t-p:t,:,np.newaxis].shape)
## print(B.shape)
#print(x[t-p:t,:,np.newaxis])
xhat[t,:] = const + (x[t-p:t,:,np.newaxis]*B).sum(axis=1).sum(axis=0)
return xhat
def VARMA(x,B,C, const=0):
''' multivariate linear filter
x (TxK)
B (PxKxK)
xhat(t,i) = sum{_p}sum{_k} { x(t-P:t,:) .* B(:,:,i) } +
sum{_q}sum{_k} { e(t-Q:t,:) .* C(:,:,i) }for all i = 0,K-1
'''
P = B.shape[0]
Q = C.shape[0]
T = x.shape[0]
xhat = np.zeros(x.shape)
e = np.zeros(x.shape)
start = max(P,Q)
for t in range(start,T): #[p+2]:#
## print(p,T
## print(x[t-p:t,:,np.newaxis].shape
## print(B.shape
#print(x[t-p:t,:,np.newaxis]
xhat[t,:] = const + (x[t-P:t,:,np.newaxis]*B).sum(axis=1).sum(axis=0) + \
(e[t-Q:t,:,np.newaxis]*C).sum(axis=1).sum(axis=0)
e[t,:] = x[t,:] - xhat[t,:]
return xhat, e
if __name__ == '__main__':
T = 20
K = 2
P = 3
#x = np.arange(10).reshape(5,2)
x = np.column_stack([np.arange(T)]*K)
B = np.ones((P,K,K))
#B[:,:,1] = 2
B[:,:,1] = [[0,0],[0,0],[0,1]]
xhat = VAR(x,B)
print(np.all(xhat[P:,0]==np.correlate(x[:-1,0],np.ones(P))*2))
#print(xhat)
T = 20
K = 2
Q = 2
P = 3
const = 1
#x = np.arange(10).reshape(5,2)
x = np.column_stack([np.arange(T)]*K)
B = np.ones((P,K,K))
#B[:,:,1] = 2
B[:,:,1] = [[0,0],[0,0],[0,1]]
C = np.zeros((Q,K,K))
xhat1 = VAR(x,B, const=const)
xhat2, err2 = VARMA(x,B,C, const=const)
print(np.all(xhat2 == xhat1))
print(np.all(xhat2[P:,0] == np.correlate(x[:-1,0],np.ones(P))*2+const))
C[1,1,1] = 0.5
xhat3, err3 = VARMA(x,B,C)
x = np.r_[np.zeros((P,K)),x] #prepend initial conditions
xhat4, err4 = VARMA(x,B,C)
C[1,1,1] = 1
B[:,:,1] = [[0,0],[0,0],[0,1]]
xhat5, err5 = VARMA(x,B,C)
#print(err5)
#in differences
#VARMA(np.diff(x,axis=0),B,C)
#Note:
# * signal correlate applies same filter to all columns if kernel.shape[1]<K
# e.g. signal.correlate(x0,np.ones((3,1)),'valid')
# * if kernel.shape[1]==K, then `valid` produces a single column
# -> possible to run signal.correlate K times with different filters,
# see the following example, which replicates VAR filter
x0 = np.column_stack([np.arange(T), 2*np.arange(T)])
B[:,:,0] = np.ones((P,K))
B[:,:,1] = np.ones((P,K))
B[1,1,1] = 0
xhat0 = VAR(x0,B)
xcorr00 = signal.correlate(x0,B[:,:,0])#[:,0]
xcorr01 = signal.correlate(x0,B[:,:,1])
print(np.all(signal.correlate(x0,B[:,:,0],'valid')[:-1,0]==xhat0[P:,0]))
print(np.all(signal.correlate(x0,B[:,:,1],'valid')[:-1,0]==xhat0[P:,1]))
#import error
#from movstat import acovf, acf
from statsmodels.tsa.stattools import acovf, acf
aav = acovf(x[:,0])
print(aav[0] == np.var(x[:,0]))
aac = acf(x[:,0])
|