File: fsync.py

package info (click to toggle)
python-pyclustering 0.10.1.2-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 11,128 kB
  • sloc: cpp: 38,888; python: 24,311; sh: 384; makefile: 105
file content (339 lines) | stat: -rwxr-xr-x 13,185 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""!

@brief Oscillatory Neural Network based on Kuramoto model in frequency domain.
@details Implementation based on paper @cite book::chemical_oscillatorions_waves.

@authors Andrei Novikov (pyclustering@yandex.ru)
@date 2014-2020
@copyright BSD-3-Clause

"""


import numpy
import random
import pyclustering.utils

from scipy.integrate import odeint

from pyclustering.nnet import network, conn_type, conn_represent


class fsync_dynamic:
    """!
    @brief Represents output dynamic of Sync in frequency domain.
    
    """


    def __init__(self, amplitude, time):
        """!
        @brief Constructor of Sync dynamic in frequency domain.
        
        @param[in] amplitude (list): Dynamic of oscillators on each step of simulation.
        @param[in] time (list): Simulation time where each time-point corresponds to amplitude-point.
        
        """

        self.__amplitude = amplitude
        self.__time = time


    @property
    def output(self):
        """!
        @brief (list) Returns output dynamic of the Sync network (amplitudes of each oscillator in the network) during simulation.
        
        """

        return self.__amplitude


    @property
    def time(self):
        """!
        @brief (list) Returns time-points corresponds to dynamic-points points.
        
        """

        return self.__time


    def __len__(self):
        """!
        @brief (uint) Returns number of simulation steps that are stored in dynamic.
        
        """
        
        return len(self.__amplitude)


    def __getitem__(self, index):
        """!
        @brief Indexing of the dynamic.
        
        """
        if index == 0:
            return self.__time
        
        elif index == 1:
            return self.__amplitude
        
        else:
            raise NameError('Out of range ' + index + ': only indexes 0 and 1 are supported.')


    def allocate_sync_ensembles(self, tolerance=0.1):
        """!
        @brief Allocate clusters in line with ensembles of synchronous oscillators where each synchronous ensemble corresponds to only one cluster.
        
        @param[in] tolerance (double): Maximum error for allocation of synchronous ensemble oscillators.
        
        @return (list) Grours of indexes of synchronous oscillators, for example, [ [index_osc1, index_osc3], [index_osc2], [index_osc4, index_osc5] ].
        
        """
        
        return pyclustering.utils.allocate_sync_ensembles(self.__amplitude, tolerance, 0.0)


    def extract_number_oscillations(self, index, amplitude_threshold):
        """!
        @brief Extracts number of oscillations of specified oscillator.
        
        @param[in] index (uint): Index of oscillator whose dynamic is considered.
        @param[in] amplitude_threshold (double): Amplitude threshold when oscillation is taken into account, for example,
                    when oscillator amplitude is greater than threshold then oscillation is incremented.
        
        @return (uint) Number of oscillations of specified oscillator.
        
        """
        
        return pyclustering.utils.extract_number_oscillations(self.__amplitude, index, amplitude_threshold);



class fsync_visualizer:
    """!
    @brief Visualizer of output dynamic of sync network in frequency domain.
    
    """

    @staticmethod
    def show_output_dynamic(fsync_output_dynamic):
        """!
        @brief Shows output dynamic (output of each oscillator) during simulation.
        
        @param[in] fsync_output_dynamic (fsync_dynamic): Output dynamic of the fSync network.
        
        @see show_output_dynamics
        
        """
        
        pyclustering.utils.draw_dynamics(fsync_output_dynamic.time, fsync_output_dynamic.output, x_title = "t", y_title = "amplitude");


    @staticmethod
    def show_output_dynamics(fsync_output_dynamics):
        """!
        @brief Shows several output dynamics (output of each oscillator) during simulation.
        @details Each dynamic is presented on separate plot.
        
        @param[in] fsync_output_dynamics (list): list of output dynamics 'fsync_dynamic' of the fSync network.
        
        @see show_output_dynamic
        
        """
        
        pyclustering.utils.draw_dynamics_set(fsync_output_dynamics, "t", "amplitude", None, None, False, False);



class fsync_network(network):
    """!
    @brief Model of oscillatory network that uses Landau-Stuart oscillator and Kuramoto model as a synchronization mechanism.
    @details Dynamic of each oscillator in the network is described by following differential Landau-Stuart equation with feedback:
    
    \f[
    \dot{z}_{i} = (i\omega_{i} + \rho^{2}_{i} - |z_{i}|^{2} )z_{i} + \frac{1}{N}\sum_{j=0}^{N}k_{ij}(z_{j} - z_{i});
    \f]
    
    Where left part of the equation is Landau-Stuart equation and the right is a Kuramoto model for synchronization.
    For solving this equation Runge-Kutta 4 method is used by default.
    
    Example:
    @code
        # Prepare oscillatory network parameters.
        amount_oscillators = 3;
        frequency = 1.0;
        radiuses = [1.0, 2.0, 3.0];
        coupling_strength = 1.0;
        
        # Create oscillatory network
        oscillatory_network = fsync_network(amount_oscillators, frequency, radiuses, coupling_strength);
        
        # Simulate network during 200 steps on 10 time-units of time-axis.
        output_dynamic = oscillatory_network.simulate(200, 10, True);    # True is to collect whole output dynamic.
        
        # Visualize output result
        fsync_visualizer.show_output_dynamic(output_dynamic);
    @endcode
    
    Example of output dynamic of the network:
    @image html fsync_sync_examples.png
    
    """
    
    __DEFAULT_FREQUENCY_VALUE = 1.0;
    __DEFAULT_RADIUS_VALUE = 1.0;
    __DEFAULT_COUPLING_STRENGTH = 1.0;


    def __init__(self, num_osc, factor_frequency = 1.0, factor_radius = 1.0, factor_coupling = 1.0, type_conn = conn_type.ALL_TO_ALL, representation = conn_represent.MATRIX):
        """!
        @brief Constructor of oscillatory network based on synchronization Kuramoto model and Landau-Stuart oscillator.
        
        @param[in] num_osc (uint): Amount oscillators in the network.
        @param[in] factor_frequency (double|list): Frequency of oscillators, it can be specified as common value for all oscillators by
                    single double value and for each separately by list.
        @param[in] factor_radius (double|list): Radius of oscillators that affects amplitude, it can be specified as common value for all oscillators by
                    single double value and for each separately by list.
        @param[in] factor_coupling (double): Coupling strength between oscillators.
        @param[in] type_conn (conn_type): Type of connection between oscillators in the network (all-to-all, grid, bidirectional list, etc.).
        @param[in] representation (conn_represent): Internal representation of connection in the network: matrix or list.
        
        """
        
        super().__init__(num_osc, type_conn, representation);
        
        self.__frequency = factor_frequency if isinstance(factor_frequency, list) else [ fsync_network.__DEFAULT_FREQUENCY_VALUE * factor_frequency for _ in range(num_osc) ];
        self.__radius = factor_radius if isinstance(factor_radius, list) else [ fsync_network.__DEFAULT_RADIUS_VALUE * factor_radius for _ in range(num_osc) ];
        self.__coupling_strength = fsync_network.__DEFAULT_COUPLING_STRENGTH * factor_coupling;
        self.__properties = [ self.__oscillator_property(index) for index in range(self._num_osc) ];
        
        random.seed();
        self.__amplitude = [ random.random() for _ in range(num_osc) ];


    def simulate(self, steps, time, collect_dynamic = False):
        """!
        @brief Performs static simulation of oscillatory network.
        
        @param[in] steps (uint): Number simulation steps.
        @param[in] time (double): Time of simulation.
        @param[in] collect_dynamic (bool): If True - returns whole dynamic of oscillatory network, otherwise returns only last values of dynamics.
        
        @return (list) Dynamic of oscillatory network. If argument 'collect_dynamic' is True, than return dynamic for the whole simulation time,
                 otherwise returns only last values (last step of simulation) of output dynamic.
        
        @see simulate()
        @see simulate_dynamic()
        
        """
        
        dynamic_amplitude, dynamic_time = ([], []) if collect_dynamic is False else ([self.__amplitude], [0]);
        
        step = time / steps;
        int_step = step / 10.0;
        
        for t in numpy.arange(step, time + step, step):
            self.__amplitude = self.__calculate(t, step, int_step);
            
            if collect_dynamic is True:
                dynamic_amplitude.append([ numpy.real(amplitude)[0] for amplitude in self.__amplitude ]);
                dynamic_time.append(t);
        
        if collect_dynamic is False:
            dynamic_amplitude.append([ numpy.real(amplitude)[0] for amplitude in self.__amplitude ]);
            dynamic_time.append(time);

        output_sync_dynamic = fsync_dynamic(dynamic_amplitude, dynamic_time);
        return output_sync_dynamic;


    def __calculate(self, t, step, int_step):
        """!
        @brief Calculates new amplitudes for oscillators in the network in line with current step.
        
        @param[in] t (double): Time of simulation.
        @param[in] step (double): Step of solution at the end of which states of oscillators should be calculated.
        @param[in] int_step (double): Step differentiation that is used for solving differential equation.
        
        @return (list) New states (phases) for oscillators.
        
        """
        
        next_amplitudes = [0.0] * self._num_osc;
        
        for index in range (0, self._num_osc, 1):
            z = numpy.array(self.__amplitude[index], dtype = numpy.complex128, ndmin = 1);
            result = odeint(self.__calculate_amplitude, z.view(numpy.float64), numpy.arange(t - step, t, int_step), (index , ));
            next_amplitudes[index] = (result[len(result) - 1]).view(numpy.complex128);
        
        return next_amplitudes;


    def __oscillator_property(self, index):
        """!
        @brief Calculate Landau-Stuart oscillator constant property that is based on frequency and radius.
        
        @param[in] index (uint): Oscillator index whose property is calculated.
        
        @return (double) Oscillator property.
        
        """
        
        return numpy.array(1j * self.__frequency[index] + self.__radius[index]**2, dtype = numpy.complex128, ndmin = 1);


    def __landau_stuart(self, amplitude, index):
        """!
        @brief Calculate Landau-Stuart state.
        
        @param[in] amplitude (double): Current amplitude of oscillator.
        @param[in] index (uint): Oscillator index whose state is calculated. 
        
        @return (double) Landau-Stuart state.
        
        """
        
        return (self.__properties[index] - numpy.absolute(amplitude) ** 2) * amplitude;


    def __synchronization_mechanism(self, amplitude, index):
        """!
        @brief Calculate synchronization part using Kuramoto synchronization mechanism.
        
        @param[in] amplitude (double): Current amplitude of oscillator.
        @param[in] index (uint): Oscillator index whose synchronization influence is calculated.
        
        @return (double) Synchronization influence for the specified oscillator.
        
        """
        
        sync_influence = 0.0;
        
        for k in range(self._num_osc):
            if self.has_connection(index, k) is True:
                amplitude_neighbor = numpy.array(self.__amplitude[k], dtype = numpy.complex128, ndmin = 1);
                sync_influence += amplitude_neighbor - amplitude;
        
        return sync_influence * self.__coupling_strength / self._num_osc;


    def __calculate_amplitude(self, amplitude, t, argv):
        """!
        @brief Returns new amplitude value for particular oscillator that is defined by index that is in 'argv' argument.
        @details The method is used for differential calculation.
        
        @param[in] amplitude (double): Current amplitude of oscillator.
        @param[in] t (double): Current time of simulation.
        @param[in] argv (uint): Index of the current oscillator.
        
        @return (double) New amplitude of the oscillator.
        
        """
        
        z = amplitude.view(numpy.complex);
        dzdt = self.__landau_stuart(z, argv) + self.__synchronization_mechanism(z, argv);
        
        return dzdt.view(numpy.float64);