File: test_fixtures.py

package info (click to toggle)
python-rq 2.6-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,580 kB
  • sloc: python: 13,878; makefile: 22; sh: 19
file content (16 lines) | stat: -rw-r--r-- 653 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from rq import Queue
from tests import RQTestCase, fixtures


class TestFixtures(RQTestCase):
    """Checks for the helper callables provided by ``tests.fixtures``."""

    def test_rpush_fixture(self):
        """After ``fixtures.rpush('foo', 'bar', ...)``, the first item of ``foo`` is ``bar``."""
        pool = self.connection.connection_pool
        fixtures.rpush('foo', 'bar', pool.connection_kwargs)

        # Only the first element of the list is fetched; it is decoded
        # before being compared against the pushed text.
        first_items = self.connection.lrange('foo', 0, 0)
        assert first_items[0].decode() == 'bar'

    def test_start_worker_fixture(self):
        """After ``fixtures.start_worker`` returns, the queue holds no jobs."""
        redis_kwargs = self.connection.connection_pool.connection_kwargs

        testing_queue = Queue(name='testing', connection=self.connection)
        testing_queue.enqueue(fixtures.say_hello)

        # NOTE(review): the trailing ``True`` is passed positionally; its
        # meaning is defined by ``fixtures.start_worker`` -- confirm there.
        fixtures.start_worker(testing_queue.name, redis_kwargs, 'w1', True)
        assert not testing_queue.jobs