1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
#!/usr/bin/env python
import unittest
from twisted.internet import reactor, protocol
from pymodbus.constants import Defaults
from pymodbus.client.asynchronous import ModbusClientProtocol
from base_runner import Runner
class AsynchronousTcpClient(Runner, unittest.TestCase):
"""
These are the integration tests for the asynchronous
tcp client.
"""
def setUp(self):
""" Initializes the test environment """
def _callback(client): self.client = client
self.initialize(["../tools/reference/diagslave", "-m", "tcp", "-p", "12345"])
defer = protocol.ClientCreator(reactor, ModbusClientProtocol
).connectTCP("localhost", Defaults.Port)
defer.addCallback(_callback)
reactor.run()
def tearDown(self):
""" Cleans up the test environment """
reactor.callLater(1, client.transport.loseConnection)
reactor.callLater(2, reactor.stop)
reactor.shutdown()
# --------------------------------------------------------------------------- #
# Main
# --------------------------------------------------------------------------- #
if __name__ == "__main__":
unittest.main()
|