File: import_data_set_sync_batching.py

"""
How to use RxPY to prepare batches for synchronous write into InfluxDB
"""

from csv import DictReader

import reactivex as rx
from reactivex import operators as ops

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write.retry import WritesRetry
from influxdb_client.client.write_api import SYNCHRONOUS


def csv_to_generator(csv_file_path):
    """
    Parse the CSV file into a generator of Points.
    """
    with open(csv_file_path, 'r') as csv_file:
        for row in DictReader(csv_file):
            point = Point('financial-analysis') \
                .tag('type', 'vix-daily') \
                .field('open', float(row['VIX Open'])) \
                .field('high', float(row['VIX High'])) \
                .field('low', float(row['VIX Low'])) \
                .field('close', float(row['VIX Close'])) \
                .time(row['Date'])
            yield point
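
# Note: the generator assumes the column layout read above - a 'Date' column plus
# 'VIX Open', 'VIX High', 'VIX Low' and 'VIX Close' (presumably a CBOE VIX daily export).
# Any CSV exposing those columns works with the 'vix-daily.csv' path referenced below.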


"""
Define Retry strategy - 3 attempts => 2, 4, 8
"""
retries = WritesRetry(total=3, retry_interval=1, exponential_base=2)
with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org', retries=retries) as client:

    """
    Use synchronous version of WriteApi to strongly depends on result of write
    """
    write_api = client.write_api(write_options=SYNCHRONOUS)

    """
    Prepare batches from generator
    """
    batches = rx \
        .from_iterable(csv_to_generator('vix-daily.csv')) \
        .pipe(ops.buffer_with_count(500))
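    # buffer_with_count(500) emits each group as a plain list once 500 points have arrived;
    # the final list may be shorter when the row count is not a multiple of 500.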


    def write_batch(batch):
        """
        Synchronous write
        """
        print(f'Writing... {len(batch)}')
        write_api.write(bucket='my-bucket', record=batch)


    """
    Write batches
    """
    batches.subscribe(on_next=write_batch,
                      on_error=lambda ex: print(f'Unexpected error: {ex}'),
                      on_completed=lambda: print('Import finished!'))
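
# Design note: the SYNCHRONOUS WriteApi is combined with RxPY batching here so the caller
# controls the batch size and sees the result of every write call. The client also ships a
# batching WriteApi configured via WriteOptions, which buffers points internally; it is not
# used in this example.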