File: test_service_threads.py

package info (click to toggle)
python-jtoolkit 0.7.8-2
  • links: PTS
  • area: main
  • in suites: etch, etch-m68k
  • size: 1,436 kB
  • ctags: 2,536
  • sloc: python: 15,143; makefile: 20
file content (141 lines) | stat: -rw-r--r-- 4,729 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python

import threading
import time
import win32serviceutil
import win32evtlog
import win32evtlogutil
import win32service
import servicemanager
import sys
import os
import traceback

class TestThreading:
	"""Checks that a thread can be created, started and joined, and that it ran."""

	# Thread implementation exercised by test_thread_creation.
	ThreadClass = threading.Thread

	def target(self, argdict):
		"""Thread body: record in the shared dict that the thread executed."""
		argdict["success"] = True

	def test_thread_creation(self):
		"""Run target() on a ThreadClass thread and verify that it completed.

		Returns True on success; raises AssertionError if the thread did not
		leave its "success" marker in the shared dict.
		"""
		results = {}
		worker = self.ThreadClass(target=self.target, args=(results,))
		worker.start()
		# join() ensures the thread body has finished before the dict is checked.
		worker.join()
		thread_ran = "success" in results
		assert thread_ran
		return True

class TestLuceneThreading(TestThreading):
	"""Repeats the thread-creation check using PyLucene's thread class."""

	def setup_class(cls):
		"""Install PyLucene.PythonThread as the ThreadClass to exercise.

		PyLucene is imported here rather than at module level, so it is only
		needed when this hook actually runs.
		"""
		import PyLucene as lucene
		cls.ThreadClass = lucene.PythonThread

def run_tests():
	"""Run the plain and the PyLucene thread-creation checks, printing each result."""
	plain_result = TestThreading().test_thread_creation()
	# A parenthesized single value prints the same way as the bare print statement.
	print(plain_result)
	lucene_case = TestLuceneThreading()
	lucene_case.setup_class()
	print(lucene_case.test_thread_creation())

class ServiceRunner(win32serviceutil.ServiceFramework):
	"""Windows service that runs the plain thread-creation test in a loop.

	Each pass of the loop runs TestThreading.test_thread_creation(), reports
	the result to the event log under _svc_evt_src_ and sleeps for one second.
	The loop ends once SvcStop() has set the stop flag.
	"""
	_svc_name_ = "PyThreadTest"
	_svc_display_name_ = "PyThreadTest"
	_svc_evt_src_ = _svc_name_
	# Stop flag polled by SvcDoRun. It is defined at class level so the loop can
	# read it before SvcStop has ever been called; previously the attribute did
	# not exist on the runner until then and SvcDoRun raised AttributeError.
	stop = False

	def SvcStop(self):
		"""Handle a stop request: report STOP_PENDING and ask the run loop to exit."""
		self.dolog("CALLED SvcStop")
		self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
		self.stop = True

	def SvcDoRun(self):
		"""Service body: run the tests repeatedly until the stop flag is set."""
		self.dolog("CALLED SvcDoRun")
		while not self.stop:
			self.run_tests()

	def run_tests(self):
		"""Run one pass of the thread test, log that it passed, then sleep a second.

		An AssertionError from the test propagates to the caller.
		"""
		testplain = TestThreading()
		testplain.test_thread_creation()
		win32evtlogutil.ReportEvent(self._svc_evt_src_, 1, 0, win32evtlog.EVENTLOG_INFORMATION_TYPE, ["Plain Test passed"], None, None)
		# testlucene = TestLuceneThreading()
		# testlucene.setup_class()
		# testlucene.test_thread_creation()
		# win32evtlogutil.ReportEvent(self._svc_evt_src_, 1, 0, win32evtlog.EVENTLOG_INFORMATION_TYPE, ["PyLucene Test passed"], None, None)
		time.sleep(1)

	def dolog(self, message):
		"""Write message to the event log as an informational event."""
		# Use this instance's own event source: the module-level name "service"
		# is only bound on the --runservice script path, so relying on it made
		# logging fail with NameError everywhere else.
		win32evtlogutil.ReportEvent(self._svc_evt_src_, 1, 0, win32evtlog.EVENTLOG_INFORMATION_TYPE, [message], None, None)

class ServiceManager:
	"""Installs, removes, hosts and launches the PyThreadTest Windows service."""
	_svc_name_ = "PyThreadTest"
	_svc_display_name_ = "PyThreadTest"
	_svc_evt_src_ = _svc_name_

	def get_manager(self):
		"""Open the service control manager and keep the handle in self.hscm."""
		self.hscm = win32service.OpenSCManager(None, None, win32service.SC_MANAGER_ALL_ACCESS)

	def release_manager(self):
		"""Close the handle stored by get_manager()."""
		win32service.CloseServiceHandle(self.hscm)

	def installservice(self):
		"""Register this script as a demand-start, own-process service.

		The service command line is the current Python executable followed by
		this file's absolute path and the --runservice flag.
		"""
		self.get_manager()
		pythonserviceexe = sys.executable # win32serviceutil.LocatePythonServiceExe()
		# Quote both paths: an unquoted command line is split at the first space,
		# so an interpreter or script under a directory such as "Program Files"
		# would otherwise be misparsed.
		exeName = '"%s" "%s" --runservice' % (pythonserviceexe, os.path.abspath(__file__))
		try:
			hs = win32service.CreateService(self.hscm, self._svc_name_, self._svc_display_name_,
				win32service.SERVICE_ALL_ACCESS,
				win32service.SERVICE_WIN32_OWN_PROCESS,
				win32service.SERVICE_DEMAND_START,
				win32service.SERVICE_ERROR_NORMAL,
				exeName, None, 0, None, None, None)
			win32service.CloseServiceHandle(hs)
		finally:
			self.release_manager()

	def removeservice(self):
		"""Remove the service registration."""
		win32serviceutil.RemoveService(self._svc_name_)

	def runservice(self):
		"""Host ServiceRunner in this process and start the control dispatcher."""
		servicemanager.SetEventSourceName(self._svc_evt_src_)
		win32evtlogutil.AddSourceToRegistry(self._svc_evt_src_, win32evtlog.__file__)
		# Kept for backward compatibility; ServiceRunner does not read this attribute.
		self.stop = False
		servicemanager.Initialize(self._svc_name_, os.path.abspath(servicemanager.__file__))
		servicemanager.PrepareToHostSingle(ServiceRunner)
		# This class defines no ServiceCtrlHandler, so looking it up unconditionally
		# raised AttributeError before the dispatcher could start. Register a
		# handler only when a subclass supplies one.
		# NOTE(review): the hosted ServiceFramework subclass appears to register
		# its own control handler on construction - confirm against pywin32.
		handler = getattr(self, "ServiceCtrlHandler", None)
		if handler is not None:
			self.ssh = servicemanager.RegisterServiceCtrlHandler(self._svc_name_, handler)
		servicemanager.StartServiceCtrlDispatcher()

	def launchservice(self):
		"""Start the installed service, wait for it to run, then stop it again.

		Each state change is awaited for up to 5 seconds. The service handle and
		the manager handle are closed even if a step fails.
		"""
		hs = None
		self.get_manager()
		try:
			hs = win32service.OpenService(self.hscm, self._svc_name_, win32service.SERVICE_ALL_ACCESS)
			win32service.StartService(hs, [])
			win32serviceutil.WaitForServiceStatus(self._svc_name_, win32service.SERVICE_RUNNING, 5)
			win32service.ControlService(hs, win32service.SERVICE_STOP)
			win32serviceutil.WaitForServiceStatus(self._svc_name_, win32service.SERVICE_STOPPED, 5)
		finally:
			# hs stays None when OpenService itself failed.
			if hs is not None:
				win32service.CloseServiceHandle(hs)
			self.release_manager()

class TestService:
	"""Install / launch / remove round trip for the test service."""

	def disabled_test_service(self):
		"""Install the service, launch it, and always remove it afterwards.

		NOTE(review): the "disabled_" prefix presumably keeps test collectors
		from picking this method up - confirm before renaming.
		"""
		manager = ServiceManager()
		# Clear any registration left over from an earlier run; a failure here
		# is reported but tolerated.
		try:
			manager.removeservice()
		except Exception:
			problem = sys.exc_info()[1]
			print("error removing service (not neccessarily serious): %s" % problem)
		try:
			manager.installservice()
			time.sleep(1)
			manager.launchservice()
		finally:
			manager.removeservice()


if __name__ == "__main__":
	# Command-line dispatch:
	#   --runservice      host the service in this process
	#   --serviceinstall  register the service
	#   --serviceremove   unregister the service
	#   (no flag)         run the thread tests directly
	if "--runservice" in sys.argv:
		# runservice() is defined on ServiceManager; ServiceRunner has no such
		# method, so the previous ServiceRunner() object could never host anything.
		service = ServiceManager()
		try:
			service.runservice()
		except Exception:
			# Record the full traceback in the event log before re-raising, and
			# mark it as an error so it is not filed as routine information.
			logmessage = traceback.format_exc()
			win32evtlogutil.ReportEvent(service._svc_evt_src_, 1, 0, win32evtlog.EVENTLOG_ERROR_TYPE, [logmessage], None, None)
			raise
	elif "--serviceinstall" in sys.argv:
		service = ServiceManager()
		service.installservice()
	elif "--serviceremove" in sys.argv:
		service = ServiceManager()
		service.removeservice()
	else:
		run_tests()