File: test_SimpleMultiprocessPlugin.py

package info (click to toggle)
yapsy 1.12.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 888 kB
  • sloc: python: 2,684; makefile: 82
file content (46 lines) | stat: -rw-r--r-- 1,464 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# -*- coding: utf-8; tab-width: 4; indent-tabs-mode: t; python-indent: 4 -*-

import unittest
import os 

from yapsy.MultiprocessPluginManager import MultiprocessPluginManager

class SimpleMultiprocessTestCase(unittest.TestCase):
	"""
	Test the correct loading of a multiprocessed plugin as well as basic
	communication.
	"""
	
	def setUp(self):
		"""
		init
		"""
		# create the plugin manager
		self.mpPluginManager = MultiprocessPluginManager(directories_list=[
				os.path.join(
					os.path.dirname(os.path.abspath(__file__)),"plugins")],
				plugin_info_ext="multiprocess-plugin")
		# load the plugins that may be found
		self.mpPluginManager.collectPlugins()
		# Will be used later
		self.plugin_info = None

	def testUpAndRunning(self):
		"""
		Test if the plugin is loaded and if the communication pipe is properly setuped.
		"""
		for plugin_index, plugin in enumerate(self.mpPluginManager.getAllPlugins()):
			child_pipe = plugin.plugin_object.child_pipe
			content_from_parent = "hello-{0}-from-parent".format(plugin_index)
			child_pipe.send(content_from_parent)
			content_from_child = False
			if child_pipe.poll(5):
				content_from_child = child_pipe.recv()
			self.assertEqual("{0}|echo_from_child".format(content_from_parent),
							 content_from_child)
		num_tested_plugin = plugin_index+1
		self.assertEqual(2, num_tested_plugin)
		
suite = unittest.TestSuite([
		unittest.TestLoader().loadTestsFromTestCase(SimpleMultiprocessTestCase),
		])