1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
|
# frozen_string_literal: true
# This file is part of the ruby-dbus project
# Copyright (C) 2007 Arnaud Cornet and Paul van Tilburg
# Copyright (C) 2023 Martin Vidner
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License, version 2.1 as published by the Free Software Foundation.
# See the file "COPYING" for the exact licensing terms.
module DBus
# D-Bus main connection class
#
# Main class that maintains a connection to a bus and can handle incoming
# and outgoing messages.
class Connection
# pop and push messages here
# @return [MessageQueue]
attr_reader :message_queue
# Create a new connection to the bus for a given connect _path_. _path_
# format is described in the D-Bus specification:
# http://dbus.freedesktop.org/doc/dbus-specification.html#addresses
# and is something like:
# "transport1:key1=value1,key2=value2;transport2:key1=value1,key2=value2"
# e.g. "unix:path=/tmp/dbus-test" or "tcp:host=localhost,port=2687"
def initialize(path)
@message_queue = MessageQueue.new(path)
# @return [Hash{Integer => Proc}]
# key: message serial
# value: block to be run when the reply to that message is received
@method_call_replies = {}
# @return [Hash{Integer => Message}]
# for debugging only: messages for which a reply was not received yet;
# key == value.serial
@method_call_msgs = {}
@signal_matchrules = {}
end
def object_server
@object_server ||= ObjectServer.new(self)
end
# Dispatch all messages that are available in the queue,
# but do not block on the queue.
# Called by a main loop when something is available in the queue
def dispatch_message_queue
while (msg = @message_queue.pop(blocking: false)) # FIXME: EOFError
process(msg)
end
end
# Tell a bus to register itself on the glib main loop
def glibize
require "glib2"
# Circumvent a ruby-glib bug
@channels ||= []
gio = GLib::IOChannel.new(@message_queue.socket.fileno)
@channels << gio
gio.add_watch(GLib::IOChannel::IN) do |_c, _ch|
dispatch_message_queue
true
end
end
# NAME_FLAG_* and REQUEST_NAME_* belong to BusConnection
# but users will have referenced them in Connection so they need to stay here
# FIXME: describe the following names, flags and constants.
# See DBus spec for definition
NAME_FLAG_ALLOW_REPLACEMENT = 0x1
NAME_FLAG_REPLACE_EXISTING = 0x2
NAME_FLAG_DO_NOT_QUEUE = 0x4
REQUEST_NAME_REPLY_PRIMARY_OWNER = 0x1
REQUEST_NAME_REPLY_IN_QUEUE = 0x2
REQUEST_NAME_REPLY_EXISTS = 0x3
REQUEST_NAME_REPLY_ALREADY_OWNER = 0x4
# @api private
# Send a _message_.
# If _reply_handler_ is not given, wait for the reply
# and return the reply, or raise the error.
# If _reply_handler_ is given, it will be called when the reply
# eventually arrives, with the reply message as the 1st param
# and its params following
def send_sync_or_async(message, &reply_handler)
ret = nil
if reply_handler.nil?
send_sync(message) do |rmsg|
raise rmsg if rmsg.is_a?(Error)
ret = rmsg.params
end
else
on_return(message) do |rmsg|
if rmsg.is_a?(Error)
reply_handler.call(rmsg)
else
reply_handler.call(rmsg, * rmsg.params)
end
end
@message_queue.push(message)
end
ret
end
# @api private
def introspect_data(dest, path, &reply_handler)
m = DBus::Message.new(DBus::Message::METHOD_CALL)
m.path = path
m.interface = "org.freedesktop.DBus.Introspectable"
m.destination = dest
m.member = "Introspect"
m.sender = unique_name
if reply_handler.nil?
send_sync_or_async(m).first
else
send_sync_or_async(m) do |*args|
# TODO: test async introspection, is it used at all?
args.shift # forget the message, pass only the text
reply_handler.call(*args)
nil
end
end
end
# @api private
# Issues a call to the org.freedesktop.DBus.Introspectable.Introspect method
# _dest_ is the service and _path_ the object path you want to introspect
# If a code block is given, the introspect call in asynchronous. If not
# data is returned
#
# FIXME: link to ProxyObject data definition
# The returned object is a ProxyObject that has methods you can call to
# issue somme METHOD_CALL messages, and to setup to receive METHOD_RETURN
def introspect(dest, path)
if !block_given?
# introspect in synchronous !
data = introspect_data(dest, path)
pof = DBus::ProxyObjectFactory.new(data, self, dest, path)
pof.build
else
introspect_data(dest, path) do |async_data|
yield(DBus::ProxyObjectFactory.new(async_data, self, dest, path).build)
end
end
end
# Exception raised when a service name is requested that is not available.
class NameRequestError < Exception
# @return [Integer] one of
# REQUEST_NAME_REPLY_IN_QUEUE
# REQUEST_NAME_REPLY_EXISTS
attr_reader :error_code
def initialize(error_code)
@error_code = error_code
super()
end
end
# In case RequestName did not succeed, raise an exception but first ask the bus who owns the name instead of us
# @param ret [Integer] what RequestName returned
# @param name Name that was requested
# @return [REQUEST_NAME_REPLY_PRIMARY_OWNER,REQUEST_NAME_REPLY_ALREADY_OWNER] on success
# @raise [NameRequestError] with #error_code REQUEST_NAME_REPLY_EXISTS or REQUEST_NAME_REPLY_IN_QUEUE, on failure
# @api private
def handle_return_of_request_name(ret, name)
if [REQUEST_NAME_REPLY_EXISTS, REQUEST_NAME_REPLY_IN_QUEUE].include?(ret)
other = proxy.GetNameOwner(name).first
other_creds = proxy.GetConnectionCredentials(other).first
message = "Could not request #{name}, already owned by #{other}, #{other_creds.inspect}"
raise NameRequestError.new(ret), message
end
ret
end
# Attempt to request a service _name_.
# @raise NameRequestError which cannot really be rescued as it will be raised when dispatching a later call.
# @return [ObjectServer]
# @deprecated Use {BusConnection#request_name}.
def request_service(name)
# Use RequestName, but asynchronously!
# A synchronous call would not work with service activation, where
# method calls to be serviced arrive before the reply for RequestName
# (Ticket#29).
proxy.RequestName(name, NAME_FLAG_REPLACE_EXISTING) do |rmsg, r|
# check and report errors first
raise rmsg if rmsg.is_a?(Error)
handle_return_of_request_name(r, name)
end
object_server
end
# @api private
# Wait for a message to arrive. Return it once it is available.
def wait_for_message
@message_queue.pop # FIXME: EOFError
end
# @api private
# Send a message _msg_ on to the bus. This is done synchronously, thus
# the call will block until a reply message arrives.
# @param msg [Message]
# @param retc [Proc] the reply handler
# @yieldparam rmsg [MethodReturnMessage] the reply
# @yieldreturn [Array<Object>] the reply (out) parameters
def send_sync(msg, &retc) # :yields: reply/return message
return if msg.nil? # check if somethings wrong
@message_queue.push(msg)
@method_call_msgs[msg.serial] = msg
@method_call_replies[msg.serial] = retc
retm = wait_for_message
return if retm.nil? # check if somethings wrong
process(retm)
while @method_call_replies.key? msg.serial
retm = wait_for_message
process(retm)
end
rescue EOFError
new_err = DBus::Error.new("Connection dropped after we sent #{msg.inspect}")
raise new_err
end
# @api private
# Specify a code block that has to be executed when a reply for
# message _msg_ is received.
# @param msg [Message]
def on_return(msg, &retc)
# Have a better exception here
if msg.message_type != Message::METHOD_CALL
raise "on_return should only get method_calls"
end
@method_call_msgs[msg.serial] = msg
@method_call_replies[msg.serial] = retc
end
# Asks bus to send us messages matching mr, and execute slot when
# received
# @param match_rule [MatchRule,#to_s]
# @return [void] actually return whether the rule existed, internal detail
def add_match(match_rule, &slot)
# check this is a signal.
mrs = match_rule.to_s
DBus.logger.debug "#{@signal_matchrules.size} rules, adding #{mrs.inspect}"
rule_existed = @signal_matchrules.key?(mrs)
@signal_matchrules[mrs] = slot
rule_existed
end
# @param match_rule [MatchRule,#to_s]
# @return [void] actually return whether the rule existed, internal detail
def remove_match(match_rule)
mrs = match_rule.to_s
@signal_matchrules.delete(mrs).nil?
end
# @api private
# Process a message _msg_ based on its type.
# @param msg [Message]
def process(msg)
return if msg.nil? # check if somethings wrong
case msg.message_type
when Message::ERROR, Message::METHOD_RETURN
raise InvalidPacketException if msg.reply_serial.nil?
mcs = @method_call_replies[msg.reply_serial]
if !mcs
DBus.logger.debug "no return code for mcs: #{mcs.inspect} msg: #{msg.inspect}"
else
if msg.message_type == Message::ERROR
mcs.call(Error.new(msg))
else
mcs.call(msg)
end
@method_call_replies.delete(msg.reply_serial)
@method_call_msgs.delete(msg.reply_serial)
end
when DBus::Message::METHOD_CALL
if msg.path == "/org/freedesktop/DBus"
DBus.logger.debug "Got method call on /org/freedesktop/DBus"
end
node = object_server.get_node(msg.path, create: false)
# introspect a known path even if there is no object on it
if node &&
msg.interface == "org.freedesktop.DBus.Introspectable" &&
msg.member == "Introspect"
reply = Message.new(Message::METHOD_RETURN).reply_to(msg)
reply.sender = @unique_name
xml = node.to_xml(msg.path)
reply.add_param(Type::STRING, xml)
@message_queue.push(reply)
# dispatch for an object
elsif node&.object
node.object.dispatch(msg)
else
reply = Message.error(msg, "org.freedesktop.DBus.Error.UnknownObject",
"Object #{msg.path} doesn't exist")
@message_queue.push(reply)
end
when DBus::Message::SIGNAL
# the signal can match multiple different rules
# clone to allow new signale handlers to be registered
@signal_matchrules.dup.each do |mrs, slot|
if DBus::MatchRule.new.from_s(mrs).match(msg)
slot.call(msg)
end
end
else
# spec(Message Format): Unknown types must be ignored.
DBus.logger.debug "Unknown message type: #{msg.message_type}"
end
rescue Exception => e
raise msg.annotate_exception(e)
end
# @api private
# Emit a signal event for the given _service_, object _obj_, interface
# _intf_ and signal _sig_ with arguments _args_.
# @param _service unused
# @param obj [DBus::Object]
# @param intf [Interface]
# @param sig [Signal]
# @param args arguments for the signal
def emit(_service, obj, intf, sig, *args)
m = Message.new(DBus::Message::SIGNAL)
m.path = obj.path
m.interface = intf.name
m.member = sig.name
i = 0
sig.params.each do |par|
m.add_param(par.type, args[i])
i += 1
end
@message_queue.push(m)
end
end
# A {Connection} that is talking directly to a peer, with no bus daemon in between.
# A prominent example is the PulseAudio connection,
# see https://www.freedesktop.org/wiki/Software/PulseAudio/Documentation/Developer/Clients/DBus/
# When starting, it still starts with authentication but omits the Hello message.
class PeerConnection < Connection
# Get a {ProxyPeerService}, a dummy helper to get {ProxyObject}s for
# a {PeerConnection}.
# @return [ProxyPeerService]
def peer_service
ProxyPeerService.new(self)
end
end
end
|