File: client.py

package info (click to toggle)
python-ofxclient 2.0.4-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 256 kB
  • sloc: python: 1,322; makefile: 6
file content (290 lines) | stat: -rw-r--r-- 9,935 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
from __future__ import absolute_import
from __future__ import unicode_literals
try:
    # python 3
    from http.client import HTTPSConnection
except ImportError:
    # python 2
    from httplib import HTTPSConnection
import logging
import time
try:
    # python 3
    from urllib.parse import splittype, splithost
except ImportError:
    # python 2
    from urllib import splittype, splithost
import uuid

# Default values for the optional ``Client`` constructor arguments.
# ``app_id`` / ``app_version`` end up in the APPID / APPVER signon fields.
DEFAULT_APP_ID = 'QWIN'
DEFAULT_APP_VERSION = '2500'
# ``ofx_version`` is emitted as the VERSION line of the OFX header.
DEFAULT_OFX_VERSION = '102'
# Values for the User-Agent and Accept HTTP request headers.
DEFAULT_USER_AGENT = 'httpclient'
DEFAULT_ACCEPT = '*/*, application/x-ofx'

# Separator used when joining OFX header lines and SGML elements.
LINE_ENDING = "\r\n"


def ofx_uid():
    """Return a new random UUID4 rendered as 32 hexadecimal characters."""
    fresh = uuid.uuid4()
    return str(fresh.hex)


class Client:
    """Build OFX requests for an institution and POST them over HTTPS.

    :param institution: institution to connect to; its ``url``,
      ``username``, ``password``, ``org`` and ``id`` attributes are read
    :type institution: :py:class:`ofxclient.Institution`
    :param id: client id; only sent (as ``CLIENTUID``) when
      ``ofx_version`` is ``103``
    :type id: string
    :param app_id: OFX app id
    :type app_id: string
    :param app_version: OFX app version
    :type app_version: string
    :param ofx_version: OFX spec version
    :type ofx_version: string
    :param user_agent: Value to send for User-Agent HTTP header. Any falsy
      value (e.g. False or None) suppresses the User-Agent header.
    :type user_agent: str, None or False
    :param accept: Value to send for Accept HTTP header. Any falsy value
      (e.g. False or None) suppresses the Accept header.
    :type accept: str, None or False
    """

    # NOTE: the ``id`` default is evaluated once, when this ``def`` is
    # executed, so every Client created without an explicit ``id`` in the
    # same process shares that one generated value.  Kept as-is for
    # interface compatibility.
    def __init__(
        self,
        institution,
        id=ofx_uid(),
        app_id=DEFAULT_APP_ID,
        app_version=DEFAULT_APP_VERSION,
        ofx_version=DEFAULT_OFX_VERSION,
        user_agent=DEFAULT_USER_AGENT,
        accept=DEFAULT_ACCEPT
    ):
        self.institution = institution
        self.id = id
        self.app_id = app_id
        self.app_version = app_version
        self.ofx_version = ofx_version
        self.user_agent = user_agent
        self.accept = accept
        # used when serializing Institutions
        self._init_args = {
            'id': self.id,
            'app_id': self.app_id,
            'app_version': self.app_version,
            'ofx_version': self.ofx_version,
            'user_agent': self.user_agent,
            'accept': self.accept
        }
        # Counter behind CLTCOOKIE; next_cookie() increments before use.
        self.cookie = 3

    @property
    def init_args(self):
        """
        Return a dict of the arguments used to initialize this client,
        suitable for use when serializing an Institution.

        :return: constructor arguments
        :rtype: dict
        """
        return self._init_args

    def authenticated_query(
        self,
        with_message=None,
        username=None,
        password=None
    ):
        """Build a complete OFX request document containing a signon.

        If ``with_message`` (an already rendered message block) is given it
        is placed after the signon block; otherwise the document is a
        signon-only probe.

        :param with_message: rendered OFX message set to include, or None
        :type with_message: str or None
        :param username: overrides the institution's username when truthy
        :param password: overrides the institution's password when truthy
        :return: OFX header followed by the ``<OFX>`` element
        :rtype: str
        """
        u = username or self.institution.username
        p = password or self.institution.password

        contents = ['OFX', self._signOn(username=u, password=p)]
        if with_message:
            contents.append(with_message)
        return LINE_ENDING.join([self.header(), _tag(*contents)])

    def bank_account_query(self, number, date, account_type, bank_id):
        """Bank account statement request"""
        return self.authenticated_query(
            self._bareq(number, date, account_type, bank_id)
        )

    def credit_card_account_query(self, number, date):
        """CC Statement request"""
        return self.authenticated_query(self._ccreq(number, date))

    def brokerage_account_query(self, number, date, broker_id):
        """Brokerage (investment) statement request"""
        return self.authenticated_query(
            self._invstreq(broker_id, number, date))

    def account_list_query(self, date='19700101000000'):
        """Account information request using ``date`` as DTACCTUP"""
        return self.authenticated_query(self._acctreq(date))

    def post(self, query):
        """
        Wrapper around ``_do_post()`` for servers that require session
        cookies to be sent back.

        When the first reply is an empty-bodied HTTP 200 that carries a
        ``Set-Cookie`` header, the request is repeated once with that
        header value sent as ``Cookie``.

        :param query: Body content to POST (OFX Query)
        :type query: str
        :return: decoded response body
        :rtype: str
        """
        res, response = self._do_post(query)
        cookies = res.getheader('Set-Cookie', None)
        if not response and cookies is not None and res.status == 200:
            logging.debug('Got 0-length 200 response with Set-Cookies header; '
                          'retrying request with cookies')
            _, response = self._do_post(query, [('Cookie', cookies)])
        return response

    def _do_post(self, query, extra_headers=None):
        """
        Do a POST to the Institution.

        :param query: Body content to POST (OFX Query)
        :type query: str
        :param extra_headers: Extra headers to send with the request, as a list
          of (Name, Value) header 2-tuples. ``None`` means no extra headers.
        :type extra_headers: list or None
        :return: 2-tuple of (HTTPResponse, str response body); the response
          object has already been closed but its status and headers remain
          readable
        :rtype: tuple
        """
        extras = extra_headers if extra_headers is not None else []
        i = self.institution
        logging.debug('posting data to %s', i.url)
        garbage, path = splittype(i.url)
        host, selector = splithost(path)
        # Encode once so Content-Length counts the bytes actually sent
        # rather than the number of characters in ``query``.
        body = query.encode()
        headers = [
            ('Content-Type', 'application/x-ofx'),
            ('Host', host),
            ('Content-Length', len(body)),
            ('Connection', 'Keep-Alive')
        ]
        if self.accept:
            headers.append(('Accept', self.accept))
        if self.user_agent:
            headers.append(('User-Agent', self.user_agent))
        for ehname, ehval in extras:
            headers.append((ehname, ehval))

        h = HTTPSConnection(host, timeout=60)
        try:
            # Discover requires a particular ordering of headers, so send the
            # request step by step.
            h.putrequest('POST', selector, skip_host=True,
                         skip_accept_encoding=True)
            logging.debug('---- request headers ----')
            for hname, hval in headers:
                logging.debug('%s: %s', hname, hval)
                h.putheader(hname, hval)
            logging.debug('---- request body (query) ----')
            # NOTE(review): queries built by authenticated_query() carry the
            # USERPASS field in clear text, so debug logs expose credentials.
            logging.debug(query)
            h.endheaders(body)
            res = h.getresponse()
            try:
                response = res.read().decode('ascii', 'ignore')
                logging.debug('---- response ----')
                logging.debug(res.__dict__)
                logging.debug('Headers: %s', res.getheaders())
                logging.debug(response)
            finally:
                res.close()
        finally:
            # A fresh connection is opened per call, so always release it.
            h.close()
        return res, response

    def next_cookie(self):
        """Increment the cookie counter and return it as a string."""
        self.cookie += 1
        return str(self.cookie)

    def header(self):
        """Return the OFX header block, one ``KEY:VALUE`` entry per line.

        A new NEWFILEUID is generated on every call; the trailing empty
        entry makes the block end with a line ending.
        """
        parts = [
            "OFXHEADER:100",
            "DATA:OFXSGML",
            "VERSION:%d" % int(self.ofx_version),
            "SECURITY:NONE",
            "ENCODING:USASCII",
            "CHARSET:1252",
            "COMPRESSION:NONE",
            "OLDFILEUID:NONE",
            "NEWFILEUID:"+ofx_uid(),
            ""
        ]
        return LINE_ENDING.join(parts)

    def _signOn(self, username=None, password=None):
        """Generate signon message

        :param username: overrides the institution's username when truthy
        :param password: overrides the institution's password when truthy
        :return: rendered ``SIGNONMSGSRQV1`` block
        :rtype: str
        """
        i = self.institution
        u = username or i.username
        p = password or i.password
        fidata = [_field("ORG", i.org)]
        if i.id:
            fidata.append(_field("FID", i.id))

        # CLIENTUID is only emitted for OFX version 103; otherwise an empty
        # entry is passed, which renders as a blank line inside SONRQ.
        client_uid = ''
        if str(self.ofx_version) == '103':
            client_uid = _field('CLIENTUID', self.id)

        return _tag("SIGNONMSGSRQV1",
                    _tag("SONRQ",
                         _field("DTCLIENT", now()),
                         _field("USERID", u),
                         _field("USERPASS", p),
                         _field("LANGUAGE", "ENG"),
                         _tag("FI", *fidata),
                         _field("APPID", self.app_id),
                         _field("APPVER", self.app_version),
                         client_uid
                         ))

    def _acctreq(self, dtstart):
        """Render the SIGNUP/ACCTINFO message with DTACCTUP=``dtstart``."""
        req = _tag("ACCTINFORQ", _field("DTACCTUP", dtstart))
        return self._message("SIGNUP", "ACCTINFO", req)

    # this is from _ccreq below and reading page 176 of the latest OFX doc.
    def _bareq(self, acctid, dtstart, accttype, bankid):
        """Render the BANK/STMT message for one bank account.

        Transactions are requested from ``dtstart`` onward.
        """
        req = _tag("STMTRQ",
                   _tag("BANKACCTFROM",
                        _field("BANKID", bankid),
                        _field("ACCTID", acctid),
                        _field("ACCTTYPE", accttype)),
                   _tag("INCTRAN",
                        _field("DTSTART", dtstart),
                        _field("INCLUDE", "Y")))
        return self._message("BANK", "STMT", req)

    def _ccreq(self, acctid, dtstart):
        """Render the CREDITCARD/CCSTMT message for one card account.

        Transactions are requested from ``dtstart`` onward.
        """
        req = _tag("CCSTMTRQ",
                   _tag("CCACCTFROM", _field("ACCTID", acctid)),
                   _tag("INCTRAN",
                        _field("DTSTART", dtstart),
                        _field("INCLUDE", "Y")))
        return self._message("CREDITCARD", "CCSTMT", req)

    def _invstreq(self, brokerid, acctid, dtstart):
        """Render the INVSTMT/INVSTMT message for one investment account.

        Requests transactions from ``dtstart`` onward plus open orders,
        positions as of the current time, and balances.
        """
        req = _tag("INVSTMTRQ",
                   _tag("INVACCTFROM",
                        _field("BROKERID", brokerid),
                        _field("ACCTID", acctid)),
                   _tag("INCTRAN",
                        _field("DTSTART", dtstart),
                        _field("INCLUDE", "Y")),
                   _field("INCOO", "Y"),
                   _tag("INCPOS",
                        _field("DTASOF", now()),
                        _field("INCLUDE", "Y")),
                   _field("INCBAL", "Y"))
        return self._message("INVSTMT", "INVSTMT", req)

    def _message(self, msgType, trnType, request):
        """Wrap ``request`` in ``<msgType>MSGSRQV1`` / ``<trnType>TRNRQ``.

        A fresh TRNUID and the next CLTCOOKIE value are placed before the
        request inside the transaction wrapper.
        """
        return _tag(msgType+"MSGSRQV1",
                    _tag(trnType+"TRNRQ",
                         _field("TRNUID", ofx_uid()),
                         _field("CLTCOOKIE", self.next_cookie()),
                         request))


def _field(tag, value):
    return "<"+tag+">"+value


def _tag(tag, *contents):
    """Render a closed SGML element.

    The opening tag, each item of ``contents`` and the closing tag are
    joined with ``LINE_ENDING``.
    """
    lines = ['<' + tag + '>']
    lines.extend(contents)
    lines.append('</' + tag + '>')
    return LINE_ENDING.join(lines)


def now():
    """Return the current local time formatted as ``YYYYMMDDHHMMSS``."""
    stamp_format = "%Y%m%d%H%M%S"
    local_now = time.localtime()
    return time.strftime(stamp_format, local_now)