File: tube_test.rb

package info (click to toggle)
ruby-beaneater 1.0.0-1.1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 276 kB
  • sloc: ruby: 1,602; sh: 4; makefile: 2
file content (198 lines) | stat: -rw-r--r-- 5,435 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# test/tube_test.rb

require File.expand_path('../test_helper', __FILE__)

describe Beaneater::Tube do
  before do
    @beanstalk = Beaneater.new('localhost')
    @tube = Beaneater::Tube.new(@beanstalk, 'baz')
  end

  describe "for #put" do
    before do
      @time = Time.now.to_i
    end

    it "should insert a job" do
      @tube.put "bar put #{@time}"
      assert_equal "bar put #{@time}", @tube.peek(:ready).body
      assert_equal 120, @tube.peek(:ready).ttr
      assert_equal 65536, @tube.peek(:ready).pri
      assert_equal 0, @tube.peek(:ready).delay
    end

    it "should insert a delayed job" do
      @tube.put "delayed put #{@time}", :delay => 1
      assert_equal "delayed put #{@time}", @tube.peek(:delayed).body
    end

    it "should support custom serializer" do
      Beaneater.configure.job_serializer = lambda { |b| JSON.dump(b) }
      @tube.put({ foo: "bar"})
      assert_equal '{"foo":"bar"}', @tube.peek(:ready).body
    end

    after do
      Beaneater::Connection.any_instance.unstub(:transmit)
      Beaneater.configure.job_serializer = lambda { |b| b }
    end
  end # put

  describe "for #peek" do
    before do
      @time = Time.now.to_i
    end

    it "should peek delayed" do
      @tube.put "foo delay #{@time}", :delay => 1
      assert_equal "foo delay #{@time}", @tube.peek(:delayed).body
    end

    it "should peek ready" do
      @tube.put "foo ready #{@time}", :delay => 0
      assert_equal "foo ready #{@time}", @tube.peek(:ready).body
    end

    it "should peek buried" do
      @tube.put "foo buried #{@time}"
      @tube.reserve.bury

      assert_equal "foo buried #{@time}", @tube.peek(:buried).body
    end

    it "should return nil for empty peek" do
      assert_nil @tube.peek(:buried)
    end

    it "supports valid JSON" do
      json = '{ "foo" : "bar" }'
      @tube.put(json)
      assert_equal 'bar', JSON.parse(@tube.peek(:ready).body)['foo']
    end

    it "supports non valid JSON" do
      json = '{"message":"hi"}'
      @tube.put(json)
      assert_equal 'hi', JSON.parse(@tube.peek(:ready).body)['message']
    end

    it "supports passing crlf through" do
      @tube.put("\r\n")
      assert_equal "\r\n", @tube.peek(:ready).body
    end

    it "supports passing any byte value through" do
      bytes = (0..255).to_a.pack("c*")
      @tube.put(bytes)
      assert_equal bytes, @tube.peek(:ready).body
    end

    it "should support custom parser" do
      Beaneater.configure.job_parser = lambda { |b| JSON.parse(b) }
      json = '{"message":"hi"}'
      @tube.put(json)
      assert_equal 'hi', @tube.peek(:ready).body['message']
    end

    after do
      Beaneater.configure.job_parser = lambda { |b| b }
    end
  end # peek

  describe "for #reserve" do
    before do
      @time = Time.now.to_i
      @json = %Q&{ "foo" : "#{@time} bar" }&
      @tube.put @json
    end

    it "should reserve job" do
      @job = @tube.reserve
      assert_equal "#{@time} bar", JSON.parse(@job.body)['foo']
      @job.delete
    end

    it "should reserve job with block" do
      job = nil
      @tube.reserve { |j| job = j; job.delete }
      assert_equal "#{@time} bar", JSON.parse(job.body)['foo']
    end

    it "should support custom parser" do
      Beaneater.configure.job_parser = lambda { |b| JSON.parse(b) }
      @job = @tube.reserve
      assert_equal "#{@time} bar", @job.body['foo']
      @job.delete
    end

    after do
      Beaneater.configure.job_parser = lambda { |b| b }
    end
  end # reserve

  describe "for #pause" do
    before do
      @time = Time.now.to_i
      @tube = Beaneater::Tube.new(@beanstalk, 'bam')
      @tube.put "foo pause #{@time}"
    end

    it "should allow tube pause" do
      assert_equal 0, @tube.stats.pause
      @tube.pause(1)
      assert_equal 1, @tube.stats.pause
    end
  end # pause

  describe "for #stats" do
    before do
      @time = Time.now.to_i
      @tube.put "foo stats #{@time}"
      @stats = @tube.stats
    end

    it "should return total number of jobs in tube" do
      assert_equal 1, @stats['current_jobs_ready']
      assert_equal 0, @stats.current_jobs_delayed
    end

    it "should raise error for empty tube" do
      assert_raises(Beaneater::NotFoundError) { @beanstalk.tubes.find('fake_tube').stats }
    end
  end # stats

  describe "for #kick" do
    before do
      @time, @time2 = 2.times.map { Time.now.to_i }
      @tube.put "kick #{@time}"
      @tube.put "kick #{@time2}"

      2.times.map { @tube.reserve.bury }
    end

    it "should kick 2 buried jobs" do
      assert_equal 2, @tube.stats.current_jobs_buried
      @tube.kick(2)
      assert_equal 0, @tube.stats.current_jobs_buried
      assert_equal 2, @tube.stats.current_jobs_ready
    end
  end # kick

  describe "for #clear" do
    @time = Time.now.to_i
    before do
      2.times { |i| @tube.put "to clear success #{i} #{@time}" }
      2.times { |i| @tube.put "to clear delayed #{i} #{@time}", :delay => 5 }
      2.times { |i| @tube.put "to clear bury #{i} #{@time}", :pri => 1 }
      @tube.reserve.bury while @tube.peek(:ready).stats['pri'] == 1
    end

    it "should clear all jobs in tube" do
      tube_counts = lambda { %w(ready buried delayed).map { |s| @tube.stats["current_jobs_#{s}"] } }
      assert_equal [2, 2, 2], tube_counts.call
      @tube.clear
      stats = @tube.stats
      assert_equal [0, 0, 0], tube_counts.call
    end
  end # clear
end # Beaneater::Tube