File: test_bulk.py

package info (click to toggle)
python-fs 2.4.16-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,944 kB
  • sloc: python: 13,048; makefile: 226; sh: 3
file content (18 lines) | stat: -rw-r--r-- 410 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from __future__ import unicode_literals

import unittest

from fs._bulk import Copier, _Task
from fs.errors import BulkCopyFailed


class BrokenTask(_Task):
    """A task whose invocation always fails with ``ZeroDivisionError``."""

    def __call__(self):
        # Deliberately divide by zero so that calling the task raises.
        numerator, denominator = 1, 0
        numerator / denominator


class TestBulk(unittest.TestCase):
    """Tests for the bulk copy helpers imported from ``fs._bulk``."""

    def test_worker_error(self):
        """Queue a failing task and expect ``BulkCopyFailed`` to be raised.

        The exception may come from the body of the ``Copier`` block or
        from leaving it; the assertion only requires that it is raised
        somewhere inside the ``assertRaises`` context.
        """
        # A single ``with`` listing both managers is equivalent to nesting
        # them: ``assertRaises`` is entered first and exited last.
        with self.assertRaises(BulkCopyFailed), Copier(num_workers=2) as copier:
            failing_task = BrokenTask()
            copier.queue.put(failing_task)