File: core.py

package info (click to toggle)
python-ulmo 0.8.4%2Bdfsg1-2
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 1,100 kB
  • sloc: python: 6,752; makefile: 145
file content (299 lines) | stat: -rw-r--r-- 10,222 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
"""
    ulmo.usgs.eddn.core
    ~~~~~~~~~~~~~~~~~~~~~

    This module provides access to data provided by the `United States Geological
    Survey`_ `Emergency Data Distribution Network`_ web site.

    The `DCP message format`_ includes some header information that is parsed and
    the message body, with a variable number of characters. The format of the
    message body varies widely depending on the manufacturer of the transmitter,
    data logger, sensors, and the technician who programmed the DCP. The body can
    be simple ASCII, sometimes with parameter codes and time-stamps embedded,
    sometimes not. The body can also be in 'Pseudo-Binary', which is a character
    encoding of binary data that uses 6 bits of every byte and guarantees that
    all characters are printable.


    .. _United States Geological Survey: http://www.usgs.gov/
    .. _Emergency Data Distribution Network: http://eddn.usgs.gov/
    .. _DCP message format: http://eddn.usgs.gov/dcpformat.html

"""
from past.builtins import basestring

from bs4 import BeautifulSoup
from datetime import datetime, timedelta
import isodate
import logging
import os
import pandas as pd
import re
import requests
import shutil
from ulmo import util

from . import parsers

# eddn query base url
EDDN_URL = 'http://eddn.usgs.gov/cgi-bin/retrieveData.pl?%s'

# default file path (appended to default ulmo path)
DEFAULT_FILE_PATH = 'usgs/eddn/'

# configure logging
LOG_FORMAT = '%(message)s'
logging.basicConfig(format=LOG_FORMAT)
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)


def decode(dataframe, parser, **kwargs):
    """decodes dcp message data in pandas dataframe returned by ulmo.usgs.eddn.get_data().

    Parameters
    ----------
    dataframe : pandas.DataFrame
        pandas.DataFrame returned by ulmo.usgs.eddn.get_data()
    parser : {function, str}
        function that acts on the dcp_message of each row of the dataframe and
        returns a new dataframe containing several rows of decoded data. This
        returned dataframe may have different (but derived) timestamps than the
        original row. If a string is passed then a matching parser function is
        looked up from ulmo.usgs.eddn.parsers
    **kwargs :
        passed through to each parser call

    Returns
    -------
    decoded_data : pandas.DataFrame
        pandas dataframe; the format and parameters in the returned dataframe
        depend wholly on the parser used. Empty when no row decodes to data.
    """
    if not callable(parser):
        # look the parser function up by name from the parsers module
        parser = getattr(parsers, parser)

    decoded_frames = []
    for _, row in dataframe.iterrows():
        parsed = parser(row, **kwargs)
        # discard rows where every column is missing
        parsed.dropna(how='all', inplace=True)
        if not parsed.empty:
            decoded_frames.append(parsed)

    # pd.concat raises ValueError on an empty list; return an empty frame instead
    if not decoded_frames:
        return pd.DataFrame()

    return pd.concat(decoded_frames)


def get_data(
        dcp_address, start=None, end=None, networklist='', channel='', spacecraft='Any', baud='Any',
        electronic_mail='', dcp_bul='', glob_bul='', timing='', retransmitted='Y', daps_status='N',
        use_cache=False, cache_path=None, as_dataframe=True):
    """Fetches GOES Satellite DCP messages from USGS Emergency Data Distribution Network.

    Parameters
    ----------
    dcp_address : str, iterable of strings
        DCP address or list of DCP addresses to be fetched; lists will be joined by a ','.
    start : {``None``, str, datetime, datetime.timedelta}
        If ``None`` (default) then the start time is 2 days prior (or date of last data if cache is used)
        If a datetime or datetime like string is specified it will be used as the start date.
        If a timedelta or string in ISO 8601 period format (e.g 'P2D' for a period of 2 days) then
        'now' minus the timedelta will be used as the start.
        NOTE: The EDDN service does not specify how far back data is available. The service also imposes
        a maximum data limit of 25000 characters. If this limit is reached multiple requests will be made
        until all available data is downloaded.
    end : {``None``, str, datetime, datetime.timedelta}
        If ``None`` (default) then the end time is 'now'
        If a datetime or datetime like string is specified it will be used as the end date.
        If a timedelta or string in ISO 8601 period format (e.g 'P2D' for a period of 2 days) then
        'now' minus the timedelta will be used as the end.
        NOTE: The EDDN service does not specify how far back data is available. The service also imposes
        a maximum data limit of 25000 characters.
    networklist : str,
        '' (default). Filter by network.
    channel : str,
        '' (default). Filter by channel.
    spacecraft : str,
        East, West, Any (default). Filter by GOES East/West Satellite
    baud : str,
        'Any' (default). Filter by baud rate. See http://eddn.usgs.gov/msgaccess.html for options
    electronic_mail : str,
        '' (default) or 'Y'
    dcp_bul : str,
        '' (default) or 'Y'
    glob_bul : str,
        '' (default) or 'Y'
    timing : str,
        '' (default) or 'Y'
    retransmitted : str,
        'Y' (default) or 'N'
    daps_status : str,
        'N' (default) or 'Y'
    use_cache : bool,
        If True use hdf file to cache data and retrieve new data on subsequent
        requests (default False)
    cache_path : {``None``, str},
        If ``None`` use default ulmo location for cached files otherwise use specified path. files are named
        using dcp_address.
    as_dataframe : bool
        If True (default) return data in a pandas dataframe otherwise return a dict.

    Returns
    -------
    message_data : {pandas.DataFrame, dict}
        Either a pandas dataframe or a dict indexed by dcp message times
    """

    if isinstance(dcp_address, list):
        dcp_address = ','.join(dcp_address)

    data = pd.DataFrame()

    if use_cache:
        dcp_data_path = _get_store_path(cache_path, dcp_address + '.h5')
        if os.path.exists(dcp_data_path):
            data = pd.read_hdf(dcp_data_path, dcp_address)

    if start:
        drs_since = _format_time(start)
    else:
        try:
            # resume from the most recent cached message, if any
            drs_since = _format_time(data['message_timestamp_utc'].iloc[-1])
        except (KeyError, IndexError):
            drs_since = 'now -2 days'

    if end:
        drs_until = _format_time(end)
    else:
        drs_until = 'now'

    params = {
        'DCP_ADDRESS': dcp_address,
        'DRS_SINCE': drs_since,
        'DRS_UNTIL': drs_until,
        'NETWORKLIST': networklist,
        'CHANNEL': channel,
        # sentinels wrapped around each message; _fetch_url splits on them
        # (NOTE: previously these had trailing commas, making them tuples)
        'BEFORE': '//START\n',
        'AFTER': '\n//END\n',
        'SPACECRAFT': spacecraft,
        'BAUD': baud,
        'ELECTRONIC_MAIL': electronic_mail,
        'DCP_BUL': dcp_bul,
        'GLOB_BUL': glob_bul,
        'TIMING': timing,
        'RETRANSMITTED': retransmitted,
        'DAPS_STATUS': daps_status,
    }

    # the service caps each response at 25000 characters; keep paging back
    # in time (moving DRS_UNTIL to the oldest message seen) until the last
    # response comes in under the limit
    data_limit_reached = True
    messages = []
    while data_limit_reached:
        new_message, data_limit_reached = _fetch_url(params)
        messages += new_message
        if data_limit_reached:
            params['DRS_UNTIL'] = _format_time(_parse(new_message[-1])['message_timestamp_utc'])

    new_data = pd.DataFrame([_parse(row) for row in messages])

    if not new_data.empty:
        new_data.index = new_data.message_timestamp_utc
        data = new_data.combine_first(data)
        # DataFrame.sort was removed from pandas; sort by the timestamp index
        data.sort_index(inplace=True)

        if use_cache:
            # write to a tmp file and move to avoid ballooning h5 file
            tmp = dcp_data_path + '.tmp'
            data.to_hdf(tmp, dcp_address)
            shutil.move(tmp, dcp_data_path)

    if data.empty:
        return data if as_dataframe else {}

    # trim to the requested window; ISO 8601 period strings and timedeltas
    # are interpreted relative to the newest message received
    if start:
        if isinstance(start, basestring) and start.startswith('P'):
            start = isodate.parse_duration(start)
        if isinstance(start, timedelta):
            start = data['message_timestamp_utc'].iloc[-1] - start
        data = data[start:]

    if end:
        if isinstance(end, basestring) and end.startswith('P'):
            end = isodate.parse_duration(end)
        if isinstance(end, timedelta):
            end = data['message_timestamp_utc'].iloc[-1] - end
        data = data[:end]

    if not as_dataframe:
        data = data.T.to_dict()

    return data


def _fetch_url(params):
    """Request one page of DCP messages from the EDDN service.

    Parameters
    ----------
    params : dict
        query parameters passed straight to ``requests.get``

    Returns
    -------
    (messages, data_limit_reached) : (list of str, bool)
        raw message strings and a flag indicating the service's response size
        limit was hit (i.e. older data remains to be fetched).
    """
    r = requests.get(EDDN_URL, params=params)
    log.info('data requested using url: %s\n' % r.url)
    # name the parser explicitly so results don't depend on which optional
    # parsers (lxml, html5lib, ...) happen to be installed
    soup = BeautifulSoup(r.text, 'html.parser')
    pre = soup.find('pre')
    if pre is None or not pre.contents:
        # unexpected response shape; treat as no data rather than crashing
        log.info('No data found\n')
        return [], False
    message = pre.contents[0].replace('\n', '').replace('\r', ' ')

    data_limit_reached = False
    if 'Max data limit reached' in message:
        data_limit_reached = True
        log.info('Max data limit reached, making new request for older data\n')

    if not message:
        log.info('No data found\n')
        message = []
    else:
        # each message is wrapped in the //START ... //END sentinels set by
        # the BEFORE/AFTER query parameters
        message = [msg[1].strip() for msg in re.findall(r'(//START)(.*?)(//END)', message, re.M | re.S)]

    return message, data_limit_reached


def _format_period(period):
    days, hours, minutes = period.days, period.seconds // 3600, (period.seconds // 60) % 60

    if minutes:
        return 'now -%s minutes' % period.seconds / 60

    if hours:
        return 'now -%s hours' % period.seconds / 3600

    if days:
        return 'now -%s days' % days


def _format_time(timestamp):
    """Convert a timestamp to the time syntax expected by the EDDN service.

    Accepts a datetime, a timedelta, an ISO 8601 datetime string, or an
    ISO 8601 period string (leading 'P', e.g. 'P2D'). Periods and timedeltas
    are rendered as relative times ('now -N units'); datetimes are rendered
    as 'YYYY/DDD HH:MM:SS' (year / day-of-year).
    """
    if isinstance(timestamp, basestring):
        # period strings start with 'P'; anything else is parsed as a datetime
        parse = (isodate.parse_duration if timestamp.startswith('P')
                 else isodate.parse_datetime)
        timestamp = parse(timestamp)

    if isinstance(timestamp, timedelta):
        return _format_period(timestamp)
    if isinstance(timestamp, datetime):
        return timestamp.strftime('%Y/%j %H:%M:%S')


def _get_store_path(path, default_file_name):
    if path is None:
        path = os.path.join(util.get_ulmo_dir(), DEFAULT_FILE_PATH)

    if not os.path.exists(path):
        os.makedirs(path)

    return os.path.join(path, default_file_name)


def _parse(line):
    return {
        'dcp_address': line[:8],
        'message_timestamp_utc': datetime.strptime(line[8:19], '%y%j%H%M%S'),
        'failure_code': line[19:20],
        'signal_strength': line[20:22],
        'frequency_offset': line[22:24],
        'modulation_index': line[24:25],
        'data_quality_indicator': line[25:26],
        'goes_receive_channel': line[26:29],
        'goes_spacecraft_indicator': line[29:30],
        'uplink_carrier_status': line[30:32],
        'message_data_length': line[32:37],
        'dcp_message': line[37:],
    }