1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
|
# Copyright 2011-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
require 'json'
module AWS
class SNS
class Topic
include Core::Model
include HasDeliveryPolicy
# @param [String] arn The topic ARN.
def initialize arn, options = {}
@arn = arn
super
end
# @return [String] The topic ARN.
attr_reader :arn
# The topic name.
#
# If you have not set a display name (see {#display_name=}) then
# this is used as the "From" field for notifications to email and
# email-json endpoints.
# @return [String] Returns the topic name.
def name
arn.split(/:/)[-1]
end
# Causes the given `endpoint` to receive messages published to this
# topic.
#
# ## Subscribing to SQS Queues
#
# If you subscribe to an SQS queue (with a {SQS::Queue} object}
# then a policy will be added/updated to the queue that will
# permit this topic to send it messages. Some important notes:
#
# * If you subscribe with a queue by ARN then you must change the
# policy yourself.
#
# * If you do not want the policy modified then pass `:update_policy`
# as false or just pass the queue's arn
#
# topic.subscribe(queue.arn)
# topic.subscribe(queue, :update_policy => false)
#
# @example Using a url string to set the endpoint (http and https)
#
# topic.subscribe('http://example.com/messages')
# topic.subscribe('https://example.com/messages')
#
# @example Using a uri object to set the endpoint (http and https)
#
# topic.subscribe(URI.parse('http://example.com/messages'))
# topic.subscribe(URI.parse('https://example.com/messages'))
#
# @example Email address as endpoint
#
# topic.subscribe('nobody@example.com')
#
# @example Email address as a JSON endpoint
#
# # send messages encoded as json object to the given email address
# topic.subscribe('nobody@example.com', :json => true)
#
# @example SQS Queue (by arn)
#
# # you must manage the queue policy yourself to allow the
# # the topic to send messages (policy action 'sqs:SendMessage')
# topic.subscribe('arn:aws:sqs:us-west-2:123456789123:AQueue')
#
# @example SQS Queue (by Queue object)
#
# # the queue policy will be added/updated to allow the topic
# # to send it messages
# topic.subscribe(AWS::SQS.new.queues.first)
#
# @param [mixed] endpoint The endpoint that should receive
# messages that are published to this topic. Valid values
# for `endpoint` include:
#
# * URI object
# * http and https URI strings
# * email address
# * {SQS::Queue}
# * SQS queue ARN
# * phone number of an SMS-enabled device
#
# @param [Hash] options
# @option options [Boolean] :json (false)
# @return [Subscription,nil] Returns a subscription when possible.
# If the subscription requires confirmation first, then `nil` is
# returned instead.
def subscribe endpoint, options = {}
subscribe_opts = endpoint_opts(endpoint, options).merge(:topic_arn => arn)
resp = client.subscribe(subscribe_opts)
if arn = resp[:subscription_arn] and arn =~ /^arn:/
Subscription.new(arn, :config => config)
else
nil
end
end
# Verifies an endpoint owner's intent to receive messages by
# validating the token sent to the endpoint by an earlier
# Subscribe action. If the token is valid, the action creates a
# new subscription.
#
# @param [String] token Short-lived token sent to an endpoint
# during the {#subscribe} action.
#
# @param [Hash] options Additional options for confirming the
# subscription.
#
# @option options [Boolean] :authenticate_on_unsubscribe
# Indicates that you want to disable unauthenticated
# unsubsciption of the subscription.
#
# @return [Subscription] The newly created subscription.
#
def confirm_subscription token, options = {}
options[:authenticate_on_unsubscribe] = 'true' if
options[:authenticate_on_unsubscribe]
confirm_opts = options.merge(:token => token, :topic_arn => arn)
resp = client.confirm_subscription(confirm_opts)
Subscription.new(
resp[:subscription_arn],
:topic_arn => arn,
:config => config)
end
# @return [TopicSubscriptionCollection] Returns a collection that
# represents all of the subscriptions for this topic.
def subscriptions
TopicSubscriptionCollection.new(self)
end
# @return [String] Returns the human-readable name used in
# the "From" field for notifications to email and email-json
# endpoints. If you have not set the display name the topic
# {#name} will be used/returned instead.
def display_name
to_h[:display_name]
end
# @param [String] display_name Sets the human-readable name used in
# the "From" field for notifications to email and email-json
# endpoints.
# @return [String] Returns the display_name as passed.
def display_name= display_name
set_attribute('DisplayName', display_name)
display_name
end
# @return [String] The topic owner's ID.
def owner
to_h[:owner]
end
# @return [Integer] Returns number of confirmed topic subscriptions.
def num_subscriptions_confirmed
to_h[:num_subscriptions_confirmed]
end
# @return [Integer] Returns number of pending topic subscriptions.
def num_subscriptions_pending
to_h[:num_subscriptions_pending]
end
# @return [Integer] Returns number of deleted topic subscriptions.
def num_subscriptions_deleted
to_h[:num_subscriptions_deleted]
end
# @return [Policy] The topic's {Policy}.
def policy
to_h[:policy]
end
# Sets the topic's policy.
# @param [String,Policy] policy A JSON policy string, a {Policy} object
# or any other object that responds to #to_json with a valid
# policy.
# @return [nil]
def policy= policy
policy_json = policy.is_a?(String) ? policy : policy.to_json
set_attribute('Policy', policy_json)
nil
end
# @return [nil,String<JSON>] The delivery policy JSON string.
def delivery_policy_json
to_h[:delivery_policy_json]
end
# @return [String<JSON>] The effective delivery policy JSON string.
# into account system defaults.
def effective_delivery_policy_json
to_h[:effective_delivery_policy_json]
end
# Publishes a message to this SNS topic.
#
# topic.publish('a short message')
#
# You can pass a subject that is used when sending the message to
# email endpoints:
#
# topic.publish('message', :subject => 'SNS message subject')
#
# If you would like to pass a different message to various protocols
# (endpoint types) you can pass those as options:
#
# topic.publish('default message',
# :http => "message sent to http endpoints",
# :https => "message sent to https endpoints",
# :email => "message sent to email endpoints")
#
# The full list of acceptable protocols are listed below. The default
# message is sent to endpoints who's protocol was not listed.
#
# @param [String] default_message The message you want to send to the
# topic. Messages must be UTF-8 encoded strings at most 8 KB in size
# (8192 bytes, not 8192 characters).
# @param [Hash] options
# @option options [String] :subject Used as the "Subject" line when
# the message is delivered to email endpoints. Will also be
# included in the standard JSON messages delivered to other endpoints.
# * must be ASCII text that begins with a letter, number or
# punctuation mark
# * must not include line breaks or control characters
# * and must be less than 100 characters long
# @option options [String] :http - Message to use when sending to an
# HTTP endpoint.
# @option options [String] :https - Message to use when sending to an
# HTTPS endpoint.
# @option options [String] :email - Message to use when sending to an
# email endpoint.
# @option options [String] :email_json - Message to use when sending
# to an email json endpoint.
# @option options [String] :sqs - Message to use when sending to an
# SQS endpoint.
# @return [String] Returns the ID of the message that was sent.
def publish default_message, options = {}
message = { :default => default_message }
[:http, :https, :email, :email_json, :sqs].each do |protocol|
if options[protocol]
message[protocol.to_s.gsub(/_/, '-')] = options[protocol]
end
end
publish_opts = {}
publish_opts[:message] = message.to_json
publish_opts[:message_structure] = 'json'
publish_opts[:subject] = options[:subject] if options[:subject]
publish_opts[:topic_arn] = arn
response = client.publish(publish_opts)
response[:message_id]
end
# Deletes the topic.
# @return [nil]
def delete
client.delete_topic(:topic_arn => arn)
nil
end
# @return [Hash] Returns a hash of attributes about this topic,
# including:
#
# * `:arn`
# * `:name`
# * `:owner`
# * `:display_name`
# * `:policy`
# * `:num_subscriptions_confirmed`
# * `:num_subscriptions_pending`
# * `:num_subscriptions_deleted`
#
def to_h
attributes = client.get_topic_attributes(:topic_arn => arn).attributes
{
:arn => arn,
:name => name,
:owner => attributes['Owner'],
:display_name => attributes['DisplayName'] || name,
:policy => parse_policy(attributes['Policy']),
:num_subscriptions_confirmed => attributes['SubscriptionsConfirmed'].to_i,
:num_subscriptions_pending => attributes['SubscriptionsPending'].to_i,
:num_subscriptions_deleted => attributes['SubscriptionsDeleted'].to_i,
:delivery_policy_json => attributes['DeliveryPolicy'],
:effective_delivery_policy_json => attributes['EffectiveDeliveryPolicy'],
}
end
# @return [Boolean] Returns true if compared to another {Topic}
# with the same ARN.
def eql? other
other.kind_of?(Topic) and other.arn == arn
end
alias_method :==, :eql?
protected
def update_delivery_policy policy_json
set_attribute('DeliveryPolicy', policy_json)
end
protected
def parse_policy policy_json
if policy_json
policy = SNS::Policy.from_json(policy_json)
policy.extend(PolicyProxy)
policy.topic = self
policy
else
nil
end
end
# @api private
protected
def set_attribute name, value
client.send(:set_topic_attributes, {
:topic_arn => arn,
:attribute_name => name,
:attribute_value => value,
})
end
# @api private
module PolicyProxy
attr_accessor :topic
def change
yield(self)
topic.policy = self
end
end
# @api private
protected
def endpoint_opts(endpoint, opts = {})
case
when endpoint.is_a?(SQS::Queue)
# auto add a policy to the queue to allow the topic
# to send the queue messages
unless opts[:update_policy] == false
policy = endpoint.policy || SQS::Policy.new
policy.allow(
:principal => :any,
:actions => [:send_message],
:resources => [endpoint]
).where(:source_arn).is(arn)
endpoint.policy = policy
end
{ :protocol => 'sqs', :endpoint => endpoint.arn }
when endpoint =~ /^arn:/
raise ArgumentError, "expected a queue ARN" unless
endpoint =~ /^arn:aws(.*?):sqs:/
{ :protocol => "sqs", :endpoint => endpoint }
when endpoint.kind_of?(URI)
{ :protocol => endpoint.scheme,
:endpoint => endpoint.to_s }
when endpoint =~ /^(https?):/
{ :protocol => $1, :endpoint => endpoint }
when endpoint.include?("@")
{ :protocol => opts[:json] ? "email-json" : "email",
:endpoint => endpoint }
when endpoint.gsub(/\D/,'') =~ /\d{11,15}/
{ :protocol => "sms", :endpoint => endpoint.gsub(/\D/,'') }
else
raise ArgumentError, "could not determine protocol for '#{endpoint}'"
end
end
end
end
end
|