File: local.py

package info (click to toggle)
python-os-collect-config 14.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 360 kB
  • sloc: python: 3,089; makefile: 19
file content (107 lines) | stat: -rw-r--r-- 3,730 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import locale
import os
import stat

from oslo_config import cfg
from oslo_log import log

from os_collect_config import exc

LOCAL_DEFAULT_PATHS = ['/var/lib/os-collect-config/local-data']
CONF = cfg.CONF

opts = [
    cfg.MultiStrOpt('path',
                    default=LOCAL_DEFAULT_PATHS,
                    help='Local directory to scan for Metadata files.')
]
name = 'local'
logger = log.getLogger(__name__)


def _dest_looks_insecure(local_path):
    '''We allow group writable so owner can let others write.'''
    looks_insecure = False
    uid = os.getuid()
    st = os.stat(local_path)
    if uid != st[stat.ST_UID]:
        logger.error('%s is owned by another user. This is a'
                     ' security risk.' % local_path)
        looks_insecure = True
    if st.st_mode & stat.S_IWOTH:
        logger.error('%s is world writable. This is a security risk.'
                     % local_path)
        looks_insecure = True
    return looks_insecure


class Collector:
    def __init__(self, requests_impl=None):
        pass

    def collect(self):
        if len(cfg.CONF.local.path) == 0:
            raise exc.LocalMetadataNotAvailable
        final_content = []
        for local_path in cfg.CONF.local.path:
            try:
                os.stat(local_path)
            except OSError:
                logger.warn("%s not found. Skipping", local_path)
                continue
            if _dest_looks_insecure(local_path):
                raise exc.LocalMetadataNotAvailable
            for data_file in os.listdir(local_path):
                if data_file.startswith('.'):
                    continue
                data_file = os.path.join(local_path, data_file)
                if os.path.isdir(data_file):
                    continue
                st = os.stat(data_file)
                if st.st_mode & stat.S_IWOTH:
                    logger.error(
                        '%s is world writable. This is a security risk.' %
                        data_file)
                    raise exc.LocalMetadataNotAvailable
                with open(data_file) as metadata:
                    try:
                        value = json.loads(metadata.read())
                    except ValueError as e:
                        logger.error(
                            '{} is not valid JSON ({})'.format(data_file, e))
                        raise exc.LocalMetadataNotAvailable
                    basename = os.path.basename(data_file)
                    final_content.append((basename, value))
        if not final_content:
            logger.info('No local metadata found (%s)' %
                        cfg.CONF.local.path)

        # Now sort specifically by C locale
        def locale_aware_by_first_item(data):
            return locale.strxfrm(data[0])

        save_locale = locale.getlocale()
        try:
            locale.setlocale(locale.LC_ALL, 'C')
            sorted_content = sorted(
                final_content, key=locale_aware_by_first_item)
        finally:
            locale.setlocale(locale.LC_ALL, save_locale)

        return sorted_content