File: protocol.rb

package info (click to toggle)
avro-java 1.8.2-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 12,784 kB
  • sloc: java: 58,236; ansic: 27,618; cpp: 15,332; cs: 12,876; python: 10,443; xml: 6,338; php: 3,836; ruby: 3,158; perl: 1,656; sh: 733; lex: 203; yacc: 140; makefile: 7
file content (161 lines) | stat: -rw-r--r-- 5,460 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Avro
  # In-memory representation of an Avro protocol declaration: a named
  # (and namespaced) collection of type schemas and messages.
  #
  # Instances are normally built from JSON text via Protocol.parse.
  class Protocol
    # Schema type names that are accepted in a protocol's "types" list.
    VALID_TYPE_SCHEMA_TYPES = Set.new(%w[enum record error fixed]).freeze
    # Symbol form of VALID_TYPE_SCHEMA_TYPES, compared against Schema#type_sym.
    VALID_TYPE_SCHEMA_TYPES_SYM = Set.new(VALID_TYPE_SCHEMA_TYPES.map(&:to_sym)).freeze

    # Raised when a protocol declaration is structurally invalid.
    class ProtocolParseError < Avro::AvroError; end

    # name      - the protocol name (String)
    # namespace - the protocol namespace (String)
    # types     - Array of parsed schema objects declared by the protocol
    # messages  - Hash of message name => Message
    # md5       - raw MD5 digest of the protocol's JSON rendering (see #to_s)
    attr_reader :name, :namespace, :types, :messages, :md5

    # Builds a Protocol from its JSON text.
    #
    # protocol_string - JSON document; the top-level value must be an object
    #                   whose 'protocol', 'namespace', 'types' and 'messages'
    #                   entries are handed to Protocol.new.
    #
    # Returns the new Protocol.
    # Raises ProtocolParseError if the top-level JSON value is not an object,
    # or if Protocol.new rejects any of the extracted properties.
    def self.parse(protocol_string)
      json_data = MultiJson.load(protocol_string)

      unless json_data.is_a?(Hash)
        raise ProtocolParseError, "Not a JSON object: #{json_data}"
      end

      Protocol.new(
        json_data['protocol'],
        json_data['namespace'],
        json_data['types'],
        json_data['messages']
      )
    end

    # Validates the raw properties, parses the declared types and messages,
    # and caches the MD5 digest of the resulting JSON rendering.
    #
    # name      - protocol name; must be a String.
    # namespace - protocol namespace; must be a String.
    # types     - Array of raw (JSON-decoded) type declarations.
    # messages  - Hash of message name => raw message body.
    #
    # NOTE: namespace, types and messages default to nil in the signature,
    # but the checks below reject nil for each of them, so in practice all
    # four arguments are required.
    #
    # Raises ProtocolParseError when any argument fails validation.
    def initialize(name, namespace=nil, types=nil, messages=nil)
      # Ensure valid ctor args
      if !name
        raise ProtocolParseError, 'Protocols must have a non-empty name.'
      elsif !name.is_a?(String)
        raise ProtocolParseError, 'The name property must be a string.'
      elsif !namespace.is_a?(String)
        raise ProtocolParseError, 'The namespace property must be a string.'
      elsif !types.is_a?(Array)
        raise ProtocolParseError, 'The types property must be a list.'
      elsif !messages.is_a?(Hash)
        raise ProtocolParseError, 'The messages property must be a JSON object.'
      end

      @name = name
      @namespace = namespace
      # The same registry is handed to both parsers so that messages can
      # refer to types declared in the "types" list.
      type_names = {}
      @types = parse_types(types, type_names)
      @messages = parse_messages(messages, type_names)
      @md5 = Digest::MD5.digest(to_s)
    end

    # Returns the protocol rendered as JSON text.
    def to_s
      MultiJson.dump to_avro
    end

    # Two protocols are equal when their Avro (Hash) renderings are equal.
    # Anything that is not a Protocol is never equal; the type guard is
    # needed because #to_avro is protected and cannot be called on
    # arbitrary objects.
    def ==(other)
      other.is_a?(Protocol) && to_avro == other.to_avro
    end

    # A single message (RPC signature) declared by a protocol.
    class Message
      attr_reader :name, :request, :response, :errors, :default_namespace

      # name              - message name.
      # request           - Array of raw request parameter declarations.
      # response          - raw response schema, or the name of a known type.
      # errors            - optional Array of raw error schemas; when nil,
      #                     #errors stays nil.
      # names             - registry of already-known named schemas.
      # default_namespace - namespace used to resolve unqualified names.
      #
      # Raises ProtocolParseError if request (or a non-nil errors) is not
      # an Array.
      def initialize(name, request, response, errors=nil, names=nil, default_namespace=nil)
        @name = name
        @default_namespace = default_namespace
        @request = parse_request(request, names)
        @response = parse_response(response, names)
        @errors = parse_errors(errors, names) if errors
      end

      # Returns a Hash with 'request' and 'response' entries, plus 'errors'
      # when the message declares any.
      def to_avro(names=Set.new)
        avro = {
          'request' => request.to_avro(names),
          'response' => response.to_avro(names)
        }
        avro['errors'] = errors.to_avro(names) if errors
        avro
      end

      # Returns the message rendered as JSON text. Uses MultiJson, the same
      # JSON backend as Protocol#to_s and Protocol.parse.
      def to_s
        MultiJson.dump to_avro
      end

      # Wraps the request parameter list in an anonymous record schema
      # created with the :request schema type.
      #
      # Raises ProtocolParseError if request is not an Array.
      def parse_request(request, names)
        unless request.is_a?(Array)
          raise ProtocolParseError, "Request property not an Array: #{request.inspect}"
        end
        Schema::RecordSchema.new(nil, default_namespace, request, names, :request)
      end

      # Resolves the response schema. A String that names an entry already
      # present in names is returned from the registry directly; anything
      # else is handed to Schema.real_parse.
      def parse_response(response, names)
        if response.is_a?(String) && names
          fullname = Name.make_fullname(response, default_namespace)
          return names[fullname] if names.include?(fullname)
        end

        Schema.real_parse(response, names, default_namespace)
      end

      # Parses the declared error list by passing the whole Array to
      # Schema.real_parse (presumably yielding a union schema - confirm
      # against Schema.real_parse).
      #
      # Raises ProtocolParseError if errors is not an Array.
      def parse_errors(errors, names)
        unless errors.is_a?(Array)
          raise ProtocolParseError, "Errors property not an Array: #{errors}"
        end
        Schema.real_parse(errors, names, default_namespace)
      end
    end

    private

    # Parses each raw type declaration into a schema object, registering
    # named types in type_names as a side effect of Schema.real_parse.
    #
    # Returns an Array of schema objects in declaration order.
    # Raises ProtocolParseError if a declaration parses to a schema whose
    # type is not one of VALID_TYPE_SCHEMA_TYPES_SYM.
    def parse_types(types, type_names)
      types.map do |type|
        # FIXME adding type.name to type_names is not defined in the
        # spec. Possible bug in the python impl and the spec.
        type_object = Schema.real_parse(type, type_names, namespace)
        unless VALID_TYPE_SCHEMA_TYPES_SYM.include?(type_object.type_sym)
          msg = "Type #{type} not an enum, record, fixed or error."
          raise ProtocolParseError, msg
        end
        type_object
      end
    end

    # Builds a Message for every entry of the raw messages Hash.
    #
    # Returns a Hash of message name => Message.
    # Raises ProtocolParseError if a body is not a Hash (or a name repeats).
    def parse_messages(messages, names)
      message_objects = {}
      messages.each do |name, body|
        if message_objects.key?(name)
          raise ProtocolParseError, "Message name \"#{name}\" repeated."
        elsif !body.is_a?(Hash)
          raise ProtocolParseError, "Message name \"#{name}\" has non-object body #{body.inspect}"
        end

        message_objects[name] = Message.new(
          name, body['request'], body['response'], body['errors'], names, namespace
        )
      end
      message_objects
    end

    protected

    # Returns the protocol as a plain Hash suitable for JSON serialisation.
    # The same names set is passed to every nested to_avro call.
    # Protected (rather than private) so that #== can call it on another
    # Protocol instance.
    def to_avro(names=Set.new)
      hsh = {'protocol' => name}
      hsh['namespace'] = namespace if namespace
      hsh['types'] = types.map { |t| t.to_avro(names) } if types

      if messages
        hsh['messages'] = messages.each_with_object({}) do |(key, message), acc|
          acc[key] = message.to_avro(names)
        end
      end

      hsh
    end
  end
end