File: connection.py

package info (click to toggle)
python-pure-python-adb 0.3.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,500 kB
  • sloc: python: 2,597; makefile: 8; sh: 1
file content (118 lines) | stat: -rw-r--r-- 3,214 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import struct
import socket

from ppadb.protocol import Protocol
from ppadb.utils.logger import AdbLogging

logger = AdbLogging.get_logger(__name__)


class Connection:
    def __init__(self, host='localhost', port=5037, timeout=None):
        self.host = host
        self.port = port
        self.timeout = timeout
        self.socket = None

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def connect(self):
        logger.debug("Connect to adb server - {}:{}".format(self.host, self.port))

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        l_onoff = 1
        l_linger = 0

        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', l_onoff, l_linger))
        if self.timeout:
            self.socket.settimeout(self.timeout)

        try:
            self.socket.connect((self.host, self.port))
        except socket.error as e:
            self.close()
            raise RuntimeError("ERROR: connecting to {}:{} {}.\nIs adb running on your computer?".format(
                self.host,
                self.port,
                e
            ))

        return self.socket

    def close(self):
        if not self.socket:
            return

        logger.debug("Connection closed...")
        try:
            self.socket.close()
        except OSError:
            pass

    ##############################################################################################################
    #
    # Send command & Receive command result
    #
    ##############################################################################################################
    def _recv(self, length):
        return self.socket.recv(length)

    def _recv_into(self, length):
        recv = bytearray(length)
        view = memoryview(recv)
        self.socket.recv_into(view)
        return recv

    def _send(self, data):
        self.socket.send(data)

    def receive(self):
        nob = int(self._recv(4).decode('utf-8'), 16)
        recv = self._recv_into(nob)

        return recv.decode('utf-8')

    def send(self, msg):
        msg = Protocol.encode_data(msg)
        logger.debug(msg)
        self._send(msg)
        return self._check_status()

    def _check_status(self):
        recv = self._recv(4).decode('utf-8')
        if recv != Protocol.OKAY:
            error = self._recv(1024).decode('utf-8')
            raise RuntimeError("ERROR: {} {}".format(repr(recv), error))

        return True

    def check_status(self):
        return self._check_status()

    ##############################################################################################################
    #
    # Socket read/write
    #
    ##############################################################################################################
    def read_all(self):
        data = bytearray()

        while True:
            recv = self._recv(4096)
            if not recv:
                break
            data += recv

        return data

    def read(self, length=0):
        data = self._recv(length)
        return data

    def write(self, data):
        self._send(data)