1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
|
#!/usr/bin/env ruby
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pubsub_demo demos access to the Google PubSub API via its gRPC interface
#
# $ GOOGLE_APPLICATION_CREDENTIALS=<path_to_service_account_key_file> \
# path/to/pubsub_demo.rb \
# [--action=<chosen_demo_action> ]
#
# There are options related to the chosen action, see #parse_args below.
# - the possible actions are the method names of the NamedActions class
# - the default action is list_some_topics
# Put the `lib` directory two levels above this file, and this file's own
# directory, at the front of the load path (unless already present).
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.expand_path(File.join('..', '..', 'lib'), this_dir)
[lib_dir, this_dir].each do |dir|
  $LOAD_PATH.unshift(dir) unless $LOAD_PATH.include?(dir)
end
require 'optparse'
require 'grpc'
require 'googleauth'
require 'google/protobuf'
require 'google/protobuf/empty'
require 'tech/pubsub/proto/pubsub'
require 'tech/pubsub/proto/pubsub_services'
# Builds the channel credentials for the secure connection.
#
# No arguments are passed to the constructor, leaving certificate selection to
# the library (presumably its default root certificates -- confirm against the
# gRPC documentation).
#
# @return [GRPC::Core::ChannelCredentials]
def ssl_creds
  GRPC::Core::ChannelCredentials.new
end
# Fetches the application default credentials and returns their updater proc,
# which is used to add authentication metadata to calls.
#
# @param opts [Args] the command line options (not consulted here)
# @return the `updater_proc` of Google::Auth.get_application_default
def auth_proc(opts)
  Google::Auth.get_application_default.updater_proc
end
# Creates a stub for accessing the publisher service.
#
# @param opts [Args] supplies the host and port to connect to
# @return [Tech::Pubsub::PublisherService::Stub]
def publisher_stub(opts)
  service_stub(Tech::Pubsub::PublisherService::Stub, 'PublisherService', opts)
end

# Creates a stub for accessing the subscriber service.
#
# @param opts [Args] supplies the host and port to connect to
# @return [Tech::Pubsub::SubscriberService::Stub]
def subscriber_stub(opts)
  service_stub(Tech::Pubsub::SubscriberService::Stub, 'SubscriberService', opts)
end

# Shared construction for both service stubs: logs the target address, then
# builds the stub with the SSL channel credentials composed with the call
# credentials derived from auth_proc.
#
# @param stub_clz [Class] the stub class to instantiate
# @param service_name [String] the service label used in the log message
# @param opts [Args] supplies the host and port to connect to
# @return an instance of stub_clz
def service_stub(stub_clz, service_name, opts)
  address = "#{opts.host}:#{opts.port}"
  GRPC.logger.info("... access #{service_name} at #{address}")
  call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts))
  combined_creds = ssl_creds.compose(call_creds)
  stub_clz.new(address, creds: combined_creds,
                        GRPC::Core::Channel::SSL_TARGET => opts.host)
end
# Defines the demo actions: each public instance method is one action that can
# be selected on the command line with --action. Helpers must stay private so
# that they are not offered as actions.
class NamedActions
  include Tech::Pubsub

  # Initializes NamedActions
  #
  # @param pub [Stub] a stub for accessing the publisher service
  # @param sub [Stub] a stub for accessing the subscriber service
  # @param args [Args] provides access to the command line
  def initialize(pub, sub, args)
    @pub = pub
    @sub = sub
    @args = args
  end

  # Removes the test topic; an RPC failure is reported, not raised.
  def remove_topic
    name = test_topic_name
    p "... removing Topic #{name}"
    @pub.delete_topic(DeleteTopicRequest.new(topic: name))
    p "removed Topic: #{name} OK"
  rescue GRPC::BadStatus => e
    p "Could not delete a topics: rpc failed with '#{e}'"
  end

  # Creates the test topic; an RPC failure is reported, not raised.
  def create_topic
    name = test_topic_name
    p "... creating Topic #{name}"
    resp = @pub.create_topic(Topic.new(name: name))
    p "created Topic: #{resp.name} OK"
  rescue GRPC::BadStatus => e
    p "Could not create a topics: rpc failed with '#{e}'"
  end

  # Prints the name of every topic returned for the project.
  def list_some_topics
    p 'Listing topics'
    p '-------------_'
    list_project_topics.topic.each { |t| p t.name }
  rescue GRPC::BadStatus => e
    p "Could not list topics: rpc failed with '#{e}'"
  end

  # Reports whether the test topic is among the project's topics.
  def check_exists
    name = test_topic_name
    p "... checking for topic #{name}"
    exists = topic_exists?(name)
    p "#{name} is a topic" if exists
    p "#{name} is not a topic" unless exists
  rescue GRPC::BadStatus => e
    p "Could not check for a topics: rpc failed with '#{e}'"
  end

  # Publishes a random number (10..30) of messages to the test topic, pulls
  # them back through a newly created subscription and acknowledges them.
  #
  # Once the subscription has been created it is always deleted again, even
  # when a later step fails, so a failed run does not leave it behind.
  def random_pub_sub
    topic_name = test_topic_name
    sub_name = test_sub_name
    create_topic_if_needed(topic_name)
    @sub.create_subscription(Subscription.new(name: sub_name,
                                              topic: topic_name))
    begin
      publish_and_acknowledge(topic_name, sub_name)
      p "Test messages were acknowledged OK, deleting the subscription"
    ensure
      # Also runs when the steps above raised. If the delete itself fails,
      # that failure is what the rescue below reports.
      del_req = DeleteSubscriptionRequest.new(subscription: sub_name)
      @sub.delete_subscription(del_req)
    end
  rescue GRPC::BadStatus => e
    p "Could not do random pub sub: rpc failed with '#{e}'"
  end

  private

  # Sends the messages to the topic, then pulls and acknowledges whatever the
  # subscription returns in a single batch.
  def publish_and_acknowledge(topic_name, sub_name)
    msg_count = rand(10..30)
    msg_count.times do |x|
      msg = PubsubMessage.new(data: "message #{x}")
      @pub.publish(PublishRequest.new(topic: topic_name, message: msg))
    end
    p "Sent #{msg_count} messages to #{topic_name}, checking for them now."
    batch = @sub.pull_batch(PullBatchRequest.new(subscription: sub_name,
                                                 max_events: msg_count))
    ack_ids = batch.pull_responses.map(&:ack_id)
    p "Got #{ack_ids.size} messages; acknowledging them.."
    @sub.acknowledge(AcknowledgeRequest.new(subscription: sub_name,
                                            ack_id: ack_ids))
  end

  # test_topic_name is the topic name to use in this test.
  def test_topic_name
    resource_name('topics', @args.topic_name)
  end

  # test_sub_name is the subscription name to use in this test.
  def test_sub_name
    resource_name('subscriptions', @args.sub_name)
  end

  # Builds "/<collection>/<project_id>/<leaf>". The leaf is the name given on
  # the command line or, when none was given, "<$USER>-<UTC timestamp>" with
  # millisecond resolution.
  def resource_name(collection, given)
    leaf = given ||
           "#{ENV['USER']}-#{Time.now.utc.strftime('%Y%m%d%H%M%S%L')}"
    "/#{collection}/#{@args.project_id}/#{leaf}"
  end

  # determines if the topic name exists
  def topic_exists?(name)
    list_project_topics.topic.any? { |t| t.name == name }
  end

  # Creates the topic unless it is already listed for the project.
  def create_topic_if_needed(name)
    return if topic_exists?(name)
    @pub.create_topic(Topic.new(name: name))
  end

  # Issues a ListTopics request whose query is restricted to the project
  # given on the command line.
  def list_project_topics
    q = "cloud.googleapis.com/project in (/projects/#{@args.project_id})"
    @pub.list_topics(ListTopicsRequest.new(query: q))
  end
end
# Args carries the command line settings: the server host and port, the chosen
# action, the project id, and the optional topic and subscription names.
Args = Struct.new(*%i[host port action project_id topic_name sub_name])
# Reads the command line into an Args that starts out with the built-in
# defaults, then hands the result to _check_args.
#
# @return [Args] whatever _check_args returns for the populated options
def parse_args
  args = Args.new('pubsub-staging.googleapis.com',
                  443, 'list_some_topics', 'stoked-keyword-656')
  # instance_methods(false) gives only the methods defined in that class.
  scenes = NamedActions.instance_methods(false).map(&:to_s)
  scene_list = scenes.join(',')
  parser = OptionParser.new do |o|
    o.on('--server_host SERVER_HOST', 'server hostname') { |v| args.host = v }
    o.on('--server_port SERVER_PORT', 'server port') { |v| args.port = v }
    # --action only accepts one of the names collected above.
    o.on("--action CODE", scenes, {}, 'pick a demo action',
         " (#{scene_list})") { |v| args.action = v }
    # Set the remaining values.
    %w(project_id topic_name sub_name).each do |field|
      o.on("--#{field} VALUE", "#{field}") { |v| args[field] = v }
    end
  end
  parser.parse!
  _check_args(args)
end
# Verifies that the options every action depends on are present.
#
# @param args [Args] the parsed command line options
# @return [Args] the same object, when host, port and action are all set
# @raise [OptionParser::MissingArgument] naming the command line flag of the
#   first option that is nil
def _check_args(args)
  # Member name => the flag parse_args registers for it, so that the error
  # message points at an option that actually exists.
  flags = {
    'host' => 'server_host',
    'port' => 'server_port',
    'action' => 'action'
  }
  missing = flags.keys.find { |member| args[member].nil? }
  if missing
    raise OptionParser::MissingArgument, "please specify --#{flags[missing]}"
  end
  args
end
# Entry point: parses the command line, builds the publisher and subscriber
# stubs, and invokes the NamedActions method named by the chosen action.
def main
  args = parse_args
  publisher = publisher_stub(args)
  subscriber = subscriber_stub(args)
  actions = NamedActions.new(publisher, subscriber, args)
  actions.method(args.action).call
end
# Run the demo as soon as the script is loaded.
main
|