File: impl.py

package info (click to toggle)
ros-osrf-pycommon 2.1.7-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 360 kB
  • sloc: python: 1,726; makefile: 146; xml: 16
file content (142 lines) | stat: -rw-r--r-- 5,038 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# Copyright 2014 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import os

try:
    import pty
    has_pty = True
except ImportError:
    has_pty = False

from ..get_loop_impl import get_loop_impl


def get_loop():
    return get_loop_impl(asyncio)


async def _async_execute_process_nopty(
    protocol_class, cmd, cwd, env, shell,
    stderr_to_stdout=True
):
    loop = get_loop()
    stderr = asyncio.subprocess.PIPE
    if stderr_to_stdout is True:
        stderr = asyncio.subprocess.STDOUT
    # Start the subprocess
    if shell is True:
        transport, protocol = await loop.subprocess_shell(
            protocol_class, " ".join(cmd), cwd=cwd, env=env,
            stderr=stderr, close_fds=False)
    else:
        transport, protocol = await loop.subprocess_exec(
            protocol_class, *cmd, cwd=cwd, env=env,
            stderr=stderr, close_fds=False)
    return transport, protocol


if has_pty:
    # If pty is availabe, use them to emulate the tty
    async def _async_execute_process_pty(
        protocol_class, cmd, cwd, env, shell,
        stderr_to_stdout=True
    ):
        loop = get_loop()
        # Create the PTY's
        stdout_master, stdout_slave = pty.openpty()
        if stderr_to_stdout:
            stderr_master, stderr_slave = stdout_master, stdout_slave
        else:
            stderr_master, stderr_slave = pty.openpty()

        def protocol_factory():
            return protocol_class(
                stdin=None,
                stdout=stdout_master,
                stderr=stderr_master
            )

        # Start the subprocess
        if shell is True:
            transport, protocol = await loop.subprocess_shell(
                protocol_factory, " ".join(cmd), cwd=cwd, env=env,
                stdout=stdout_slave, stderr=stderr_slave, close_fds=False)
        else:
            transport, protocol = await loop.subprocess_exec(
                protocol_factory, *cmd, cwd=cwd, env=env,
                stdout=stdout_slave, stderr=stderr_slave, close_fds=False)

        # Close our copies of the slaves,
        # the child's copy of the slave remain open until it terminates
        os.close(stdout_slave)
        if not stderr_to_stdout:
            os.close(stderr_slave)

        # Create Protocol classes
        class PtyStdoutProtocol(asyncio.Protocol):
            def connection_made(self, transport):
                if hasattr(protocol, 'on_stdout_open'):
                    protocol.on_stdout_open()

            def data_received(self, data):
                if hasattr(protocol, 'on_stdout_received'):
                    protocol.on_stdout_received(data)

            def connection_lost(self, exc):
                if hasattr(protocol, 'on_stdout_close'):
                    protocol.on_stdout_close(exc)

        class PtyStderrProtocol(asyncio.Protocol):
            def connection_made(self, transport):
                if hasattr(protocol, 'on_stderr_open'):
                    protocol.on_stderr_open()

            def data_received(self, data):
                if hasattr(protocol, 'on_stderr_received'):
                    protocol.on_stderr_received(data)

            def connection_lost(self, exc):
                if hasattr(protocol, 'on_stderr_close'):
                    protocol.on_stderr_close(exc)

        # Add the pty's to the read loop
        # Also store the transport, protocol tuple for each call to
        # connect_read_pipe, to prevent the destruction of the protocol
        # class instance, otherwise no data is received.
        protocol.stdout_tuple = await loop.connect_read_pipe(
            PtyStdoutProtocol, os.fdopen(stdout_master, 'rb', 0))
        if not stderr_to_stdout:
            protocol.stderr_tuple = await loop.connect_read_pipe(
                PtyStderrProtocol, os.fdopen(stderr_master, 'rb', 0))
        # Return the protocol and transport
        return transport, protocol
else:
    _async_execute_process_pty = _async_execute_process_nopty


async def async_execute_process(
    protocol_class, cmd=None, cwd=None, env=None, shell=False,
    emulate_tty=False, stderr_to_stdout=True
):
    if emulate_tty:
        transport, protocol = await _async_execute_process_pty(
            protocol_class, cmd, cwd, env, shell,
            stderr_to_stdout)
    else:
        transport, protocol = await _async_execute_process_nopty(
            protocol_class, cmd, cwd, env, shell,
            stderr_to_stdout)
    return transport, protocol