File: tcp_socket.rb

package info (click to toggle)
ruby-celluloid-io 0.16.2-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, buster, stretch
  • size: 432 kB
  • ctags: 189
  • sloc: ruby: 1,727; makefile: 6
file content (114 lines) | stat: -rw-r--r-- 3,690 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
require 'socket'
require 'resolv'

module Celluloid
  module IO
    # TCPSocket with combined blocking and evented support
    class TCPSocket < Stream
      extend Forwardable

      # Methods forwarded unchanged to the wrapped socket held in @socket:
      # non-blocking I/O primitives, close/shutdown helpers, and address /
      # socket-option accessors.
      def_delegators :@socket,
                     :read_nonblock, :write_nonblock,
                     :close, :close_read, :close_write, :closed?,
                     :addr, :peeraddr, :setsockopt, :getsockname

      # Build a new socket from the given arguments (they are passed straight
      # to .new).
      #
      # Without a block the new socket is returned. With a block the socket is
      # yielded to it, the block's value is returned, and the socket is closed
      # afterwards, even if the block raises.
      def self.open(*args, &block)
        socket = new(*args)

        if block
          begin
            yield socket
          ensure
            socket.close
          end
        else
          socket
        end
      end

      # Wrap an existing Ruby TCPSocket in a Celluloid::IO::TCPSocket by
      # handing it to .new as the only argument.
      #
      # DEPRECATED: to be removed in a future release
      def self.from_ruby_socket(ruby_socket)
        self.new(ruby_socket)
      end

      # Opens a TCP connection to remote_host on remote_port. If local_host
      # and local_port are specified, then those parameters are used on the
      # local end to establish the connection.
      #
      # remote_host may be:
      # * a Ruby ::TCPSocket, which is wrapped as-is (all other arguments are
      #   ignored and no connection attempt is made)
      # * an IPv4 or IPv6 address literal
      # * anything else, which is resolved through DNSResolver; when several
      #   addresses come back one is picked at random
      #
      # Raises ArgumentError when remote_port is missing (and remote_host is
      # not a ::TCPSocket) or when the resolved address is neither IPv4 nor
      # IPv6, and Resolv::ResolvError when DNS returns no addresses. Errors
      # from bind/connect propagate to the caller; the socket created here is
      # closed before they do.
      def initialize(remote_host, remote_port = nil, local_host = nil, local_port = nil)
        super()

        # Allow users to pass in a Ruby TCPSocket directly
        if remote_host.is_a? ::TCPSocket
          @addr = nil
          @socket = remote_host
          return
        elsif remote_port.nil?
          raise ArgumentError, "wrong number of arguments (1 for 2)"
        end

        # Is it an IPv4 address?
        begin
          @addr = Resolv::IPv4.create(remote_host)
        rescue ArgumentError
          # Not an IPv4 literal; fall through to the next check
        end

        # Guess it's not IPv4! Is it IPv6?
        unless @addr
          begin
            @addr = Resolv::IPv6.create(remote_host)
          rescue ArgumentError
            # Not an IPv6 literal either; fall through to DNS
          end
        end

        # Guess it's not an IP address, so let's try DNS
        unless @addr
          addrs = Array(DNSResolver.new.resolve(remote_host))
          raise Resolv::ResolvError, "DNS result has no information for #{remote_host}" if addrs.empty?

          # Pseudorandom round-robin DNS support :/
          @addr = addrs[rand(addrs.size)]
        end

        case @addr
        when Resolv::IPv4
          family = Socket::AF_INET
        when Resolv::IPv6
          family = Socket::AF_INET6
        else raise ArgumentError, "unsupported address class: #{@addr.class}"
        end

        @socket = Socket.new(family, Socket::SOCK_STREAM, 0)
        connected = false

        begin
          @socket.bind Addrinfo.tcp(local_host, local_port) if local_host

          begin
            @socket.connect_nonblock Socket.sockaddr_in(remote_port, @addr.to_s)
          rescue Errno::EINPROGRESS, Errno::EALREADY
            # The connect is still in flight. EINPROGRESS is reported by the
            # first attempt; EALREADY by a repeated attempt made before the
            # previous one has completed. Either way: wait, then check again.
            wait_writable

            # HAX: for some reason we need to finish_connect ourselves on JRuby
            # This logic is unnecessary but JRuby still throws Errno::EINPROGRESS
            # if we retry the non-blocking connect instead of just finishing it
            retry unless defined?(JRUBY_VERSION) && @socket.to_channel.finish_connect
          rescue Errno::EISCONN
            # We're now connected! Yay exceptions for flow control
            # NOTE: This is the approach the Ruby stdlib docs suggest ;_;
          end

          connected = true
        ensure
          # Don't leak the descriptor when bind/connect fails: nobody else
          # holds a reference to this socket if the constructor raises.
          @socket.close unless connected
        end
      end

      # Returns the underlying socket object stored in @socket: either the
      # Ruby ::TCPSocket that was handed to the constructor or the ::Socket
      # the constructor created itself.
      def to_io
        @socket
      end

      # Receive up to maxlen bytes by delegating to readpartial.
      #
      # flags exists only for signature compatibility: nil (the default) and
      # 0 are accepted, any other value raises NotImplementedError.
      def recv(maxlen, flags = nil)
        if flags
          raise NotImplementedError, "flags not supported" unless flags.zero?
        end

        readpartial(maxlen)
      end

      # Send a message by delegating to write and returning its result.
      #
      # The extra arguments exist only for signature compatibility:
      # * flags must be nil or 0 (nil is treated as "no flags", matching
      #   recv); any other value raises NotImplementedError
      # * dest_sockaddr must be omitted/nil, otherwise NotImplementedError
      #   is raised
      def send(msg, flags, dest_sockaddr = nil)
        raise NotImplementedError, "dest_sockaddr not supported" if dest_sockaddr
        raise NotImplementedError, "flags not supported" if flags && !flags.zero?
        write(msg)
      end
    end
  end
end