File: ingest_large_dataframe.py

package info (click to toggle)
python-influxdb-client 1.40.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 7,216 kB
  • sloc: python: 60,236; sh: 64; makefile: 53
file content (69 lines) | stat: -rw-r--r-- 1,692 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""
How to ingest a large DataFrame by splitting it into chunks.
"""
import logging
import random
from datetime import datetime

from influxdb_client import InfluxDBClient
from influxdb_client.extras import pd, np

# Enable logging for the DataFrame serializer.
#
# A dedicated stream handler is attached to the serializer's logger and the
# logger level is lowered to DEBUG, so its debug records are emitted with a
# timestamp prefix while the script runs.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s | %(message)s'))

loggerSerializer = logging.getLogger('influxdb_client.client.write.dataframe_serializer')
loggerSerializer.addHandler(handler)
loggerSerializer.setLevel(logging.DEBUG)

# Configuration
#
# Connection settings consumed further down when the client is created and the
# DataFrame is written. The values look like local placeholders; adjust them
# to match the InfluxDB instance you want to write to.
url: str = 'http://localhost:8086'  # passed to InfluxDBClient(url=...)
token: str = 'my-token'  # passed to InfluxDBClient(token=...)
org: str = 'my-org'  # passed to InfluxDBClient(org=...)
bucket: str = 'my-bucket'  # passed to write_api.write(bucket=...)

# Generate DataFrame
#
# Builds a wide frame: an integer 'time' column (later used as the index),
# one string 'tag' column, and the columns col2 .. col2998.
print("", "=== Generating DataFrame ===", "", sep="\n")
dataframe_rows_count = 150_000

# Per-row columns: a running integer for 'time' and a randomly picked tag value.
col_data = {
    'time': np.arange(dataframe_rows_count, dtype=int),
    'tag': np.random.choice(['tag_a', 'tag_b', 'test_c'], size=(dataframe_rows_count,)),
}
# Each colN entry is ONE scalar random integer in [1, 10], not a per-row array.
col_data.update((f'col{n}', random.randint(1, 10)) for n in range(2, 2999))

# 'time' is moved out of the columns and becomes the frame's index.
data_frame = pd.DataFrame(data=col_data).set_index('time')
print(data_frame)

# Ingest DataFrame
#
# Writes the whole frame through the client's write API (the batching API,
# obtained via client.write_api() with no arguments) and reports how long the
# import took, measured with wall-clock datetimes.
print("", "=== Ingesting DataFrame via batching API ===", "", sep="\n")
started_at = datetime.now()

# Both context managers are released when the block ends: first the write API,
# then the client.
# NOTE(review): presumably leaving the block is what waits for pending batches
# to be sent -- confirm against the client documentation.
with InfluxDBClient(url=url, token=token, org=org) as client, client.write_api() as write_api:
    write_api.write(
        bucket=bucket,
        record=data_frame,
        data_frame_tag_columns=['tag'],
        data_frame_measurement_name="measurement_name",
    )
    print("", "Wait to finishing ingesting DataFrame...", "", sep="\n")

# Elapsed time is taken only after both context managers have exited.
elapsed = datetime.now() - started_at
print("", f'Import finished in: {elapsed}', "", sep="\n")