File: create.py

package info (click to toggle)
dune-common 2.11.0-1~exp2
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 6,056 kB
  • sloc: cpp: 54,404; python: 4,136; sh: 1,657; makefile: 17
file content (228 lines) | stat: -rw-r--r-- 9,289 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# SPDX-FileCopyrightInfo: Copyright © DUNE Project contributors, see file LICENSE.md in module root
# SPDX-License-Identifier: LicenseRef-GPL-2.0-only-with-DUNE-exception

import importlib
import sys
import pkgutil
import importlib
import logging
import inspect
from urllib.parse import urlparse
from pathlib import Path
import json, os

import dune.common.module
import dune.common
import dune.generator

logger = logging.getLogger(__name__)
# category name -> {normalized key: [creator, name of the providing module]}
_create_map = dict()

package = dune
prefix = package.__name__ + "."
subpackages = []
logMsg = "Importing create registries from [ "

# first collect the names of all 'dune' subpackages; they are imported and
# their 'registry' dicts are read once the full set of names is known
dunesubmodules = set()
for importer, modname, ispkg in pkgutil.iter_modules(package.__path__, prefix):
    if ispkg:
        dunesubmodules.add(modname)
# add editable packages (newer Python versions)
try:
    from importlib.metadata import Distribution, packages_distributions
    mods = packages_distributions()['dune']
    for m in mods:
        direct_url = Distribution.from_name(m).read_text("direct_url.json")
        if direct_url is None:
            # distribution carries no direct_url.json, so there is no
            # source directory to scan
            continue
        try:
            editablePath = json.loads(direct_url).get("url")
            editablePath = urlparse(editablePath).path
            modPath = Path(os.path.join(editablePath,"dune"))
            for item in modPath.iterdir():
                # 'item' is a Path: compare its last component, a Path
                # never compares equal to a plain string
                if item.name == "data":
                    continue
                if os.path.isdir(item):
                    if "__init__.py" in os.listdir(item):
                        modname = os.path.basename(item)
                        dunesubmodules.add(f'dune.{modname}')
        except Exception as e:
            # scanning is best effort (the url may not be a readable local
            # directory); record the reason instead of hiding it
            logger.debug('could not scan distribution %s for dune subpackages: %s', m, e)
except (ImportError, KeyError):  # no dune module was installed which can happen during packaging
    pass

for modname in dunesubmodules:
    # can just use modname here if registry is part of __init__ file
    try:
        # Note: modname.__init__ is imported so be aware of
        # possible side effects
        module = importlib.import_module(modname)
    except ImportError as e:
        logger.debug('failed to import ' + modname + ': ' + str(e) + '.')
        continue

    # read the registry; modules without one are skipped
    try:
        moduleRegistry = module.registry.items()
        logMsg = logMsg + modname + " "
    except AttributeError:
        logger.debug('Module ' + modname + ' does not provide a registry.')
        continue

    # combine all registries: one dict per category, shared by all modules
    for obj, registry in moduleRegistry:
        objmap = _create_map.setdefault(obj, dict())
        for key, value in registry.items():
            # entries are stored under the case-normalized key, so the
            # duplicate check has to use the normalized key as well;
            # otherwise keys differing only in case overwrite each other
            if key.upper().lower() in objmap:
                raise RuntimeError('Key \'' + key + '\' registered twice for \'' + obj + '\'.')
            objmap[key.upper().lower()] = [value,modname]

# Every grid creator is offered as a view creator too: make sure the
# 'view' category exists and copy all 'grid' entries (if any) into it.
if "view" not in _create_map:
    _create_map["view"] = {}
_create_map["view"].update(_create_map.get("grid", {}))

# close the list of modules collected in logMsg and report it
logMsg += "]"
logger.debug(logMsg)

############################################################################
def get(category=None,entry=None):
    """Return a registered creator or print the available choices.

    Parameters
    ----------
    category : optional
        Key of a category in the create map. If it is ``None`` or not a
        known category, the sorted list of categories is printed.
    entry : optional
        Key of a creator within ``category``. If ``None``, a table with
        all entries of the category (key, function name, module) is
        printed. If no exact match exists for a string, the lookup is
        retried with ``entry.upper().lower()``, which is the form under
        which keys are stored when the registries are merged.

    Returns
    -------
    The creator registered for ``entry``. ``None`` whenever only help
    output was printed (unknown category, no entry given, unknown entry).
    """
    entries_ = _create_map.get(category,None)
    if entries_ is None:
        if category is not None:
            print(f"category '{category}' not valid, ", end="")
        print("available categories are:\n",
                ','.join(sorted(_create_map)))
        return None

    if entry is None:
        print("available entries for this category are:")
        header = ["entry", "function", "module"]
        rows = []
        for k, e in entries_.items():
            creator = e[0]
            # creators need not be plain functions (functools.partial for
            # example has no __name__), so fall back to something printable
            name = getattr(creator, "__name__", None) or repr(creator)
            modparts = (getattr(creator, "__module__", None) or "").split(".")
            # drop a trailing private module name (leading underscore)
            if modparts[-1].startswith("_"):
                del modparts[-1]
            rows.append([k, name, '.'.join(modparts)])
        rows.sort()
        # columns must be wide enough for the header labels as well
        widths = [max(len(row[i]) for row in [header] + rows)
                  for i in range(len(header))]
        # print() puts one blank between the columns; include it in the rule
        rule = "-" * (sum(widths) + len(widths) - 1)
        print(*(text.ljust(width) for text, width in zip(header, widths)))
        print(rule)
        for row in rows:
            print(*(text.ljust(width) for text, width in zip(row, widths)))
        print(rule)
        return None

    found = entries_.get(entry,None)
    if found is None and isinstance(entry, str):
        # same normalization as used by the creator objects' __call__
        found = entries_.get(entry.upper().lower(),None)
    if found is None:
        print("available entries are:",
                ','.join(entries_))
        return None
    return found[0]

############################################################################
## the second part is for the 'load' method

# a helper class
class Empty(object):
    """Marker meaning "no value".

    The class object itself is used as the marker: it stands for a
    parameter without default value and for an argument that was not
    provided.
    """

def signatureDict(func):
    """Map the plain named parameters of ``func`` to their default values.

    Only parameters of kind POSITIONAL_OR_KEYWORD are reported, i.e.
    ``*args`` and ``**kwargs`` (as well as positional-only and
    keyword-only parameters) are left out. A parameter without default
    value is mapped to the ``Empty`` marker. The order of the parameters
    in the signature is kept.
    """
    described = inspect.signature(func)
    return {
        param.name: (Empty if param.default is param.empty else param.default)
        for param in described.parameters.values()
        if param.kind == param.POSITIONAL_OR_KEYWORD
    }

def _creatorCall(create, usedKeys, *args, **kwargs):
    """Call ``create`` with keyword arguments assembled from ``kwargs``.

    For each parameter reported by ``signatureDict(create)`` the value is
    taken from ``kwargs``. If the parameter is named like a registered
    category and the value is a string, the string is used as key into
    that category's registry and the object is built by a recursive call;
    any other value is passed on unchanged. Parameters that do not name a
    category use the value from ``kwargs`` or keep their default.

    Parameters
    ----------
    create : callable that is finally invoked with keyword arguments only.
    usedKeys : set, extended in place by the names of all consumed kwargs.
    *args : only handed on to recursive calls, never passed to ``create``.
    **kwargs : the named arguments available for resolving parameters.

    Returns
    -------
    The return value of ``create``.

    Raises
    ------
    AssertionError
        If a category parameter has a default value or was not provided,
        or if a parameter has neither a value nor a default.
    KeyError
        If a string argument is not a key of the category's registry.
    """
    # get signature of create function to call
    signature = signatureDict(create)
    for name in signature:
        # special treatment of 'view'/'grid' parameter since a 'grid' is
        # also a view
        if name == 'view' and name not in kwargs and 'grid' in kwargs:
            kwargs.update({"view":kwargs["grid"]})
            usedKeys.update(["grid"])
        # Only registered categories count as creators. A bare globals()
        # lookup would also pick up unrelated module level names (imports,
        # helper variables) that happen to match a parameter name.
        creator = globals().get(name) if name in _create_map else None
        if creator is not None: # a creator for this parameter name exists
            # 'Empty' is a marker, so test identity; '==' could be
            # overloaded by the argument (e.g. element-wise comparison)
            assert signature[name] is Empty, "argument in create method corresponding to creatibles should not have default values"
            argument = kwargs.get(name, Empty)
            assert argument is not Empty, "required creatable argument " + name + " not provided"
            if isinstance(argument,str):
                # recursion: build the object from its registry key
                argument = argument.upper().lower()
                paramCreator = creator.registry[argument][0]
                signature[name] = _creatorCall(paramCreator, usedKeys, *args,**kwargs)
                kwargs[name] = signature[name] # replace the string with the actual object
            else:
                signature[name] = argument # store the object provided
            usedKeys.update([name])
        else: # no creator available
            argument = kwargs.get(name, Empty)
            if argument is Empty:
                # nothing provided: the default from the signature is used
                assert signature[name] is not Empty, "no value for argument " + name + " provided"
                kwargs[name] = argument
            else:
                signature[name] = argument
                usedKeys.update([name])
    return create(**signature)

def creatorCall(self, key, *args, **kwargs):
    """Create an object using the creator registered under ``key``.

    Installed as ``__call__`` of the per-category creator objects; reads
    ``self.registry`` and ``self.obj``.

    Parameters
    ----------
    key : str
        Registry key; it is normalized with ``key.upper().lower()``.
    *args
        If any positional argument is given, the registered function is
        called directly with ``*args, **kwargs`` and no further checks.
    **kwargs
        Named arguments; these are resolved through ``_creatorCall``. A
        ``grid`` argument is additionally offered as ``view`` (unless this
        is the 'view' category or a ``view`` was passed as well).

    Returns
    -------
    The object returned by the registered function.

    Raises
    ------
    RuntimeError
        If ``key`` is not in the registry.
    AssertionError
        If a named argument provided by the caller was not used.
    """
    key = key.upper().lower()
    try:
        create = self.registry[key][0]
    except KeyError:
        raise RuntimeError('No ' + self.obj + ' implementation: ' + key +
                '. Available: ' + ' '.join(self.registry) + '.') from None
    # the complex creation mechanism is only allowed with named arguments
    # if positional arguments have been used, call the original function directly
    # without further checking the parameters
    if len(args)>0:
        return create(*args, **kwargs)

    # remember what the caller passed before kwargs is extended below
    provided = set(kwargs)
    usedKeys = set()
    # make a fix here for grids/views: offer the grid as view as well
    viewFromGrid = 'grid' in kwargs and not self.obj == 'view' and 'view' not in kwargs
    if viewFromGrid:
        kwargs.update({'view':kwargs['grid']})
    instance = _creatorCall(create,usedKeys,*args,**kwargs)
    if viewFromGrid and 'view' in usedKeys:
        # the injected view was consumed, i.e. the caller's grid was used
        usedKeys.update(['grid'])
    # Compare against the caller's own keys: the injected 'view' (or one
    # derived during nested creation) must not make a valid call fail.
    assert provided <= usedKeys, "some provided named parameters where not used"
    return instance

##########################################################################
## for each 'registry' entry add a function to this module
for obj, registry in _create_map.items():
    # two column table (key, providing module) shown in the docstring
    docs_format = "{:<25}" * (2)
    docs = docs_format.format("key", "module") + "\n"
    docs = docs + docs_format.format("----------", "----------") + "\n"
    for k,v in registry.items():
        docs = docs + docs_format.format(k,v[1]) + "\n"

    # build a class named after the category: each registered function is
    # available as a static method, and calling an instance dispatches on
    # the key through creatorCall
    attribs = {k: staticmethod(v[0]) for k, v in registry.items()}
    attribs.update({"__call__": creatorCall, "registry": registry, "obj": obj})
    C = type(str(obj), (object,), attribs)
    c = C()
    # name the actual category in the documentation (not always 'grid')
    c.__doc__ = "Create a dune " + obj + " instance, available choices are:\n"+docs
    # expose the instance as attribute '<category>' of this module
    setattr(sys.modules[__name__], obj, c)
    logger.debug("added create."+obj+" with keys: \n"+\
            "\n".join("   "+k+" from subpackage "+v[1] for k,v in registry.items()))