File: csrf.py

package info (click to toggle)
python-fedora 1.1.1-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,436 kB
  • sloc: python: 3,362; xml: 107; makefile: 14
file content (307 lines) | stat: -rw-r--r-- 12,944 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
#
# -*- coding: utf-8 -*-
#
# Copyright (C) 2008-2011  Red Hat, Inc.
# This file is part of python-fedora
#
# python-fedora is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# python-fedora is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with python-fedora; if not, see <http://www.gnu.org/licenses/>
#
'''
Cross-site Request Forgery Protection.

http://en.wikipedia.org/wiki/Cross-site_request_forgery


.. moduleauthor:: John (J5) Palmieri <johnp@redhat.com>
.. moduleauthor:: Luke Macken <lmacken@redhat.com>

.. versionadded:: 0.3.17
'''

from hashlib import sha1
import hmac
import logging
try:
    # Python 3
    from urllib.parse import urlencode
except ImportError:
    # Python 2
    from urllib import urlencode

from munch import Munch
from kitchen.text.converters import to_bytes
from webob import Request
try:
    # webob > 1.0
    from webob.headers import ResponseHeaders
except ImportError:
    # webob < 1.0
    from webob.headerdict import HeaderDict as ResponseHeaders
from paste.httpexceptions import HTTPFound
from paste.response import replace_header
from repoze.who.interfaces import IMetadataProvider
from zope.interface import implementer
try:
    # Legacy class-advice API, kept importable for backwards compatibility.
    # NOTE(review): newer zope.interface releases appear to have dropped
    # this name, hence the guard -- confirm against the installed version.
    from zope.interface import implements
except ImportError:
    implements = None

from fedora.urlutils import update_qs

log = logging.getLogger(__name__)


class CSRFProtectionMiddleware(object):
    '''
    CSRF Protection WSGI Middleware.

    A layer of WSGI middleware that is responsible for making sure
    authenticated requests originated from the user inside of the app's domain
    and not a malicious website.

    This middleware works with the :mod:`repoze.who` middleware, and requires
    that it is placed below :mod:`repoze.who` in the WSGI stack,
    since it relies upon ``repoze.who.identity`` to exist in the environ before
    it is called.

    To utilize this middleware, you can just add it to your WSGI stack below
    the :mod:`repoze.who` middleware.  Here is an example of utilizing the
    `CSRFProtectionMiddleware` within a TurboGears2 application.
    In your ``project/config/middleware.py``, you would wrap your main
    application with the `CSRFProtectionMiddleware`, like so:

    .. code-block:: python

        from fedora.wsgi.csrf import CSRFProtectionMiddleware
        def make_app(global_conf, full_stack=True, **app_conf):
            app = make_base_app(global_conf, wrap_app=CSRFProtectionMiddleware,
                                full_stack=full_stack, **app_conf)

    You then need to add the CSRF token to every url that you need to be
    authenticated for.  When used with TurboGears2, an overridden version of
    :func:`tg.url` is provided.  You can use it directly by calling::

        from fedora.tg2.utils import url
        [...]
        url = url('/authentication_needed')

    An easier and more portable way to set this up from within TG2 is to use
    :func:`fedora.tg2.utils.enable_csrf` when you setup your
    application.  This function will monkeypatch TurboGears2's :func:`tg.url`
    so that it adds a csrf token to urls.  This way, you can keep the same
    code in your templates and controller methods whether or not you configure
    the CSRF middleware to provide you with protection via
    :func:`~fedora.tg2.utils.enable_csrf`.
    '''

    def __init__(self, application, csrf_token_id='_csrf_token',
                 clear_env='repoze.who.identity repoze.what.credentials',
                 token_env='CSRF_TOKEN', auth_state='CSRF_AUTH_STATE'):
        '''
        Initialize the CSRF Protection WSGI Middleware.

        :arg application: The WSGI application wrapped by this middleware
        :kwarg csrf_token_id: The name of the CSRF token variable
        :kwarg clear_env: Whitespace separated names of the variables to
            clear out of the `environ` on invalid token
        :kwarg token_env: The name of the token variable in the environ
        :kwarg auth_state: The environ key that will be set when we are
            logging in
        '''
        log.info('Creating CSRFProtectionMiddleware')
        self.application = application
        self.csrf_token_id = csrf_token_id
        # Stored as a list of keys so _clean_environ can iterate over it
        self.clear_env = clear_env.split()
        self.token_env = token_env
        self.auth_state = auth_state

    def _clean_environ(self, environ):
        ''' Delete every key listed in ``self.clear_env`` from ``environ`` '''
        log.debug('clean_environ(%s)' % to_bytes(self.clear_env))
        for key in self.clear_env:
            if key in environ:
                log.debug('Deleting %(key)s from environ' %
                          {'key': to_bytes(key)})
                del(environ[key])

    def _identity_token(self, environ):
        '''
        Return the CSRF token stored in ``environ['repoze.who.identity']``.

        :arg environ: The WSGI environ to inspect
        :returns: The value stored under ``self.csrf_token_id`` in the
            identity, or :data:`None` when the identity is missing, is
            :data:`None`, or has no such entry.
        '''
        # ``or {}`` also covers an identity key that is present but None,
        # which a plain ``environ.get(key, {})`` would not.
        identity = environ.get('repoze.who.identity') or {}
        return identity.get(self.csrf_token_id)

    def __call__(self, environ, start_response):
        '''
        This method is called for each request.  It reads the user-supplied
        CSRF token from ``environ[token_env]`` (:class:`CSRFMetadataProvider`
        in this module stores it there) and compares it to the token
        attached to ``environ['repoze.who.identity']['_csrf_token']``.  If it
        does not match, or if a token is not provided, it will remove the
        user from the ``environ``, based on the ``clear_env`` setting, unless
        the ``auth_state`` key is set in the environ.

        When the ``auth_state`` key is set, the token is appended to the
        ``Location`` header of the response and the key is reset to
        :data:`None`.

        :arg environ: The WSGI environ of the request
        :arg start_response: The WSGI ``start_response`` callable
        :returns: The result of calling the (possibly modified) response as
            a WSGI application
        '''
        request = Request(environ)
        log.debug('CSRFProtectionMiddleware(%(r_path)s)' %
                  {'r_path': to_bytes(request.path)})

        token = self._identity_token(environ)
        csrf_token = environ.get(self.token_env)

        # Compare in constant time so that response timing does not reveal
        # how much of a guessed token is correct.  Both values are converted
        # to byte strings because hmac.compare_digest() refuses text with
        # non-ASCII characters and mixed text/bytes arguments.
        if token and csrf_token and hmac.compare_digest(
                to_bytes(token), to_bytes(csrf_token)):
            log.debug('User supplied CSRF token matches environ!')
        else:
            if not environ.get(self.auth_state):
                log.debug('Clearing identity')
                self._clean_environ(environ)
                if 'repoze.who.identity' not in environ:
                    environ['repoze.who.identity'] = Munch()
                if 'repoze.who.logins' not in environ:
                    # For compatibility with friendlyform
                    environ['repoze.who.logins'] = 0
                if csrf_token:
                    log.warning('Invalid CSRF token.  User supplied'
                                ' (%(u_token)s) does not match what\'s in our'
                                ' environ (%(e_token)s)' %
                                {'u_token': to_bytes(csrf_token),
                                 'e_token': to_bytes(token)})

        response = request.get_response(self.application)

        if environ.get(self.auth_state):
            log.debug('CSRF_AUTH_STATE; rewriting headers')
            # Re-read the token after the wrapped application has run
            token = self._identity_token(environ)

            # Only rewrite a redirect that really exists, and only with a
            # real token: otherwise we would append ``_csrf_token=None`` or
            # hand a None location to update_qs().
            # NOTE(review): update_qs() is defined outside this file; its
            # behaviour on a None uri is assumed to be undesirable.
            if token and response.location:
                response.location = update_qs(
                    response.location, {self.csrf_token_id: str(token)})
                log.debug('response.location = %(r_loc)s' %
                          {'r_loc': to_bytes(response.location)})
            environ[self.auth_state] = None

        return response(environ, start_response)


@implementer(IMetadataProvider)
class CSRFMetadataProvider(object):
    '''
    Repoze.who CSRF Metadata Provider Plugin.

    This metadata provider is called with an authenticated users identity
    automatically by repoze.who.  It will then take the SHA1 hash of the
    users session cookie, and set it as the CSRF token in
    ``environ['repoze.who.identity']['_csrf_token']``.

    This plugin will also set ``CSRF_AUTH_STATE`` in the environ if the user
    has just authenticated during this request.

    To enable this plugin in a TurboGears2 application, you can
    add the following to your ``project/config/app_cfg.py``

    .. code-block:: python

        from fedora.wsgi.csrf import CSRFMetadataProvider
        base_config.sa_auth.mdproviders = [('csrfmd', CSRFMetadataProvider())]

    Note: If you use the faswho plugin, this is turned on automatically.
    '''
    # The interface is declared with the ``implementer`` class decorator
    # rather than ``implements()`` in the class body: class advice is not
    # supported by zope.interface on Python 3.

    def __init__(self, csrf_token_id='_csrf_token', session_cookie='tg-visit',
                 clear_env='repoze.who.identity repoze.what.credentials',
                 login_handler='/post_login', token_env='CSRF_TOKEN',
                 auth_session_id='CSRF_AUTH_SESSION_ID',
                 auth_state='CSRF_AUTH_STATE'):
        '''
        Create the CSRF Metadata Provider Plugin.

        :kwarg csrf_token_id: The name of the CSRF token variable. The
            identity will contain an entry with this as key and the
            computed csrf_token as the value.
        :kwarg session_cookie: The name of the session cookie
        :kwarg clear_env: Stored on the instance as given; none of the
            methods of this class read it.
        :kwarg login_handler: The path to the login handler, used to determine
            if the user logged in during this request
        :kwarg token_env: The name of the token variable in the environ.
            The environ will contain the token from the request
        :kwarg auth_session_id: The environ key containing an optional
            session id
        :kwarg auth_state: The environ key that indicates when we are
            logging in
        '''
        self.csrf_token_id = csrf_token_id
        self.session_cookie = session_cookie
        self.clear_env = clear_env
        self.login_handler = login_handler
        self.token_env = token_env
        self.auth_session_id = auth_session_id
        self.auth_state = auth_state

    def strip_script(self, environ, path):
        '''
        Strip the ``SCRIPT_NAME`` prefix from a url path.

        This lets the middleware work even when the application is mounted
        under a path other than root.

        :arg environ: The WSGI environ; ``SCRIPT_NAME`` is read from it
        :arg path: The url path to strip
        :returns: ``path`` without the leading ``SCRIPT_NAME`` when ``path``
            is absolute and lies below that prefix, otherwise ``path``
            unchanged.
        '''
        prefix = environ.get('SCRIPT_NAME')
        if not prefix or not path.startswith('/'):
            return path

        if prefix.endswith('/'):
            prefix = prefix[:-1]

        # Only strip on a path segment boundary so that a SCRIPT_NAME of
        # ``/app`` does not mangle an unrelated path like ``/application``.
        if prefix and (path == prefix or path.startswith(prefix + '/')):
            path = path[len(prefix):]

        return path

    def add_metadata(self, environ, identity):
        '''
        Add the CSRF token to ``identity`` and record token state in
        ``environ``.

        The session id is read from ``environ[auth_session_id]`` or, failing
        that, from the session cookie.  When one is found, its SHA1 hex
        digest is stored in ``identity`` under ``csrf_token_id``.  If the
        request path is the login handler, ``environ[auth_state]`` is set and
        ``environ[token_env]`` receives the computed token; otherwise
        ``environ[token_env]`` receives the token extracted from the request
        parameters.  During login, an :class:`HTTPFound` found in
        ``environ['repoze.who.application']`` has the token added to its
        ``location`` header.

        :arg environ: The WSGI environ of the request
        :arg identity: The identity mapping to update in place
        '''
        request = Request(environ)
        log.debug('CSRFMetadataProvider.add_metadata(%(r_path)s)'
                  % {'r_path': to_bytes(request.path)})

        session_id = environ.get(self.auth_session_id)
        if not session_id:
            session_id = request.cookies.get(self.session_cookie)
        log.debug('session_id = %(s_id)r' % {'s_id':
                                             to_bytes(session_id)})

        if session_id and session_id != 'Set-Cookie:':
            environ[self.auth_session_id] = session_id
            # hashlib only accepts byte strings; on Python 3 a text session
            # id would otherwise raise TypeError.
            token = sha1(to_bytes(session_id)).hexdigest()
            identity.update({self.csrf_token_id: token})
            log.debug('Identity updated with CSRF token')
            path = self.strip_script(environ, request.path)
            if path == self.login_handler:
                log.debug('Setting CSRF_AUTH_STATE')
                environ[self.auth_state] = True
                environ[self.token_env] = token
            else:
                environ[self.token_env] = self.extract_csrf_token(request)

            app = environ.get('repoze.who.application')
            if app:
                # This occurs during login in some application configurations
                if isinstance(app, HTTPFound) and environ.get(self.auth_state):
                    log.debug('Got HTTPFound(302) from'
                              ' repoze.who.application')
                    # What possessed people to make this a string or
                    # a function?
                    location = app.location
                    if hasattr(location, '__call__'):
                        location = location()
                    loc = update_qs(location, {self.csrf_token_id:
                                               str(token)})

                    headers = app.headers.items()
                    replace_header(headers, 'location', loc)
                    app.headers = ResponseHeaders(headers)
                    log.debug('Altered headers: %(headers)s' % {
                        'headers': to_bytes(app.headers)})
        else:
            log.warning('Invalid session cookie %(s_id)r, not setting CSRF'
                        ' token!' % {'s_id': to_bytes(session_id)})

    def extract_csrf_token(self, request):
        '''Extract and remove the CSRF token from a given
        :class:`webob.Request`

        :arg request: The request whose GET and POST parameters are searched
        :returns: The token found in the request parameters (the POST value
            wins when both are present) or :data:`None` if there is none.
        '''
        csrf_token = None

        if self.csrf_token_id in request.GET:
            log.debug("%(token)s in GET" % {'token':
                                            to_bytes(self.csrf_token_id)})
            csrf_token = request.GET[self.csrf_token_id]
            del(request.GET[self.csrf_token_id])
            # Re-encode the remaining parameters: request.GET holds decoded
            # values, so joining them verbatim would corrupt any value
            # containing ``&``, ``=``, spaces or non-ASCII characters.
            # Byte strings are passed so urlencode behaves the same on
            # Python 2 and Python 3.
            request.query_string = urlencode(
                [(to_bytes(key), to_bytes(value))
                 for key, value in request.GET.items()])

        if self.csrf_token_id in request.POST:
            log.debug("%(token)s in POST" % {'token':
                                             to_bytes(self.csrf_token_id)})
            csrf_token = request.POST[self.csrf_token_id]
            del(request.POST[self.csrf_token_id])

        return csrf_token