File: host_authorization.rb

package info (click to toggle)
rails 2%3A6.0.3.7%2Bdfsg-2%2Bdeb11u2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 70,976 kB
  • sloc: ruby: 271,623; javascript: 19,043; yacc: 46; sql: 43; makefile: 28; sh: 18
file content (121 lines) | stat: -rw-r--r-- 3,558 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# frozen_string_literal: true

require "action_dispatch/http/request"

module ActionDispatch
  # This middleware guards from DNS rebinding attacks by explicitly permitting
  # the hosts a request can be sent to.
  #
  # When a request comes to an unauthorized host, the +response_app+
  # application will be executed and rendered. If no +response_app+ is given, a
  # default one will run, which responds with +403 Forbidden+.
  class HostAuthorization
    ALLOWED_HOSTS_IN_DEVELOPMENT = [".localhost", IPAddr.new("0.0.0.0/0"), IPAddr.new("::/0")]
    PORT_REGEX = /(?::\d+)/ # :nodoc:
    IPV4_HOSTNAME = /(?<host>\d+\.\d+\.\d+\.\d+)#{PORT_REGEX}?/ # :nodoc:
    IPV6_HOSTNAME = /(?<host>[a-f0-9]*:[a-f0-9.:]+)/i # :nodoc:
    IPV6_HOSTNAME_WITH_PORT = /\[#{IPV6_HOSTNAME}\]#{PORT_REGEX}/i # :nodoc:
    VALID_IP_HOSTNAME = Regexp.union( # :nodoc:
      /\A#{IPV4_HOSTNAME}\z/,
      /\A#{IPV6_HOSTNAME}\z/,
      /\A#{IPV6_HOSTNAME_WITH_PORT}\z/,
    )

    class Permissions # :nodoc:
      def initialize(hosts)
        @hosts = sanitize_hosts(hosts)
      end

      def empty?
        @hosts.empty?
      end

      def allows?(host)
        @hosts.any? do |allowed|
          if allowed.is_a?(IPAddr)
            begin
              allowed === extract_hostname(host)
            rescue
              # IPAddr#=== raises an error if you give it a hostname instead of
              # IP. Treat similar errors as blocked access.
              false
            end
          else
            allowed === host
          end
        end
      end

      private
        def sanitize_hosts(hosts)
          Array(hosts).map do |host|
            case host
            when Regexp then sanitize_regexp(host)
            when String then sanitize_string(host)
            else host
            end
          end
        end

        def sanitize_regexp(host)
          /\A#{host}#{PORT_REGEX}?\z/
        end

        def sanitize_string(host)
          if host.start_with?(".")
            /\A([a-z0-9-]+\.)?#{Regexp.escape(host[1..-1])}#{PORT_REGEX}?\z/i
          else
            /\A#{Regexp.escape host}#{PORT_REGEX}?\z/i
          end
        end

        def extract_hostname(host)
          host.slice(VALID_IP_HOSTNAME, "host") || host
        end
    end

    DEFAULT_RESPONSE_APP = -> env do
      request = Request.new(env)

      format = request.xhr? ? "text/plain" : "text/html"
      template = DebugView.new(host: request.host)
      body = template.render(template: "rescues/blocked_host", layout: "rescues/layout")

      [403, {
        "Content-Type" => "#{format}; charset=#{Response.default_charset}",
        "Content-Length" => body.bytesize.to_s,
      }, [body]]
    end

    def initialize(app, hosts, response_app = nil)
      @app = app
      @permissions = Permissions.new(hosts)
      @response_app = response_app || DEFAULT_RESPONSE_APP
    end

    def call(env)
      return @app.call(env) if @permissions.empty?

      request = Request.new(env)

      if authorized?(request)
        mark_as_authorized(request)
        @app.call(env)
      else
        @response_app.call(env)
      end
    end

    private
      def authorized?(request)
        origin_host = request.get_header("HTTP_HOST")
        forwarded_host = request.x_forwarded_host&.split(/,\s?/)&.last

        @permissions.allows?(origin_host) && (forwarded_host.blank? || @permissions.allows?(forwarded_host))
      end

      def mark_as_authorized(request)
        request.set_header("action_dispatch.authorized_host", request.host)
      end
  end
end