File: message_queue.rb

package info (click to toggle)
ruby-dbus 0.16.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 520 kB
  • sloc: ruby: 3,786; sh: 53; makefile: 8
file content (168 lines) | stat: -rw-r--r-- 5,041 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# This file is part of the ruby-dbus project
# Copyright (C) 2007 Arnaud Cornet and Paul van Tilburg
# Copyright (C) 2009-2014 Martin Vidner
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License, version 2.1 as published by the Free Software Foundation.
# See the file "COPYING" for the exact licensing terms.

require "fcntl"
require "socket"

module DBus
  # Encapsulates a socket so that we can {#push} and {#pop} {Message}s.
  class MessageQueue
    # The socket that is used to connect with the bus.
    attr_reader :socket

    def initialize(address)
      @address = address
      @buffer = ""
      @is_tcp = false
      connect
    end

    # @param non_block [Boolean] if true, return nil instead of waiting
    # @return [Message,nil] one message or nil if unavailable
    # @raise EOFError
    # @todo failure modes
    def pop(non_block = false)
      buffer_from_socket_nonblock
      message = message_from_buffer_nonblock
      unless non_block
        # we can block
        while message.nil?
          r, _d, _d = IO.select([@socket])
          if r && r[0] == @socket
            buffer_from_socket_nonblock
            message = message_from_buffer_nonblock
          end
        end
      end
      message
    end

    def push(message)
      @socket.write(message.marshall)
    end
    alias << push

    private

    # Connect to the bus and initialize the connection.
    def connect
      addresses = @address.split ";"
      # connect to first one that succeeds
      worked = addresses.find do |a|
        transport, keyvaluestring = a.split ":"
        kv_list = keyvaluestring.split ","
        kv_hash = {}
        kv_list.each do |kv|
          key, escaped_value = kv.split "="
          value = escaped_value.gsub(/%(..)/) { |_m| [Regexp.last_match(1)].pack "H2" }
          kv_hash[key] = value
        end
        case transport
        when "unix"
          connect_to_unix kv_hash
        when "tcp"
          connect_to_tcp kv_hash
        when "launchd"
          connect_to_launchd kv_hash
        else
          # ignore, report?
        end
      end
      worked
      # returns the address that worked or nil.
      # how to report failure?
    end

    # Connect to a bus over tcp and initialize the connection.
    def connect_to_tcp(params)
      # check if the path is sufficient
      if params.key?("host") && params.key?("port")
        begin
          # initialize the tcp socket
          @socket = TCPSocket.new(params["host"], params["port"].to_i)
          @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
          init_connection
          @is_tcp = true
        rescue Exception => e
          puts "Oops:", e
          puts "Error: Could not establish connection to: #{@path}, will now exit."
          exit(1) # a little harsh
        end
      else
        # Danger, Will Robinson: the specified "path" is not usable
        puts "Error: supplied path: #{@path}, unusable! sorry."
      end
    end

    # Connect to an abstract unix bus and initialize the connection.
    def connect_to_unix(params)
      @socket = Socket.new(Socket::Constants::PF_UNIX, Socket::Constants::SOCK_STREAM, 0)
      @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
      if !params["abstract"].nil?
        sockaddr = if HOST_END == LIL_END
                     "\1\0\0#{params["abstract"]}"
                   else
                     "\0\1\0#{params["abstract"]}"
                   end
      elsif !params["path"].nil?
        sockaddr = Socket.pack_sockaddr_un(params["path"])
      end
      @socket.connect(sockaddr)
      init_connection
    end

    def connect_to_launchd(params)
      socket_var = params["env"]
      socket = `launchctl getenv #{socket_var}`.chomp
      connect_to_unix "path" => socket
    end

    # Initialize the connection to the bus.
    def init_connection
      client = Client.new(@socket)
      client.authenticate
    end

    public # FIXME: fix Main loop instead

    # Get and remove one message from the buffer.
    # @return [Message,nil] the message or nil if unavailable
    def message_from_buffer_nonblock
      return nil if @buffer.empty?
      ret = nil
      begin
        ret, size = Message.new.unmarshall_buffer(@buffer)
        @buffer.slice!(0, size)
      rescue IncompleteBufferException
        # fall through, let ret remain nil
      end
      ret
    end

    # The buffer size for messages.
    MSG_BUF_SIZE = 4096

    # Fill (append) the buffer from data that might be available on the
    # socket.
    # @return [void]
    # @raise EOFError
    def buffer_from_socket_nonblock
      @buffer += @socket.read_nonblock(MSG_BUF_SIZE)
    rescue EOFError
      raise # the caller expects it
    rescue Errno::EAGAIN
      # fine, would block
    rescue Exception => e
      puts "Oops:", e
      raise if @is_tcp # why?
      puts "WARNING: read_nonblock failed, falling back to .recv"
      @buffer += @socket.recv(MSG_BUF_SIZE)
    end
  end
end