File: mixture.py

package info (click to toggle)
sasmodels 1.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, sid
  • size: 15,888 kB
  • sloc: python: 25,392; ansic: 7,377; makefile: 149; sh: 61
file content (341 lines) | stat: -rw-r--r-- 14,131 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
"""
Mixture model
-------------

The product model multiplies the structure factor by the form factor,
modulated by the effective radius of the form.  The resulting model
has a attributes of both the model description (with parameters, etc.)
and the module evaluator (with call, release, etc.).

To use it, first load form factor P and structure factor S, then create
*ProductModel(P, S)*.
"""
from __future__ import print_function

from copy import copy
from collections import OrderedDict

import numpy as np  # type: ignore

from .modelinfo import Parameter, ParameterTable, ModelInfo
from .kernel import KernelModel, Kernel
from .details import make_details

# pylint: disable=unused-import
try:
    from typing import List
except ImportError:
    pass
# pylint: enable=unused-import

def make_mixture_info(parts, operation='+'):
    # type: (List[ModelInfo]) -> ModelInfo
    """
    Create info block for mixture model.
    """
    # Build new parameter list
    combined_pars = []

    all_parts = copy(parts)
    is_flat = False
    while not is_flat:
        is_flat = True
        for part in all_parts:
            if part.composition and part.composition[0] == 'mixture' and \
                len(part.composition[1]) > 1:
                all_parts += part.composition[1]
                all_parts.remove(part)
                is_flat = False

    # When creating a mixture model that is a sum of product models (ie (1*2)+(3*4))
    # the parameters for models 1 & 2 will be prefixed with A & B respectively,
    # but so will the parameters for models 3 & 4. We need to rename models 3 & 4
    # so that they are prefixed with C & D to avoid overlap of parameter names.
    used_prefixes = []
    for part in parts:
        i = 0
        if part.composition and part.composition[0] == 'mixture':
            npars_list = [info.parameters.npars for info in part.composition[1]]
            for npars in npars_list:
                # List of params of one of the constituent models of part
                submodel_pars = part.parameters.kernel_parameters[i:i+npars]
                # Prefix of the constituent model
                prefix = submodel_pars[0].name[0]
                if prefix not in used_prefixes: # Haven't seen this prefix so far
                    used_prefixes.append(prefix)
                    i += npars
                    continue
                while prefix in used_prefixes:
                    # This prefix has been already used, so change it to the
                    # next letter that hasn't been used
                    prefix = chr(ord(prefix) + 1)
                used_prefixes.append(prefix)
                prefix += "_"
                # Update the parameters of this constituent model to use the
                # new prefix
                for par in submodel_pars:
                    par.id = prefix + par.id[2:]
                    par.name = prefix + par.name[2:]
                    if par.length_control is not None:
                        par.length_control = prefix + par.length_control[2:]
                i += npars

    for part in parts:
        # Parameter prefix per model, A_, B_, ...
        # Note that prefix must also be applied to id and length_control
        # to support vector parameters
        prefix = ''
        if not part.composition:
            # Model isn't a composition model, so it's parameters don't have a
            # a prefix. Add the next available prefix
            prefix = chr(ord('A')+len(used_prefixes))
            used_prefixes.append(prefix)
            prefix += '_'

        if operation == '+':
            # If model is a sum model, each constituent model gets its own scale parameter
            scale_prefix = prefix
            if prefix == '' and getattr(part, "operation", '') == '*':
                # `part` is a composition product model. Find the prefixes of
                # it's parameters to form a new prefix for the scale.
                # For example, a model with A*B*C will have ABC_scale.
                sub_prefixes = []
                for param in part.parameters.kernel_parameters:
                    # Prefix of constituent model
                    sub_prefix = param.id.split('_')[0]
                    if sub_prefix not in sub_prefixes:
                        sub_prefixes.append(sub_prefix)
                # Concatenate sub_prefixes to form prefix for the scale
                scale_prefix = ''.join(sub_prefixes) + '_'
            scale = Parameter(scale_prefix + 'scale', default=1.0,
                              description="model intensity for " + part.name)
            combined_pars.append(scale)
        for p in part.parameters.kernel_parameters:
            p = copy(p)
            p.name = prefix + p.name
            p.id = prefix + p.id
            if p.length_control is not None:
                p.length_control = prefix + p.length_control
            combined_pars.append(p)
    parameters = ParameterTable(combined_pars)
    # Allow for the scenario in which each component has all its PD parameters
    # active simultaneously.  details.make_details() will throw an error if
    # too many are used from any one component.
    parameters.max_pd = sum(part.parameters.max_pd for part in parts)

    def random():
        """Random set of model parameters for mixture model"""
        combined_pars = {}
        for k, part in enumerate(parts):
            prefix = chr(ord('A')+k) + '_'
            pars = part.random()
            combined_pars.update((prefix+k, v) for k, v in pars.items())
        return combined_pars

    model_info = ModelInfo()
    model_info.id = operation.join(part.id for part in parts)
    model_info.operation = operation
    model_info.name = '(' + operation.join(part.name for part in parts) + ')'
    model_info.filename = None
    model_info.title = 'Mixture model with ' + model_info.name
    model_info.description = model_info.title
    model_info.docs = model_info.title
    model_info.category = "custom"
    model_info.parameters = parameters
    model_info.random = random
    #model_info.single = any(part['single'] for part in parts)
    model_info.structure_factor = False
    model_info.variant_info = None
    #model_info.tests = []
    #model_info.source = []
    # Remember the component info blocks so we can build the model
    model_info.composition = ('mixture', parts)
    return model_info


class MixtureModel(KernelModel):
    """
    Model definition for mixture of models.
    """
    def __init__(self, model_info, parts):
        # type: (ModelInfo, List[KernelModel]) -> None
        self.info = model_info
        self.parts = parts
        self.dtype = parts[0].dtype

    def make_kernel(self, q_vectors):
        # type: (List[np.ndarray]) -> MixtureKernel
        # Note: may be sending the q_vectors to the n times even though they
        # are only needed once.  It would mess up modularity quite a bit to
        # handle this optimally, especially since there are many cases where
        # separate q vectors are needed (e.g., form in python and structure
        # in opencl; or both in opencl, but one in single precision and the
        # other in double precision).
        kernels = [part.make_kernel(q_vectors) for part in self.parts]
        return MixtureKernel(self.info, kernels, q_vectors)
    make_kernel.__doc__ = KernelModel.make_kernel.__doc__

    def release(self):
        # type: () -> None
        """Free resources associated with the model."""
        for part in self.parts:
            part.release()
    release.__doc__ = KernelModel.release.__doc__


def _intermediates(q, results):
    # type: (List[Tuple[Kernel, np.ndarray, Optional[Callable]]]) -> OrderedDict[str, Any]
    """
    Returns intermediate results for mixture model.
    """
    parts = OrderedDict()
    for part_num, (kernel, Iq, intermediates) in enumerate(results):
        part_id = chr(ord('A') + part_num) + "_" + kernel.info.name + "(Q)"
        parts[part_id] = (q, Iq)
        if intermediates is not None:
            parts[part_id + " parts"] = intermediates()
    return parts


class MixtureKernel(Kernel):
    """
    Instantiated kernel for mixture of models.
    """
    def __init__(self, model_info, kernels, q):
        # type: (ModelInfo, List[Kernel], Tuple[np.ndarray]) -> None
        self.dim = kernels[0].dim
        self.info = model_info
        self.q = q
        self.kernels = kernels
        self.dtype = self.kernels[0].dtype
        self.operation = model_info.operation
        self.results = None  # type: Callable[[], OrderedDict]

    def Iq(self, call_details, values, cutoff, magnetic):
        # type: (CallDetails, np.ndarray, np.ndarry, float, bool) -> np.ndarray
        scale, background = values[0:2]
        total = 0.0
        # remember the parts for plotting later
        results = []
        parts = _MixtureParts(self.info, self.kernels, call_details, values)
        for kernel, kernel_details, kernel_values in parts:
            #print("calling kernel", kernel.info.name)
            result = kernel(kernel_details, kernel_values, cutoff, magnetic)
            result = np.array(result).astype(kernel.dtype)
            # print(kernel.info.name, result)
            if self.operation == '+':
                total += result
            elif self.operation == '*':
                if np.all(total) == 0.0:
                    total = result
                else:
                    total *= result
            results.append((kernel, result, getattr(kernel, 'results', None)))

        self.results = lambda: _intermediates(self.q, results)

        return scale*total + background

    Iq.__doc__ = Kernel.Iq.__doc__
    __call__ = Iq

    def release(self):
        # type: () -> None
        """Free resources associated with the kernel."""
        for k in self.kernels:
            k.release()


# Note: _MixtureParts doesn't implement iteration correctly, and only allows
# a single iterator to be active at once.  It doesn't matter in this case
# since _MixtureParts is only used in one place, but it is not clean style.
class _MixtureParts(object):
    """
    Mixture component iterator.
    """
    def __init__(self, model_info, kernels, call_details, values):
        # type: (ModelInfo, List[Kernel], CallDetails, np.ndarray) -> None
        self.model_info = model_info
        self.parts = model_info.composition[1]
        self.kernels = kernels
        self.call_details = call_details
        self.values = values
        self.spin_index = model_info.parameters.npars + 2
        # The following are redefined by __iter__, but set them here so that
        # lint complains a little less.
        self.part_num = -1
        self.par_index = -1
        self.mag_index = -1
        #call_details.show(values)

    def __iter__(self):
        # type: () -> PartIterable
        self.part_num = 0
        self.par_index = 2
        self.mag_index = self.spin_index + 3
        return self

    def __next__(self):
        # type: () -> Tuple[List[Callable], CallDetails, np.ndarray]
        if self.part_num >= len(self.parts):
            raise StopIteration()
        info = self.parts[self.part_num]
        kernel = self.kernels[self.part_num]
        call_details = self._part_details(info, self.par_index)
        values = self._part_values(info, self.par_index, self.mag_index)
        values = values.astype(kernel.dtype)
        #call_details.show(values)

        self.part_num += 1
        self.par_index += info.parameters.npars
        if self.model_info.operation == '+':
            self.par_index += 1 # Account for each constituent model's scale param
        self.mag_index += 3 * len(info.parameters.magnetism_index)

        return kernel, call_details, values

    # CRUFT: py2 support
    next = __next__

    def _part_details(self, info, par_index):
        # type: (ModelInfo, int) -> CallDetails
        full = self.call_details
        # par_index is index into values array of the current parameter,
        # which includes the initial scale and background parameters.
        # We want the index into the weight length/offset for each parameter.
        # Exclude the initial scale and background, so subtract two. If we're
        # building an addition model, each component has its own scale factor
        # which we need to skip when constructing the details for the kernel, so
        # add one, giving a net subtract one.
        diff = 1 if self.model_info.operation == '+' else 2
        index = slice(par_index - diff, par_index - diff + info.parameters.npars)
        length = full.length[index]
        offset = full.offset[index]
        # The complete weight vector is being sent to each part so that
        # offsets don't need to be adjusted.
        part = make_details(info, length, offset, full.num_weights)
        return part

    def _part_values(self, info, par_index, mag_index):
        # type: (ModelInfo, int, int) -> np.ndarray
        # Set each constituent model's scale to 1 if this is a multiplication model
        scale = self.values[par_index] if self.model_info.operation == '+' else 1.0
        diff = 1 if self.model_info.operation == '+' else 0 # Skip scale if addition model
        pars = self.values[par_index + diff:par_index + info.parameters.npars + diff]
        nmagnetic = len(info.parameters.magnetism_index)
        if nmagnetic:
            spin_state = self.values[self.spin_index:self.spin_index + 3]
            mag_index = self.values[mag_index:mag_index + 3 * nmagnetic]
        else:
            spin_state = []
            mag_index = []
        nvalues = self.model_info.parameters.nvalues
        nweights = self.call_details.num_weights
        weights = self.values[nvalues:nvalues+2*nweights]
        zero = self.values.dtype.type(0.)
        values = [[scale, zero], pars, spin_state, mag_index, weights]
        # Pad value array to a 32 value boundary
        spacer = (32 - sum(len(v) for v in values)%32)%32
        values.append([zero]*spacer)
        values = np.hstack(values).astype(self.kernels[0].dtype)
        return values