File: stock_predictions_import_data.py

package info (click to toggle)
python-influxdb-client 1.40.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 7,216 kB
  • sloc: python: 60,236; sh: 64; makefile: 53
file content (86 lines) | stat: -rw-r--r-- 2,784 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
"""
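Import stock prices from the "stock-prices-example.csv" file into InfluxDB 2.0

NOTE(review): the link below points to the VIX (CBOE Volatility Index) dataset,
while main() downloads the stock-prices example CSV - confirm the intended source.
https://datahub.io/core/finance-vix#data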
"""
from collections import OrderedDict
from csv import DictReader
from datetime import timezone

import ciso8601
import requests
import reactivex as rx
from reactivex import operators as ops

from influxdb_client import InfluxDBClient, WriteOptions
from influxdb_client.client.write.point import EPOCH

# Count of CSV rows handed to parse_row() so far; parse_row() increments it
# and prints it every 10000 rows as a progress indicator.
_progress = 0


def parse_row(row: OrderedDict) -> str:
    """Parse one row of the CSV file into an InfluxDB LineProtocol record.

    CSV format:
        date,symbol,open,close,low,high,volume
        2016-01-05,WLTW,123.43,125.839996,122.309998,126.25,2163600.0
        2016-01-06,WLTW,125.239998,119.980003,119.940002,125.540001,2386400.0
        2016-01-07,WLTW,116.379997,114.949997,114.93,119.739998,2489500.0
        2016-01-08,WLTW,115.480003,116.620003,113.5,117.440002,2006300.0
        2016-01-11,WLTW,117.010002,114.970001,114.089996,117.330002,1408600.0
        2016-01-12,WLTW,115.510002,115.550003,114.5,116.059998,1098000.0
        2016-01-13,WLTW,116.459999,112.849998,112.589996,117.07,949600.0
        ...

    The produced record has the shape::

        financial-analysis,symbol=<symbol> close=<c>,high=<h>,low=<l>,open=<o> <timestamp-ns>

    Only the ``date``, ``symbol``, ``open``, ``close``, ``low`` and ``high``
    columns are read; ``volume`` is not written.

    Side effect: increments the module-level ``_progress`` counter and prints
    it every 10000 rows.

    :param row: the row of CSV file, keyed by column name
    :return: Parsed csv row to LineProtocol
    :raises KeyError: if one of the columns read above is missing from ``row``
    """
    global _progress
    _progress += 1

    if _progress % 10000 == 0:
        print(_progress)

    # The parsed date is stamped as UTC before subtracting EPOCH (imported
    # from influxdb_client); the sample rows above carry no UTC offset.
    since_epoch = ciso8601.parse_datetime(row["date"]).replace(tzinfo=timezone.utc) - EPOCH

    # Build the nanosecond timestamp from the timedelta's integer components.
    # total_seconds() * 1e9 would produce a float above 2**53, which cannot
    # represent every nanosecond value exactly once the input has a
    # sub-second part.
    whole_seconds = since_epoch.days * 86_400 + since_epoch.seconds
    timestamp_ns = whole_seconds * 1_000_000_000 + since_epoch.microseconds * 1_000

    return f'financial-analysis,symbol={row["symbol"]} ' \
           f'close={row["close"]},high={row["high"]},low={row["low"]},open={row["open"]} ' \
           f'{timestamp_ns}'


def main():
    """Download the example stock-price CSV, write it to InfluxDB and query it back.

    Steps:
        1. Reset the row counter used by ``parse_row`` for progress output.
        2. Stream the CSV over HTTP and map every row through ``parse_row``.
        3. Write the resulting LineProtocol records into bucket ``my-bucket``
           using a batching write API.
        4. Query the ``close`` field of symbol ``AAPL`` as a data frame and
           print its first 100 rows.

    The client, the write API and the HTTP response are used as context
    managers, so they are closed even if one of the steps raises.

    :raises requests.HTTPError: if the CSV download returns an error status
    """
    global _progress
    # Reset the counter that parse_row() actually increments. The previous
    # ``parse_row.progress = 0`` only set an unused function attribute.
    _progress = 0

    url = "https://github.com/influxdata/influxdb-client-python/wiki/data/stock-prices-example.csv"

    query = '''
    from(bucket:"my-bucket")
            |> range(start: 0, stop: now())
            |> filter(fn: (r) => r._measurement == "financial-analysis")
            |> filter(fn: (r) => r.symbol == "AAPL")
            |> filter(fn: (r) => r._field == "close")
            |> drop(columns: ["_start", "_stop", "table", "_field","_measurement"])
    '''

    with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client:
        with requests.get(url, stream=True) as response:
            # Fail early instead of feeding an HTTP error page to the CSV reader.
            response.raise_for_status()

            # iter_lines(decode_unicode=True) yields bytes when the server
            # declares no encoding, which DictReader cannot consume.
            if response.encoding is None:
                response.encoding = "utf-8"

            data = rx \
                .from_iterable(DictReader(response.iter_lines(decode_unicode=True))) \
                .pipe(ops.map(parse_row))

            # Leaving this block closes the write API, as the explicit
            # close() call did before, ahead of the query below.
            with client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000)) as write_api:
                write_api.write(bucket="my-bucket", record=data)

        result = client.query_api().query_data_frame(query=query)
        print(result.head(100))


# Run the import only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()