# Example InfluxDB Jupyter notebook - stream data

This example demonstrates how to query data from InfluxDB 2.0 using Flux and display results in real time.

Prerequisites:
1. Start InfluxDB: `./scripts/influxdb-restart.sh`
2. Start Telegraf: `telegraf --config ./notebooks/telegraf.conf`
3. Install the following dependencies: `reactivex`, `pandas`, `streamz`, `hvplot` (which also brings in `bokeh`)

In [None]:
# Put the parent directory first on sys.path so its packages take precedence over
# installed ones (presumably so the notebook runs against the repository's own
# influxdb_client sources — TODO confirm).

import os
import sys

sys.path.insert(0, os.path.abspath(os.pardir))

In [None]:
from datetime import timedelta
from typing import List

import hvplot.streamz
import pandas as pd
import reactivex as rx
from reactivex import operators as ops

from streamz.dataframe import Random, DataFrame
from streamz import Stream
from influxdb_client import InfluxDBClient

In [None]:
def source_data(auto_refresh: int, query: str, sink: Stream, bucket: str = 'my-bucket', query_client=None):
    """Poll InfluxDB every ``auto_refresh`` seconds and push each result into ``sink``.

    On every tick the Flux script
    ``from(bucket: "<bucket>") |> range(start: -<auto_refresh>s, stop: now()) <query>``
    is executed and the resulting DataFrame (indexed by ``_time``, with the
    ``result`` and ``table`` columns dropped) is emitted into the streamz ``sink``.
    Empty results are skipped and per-tick query errors are printed, so a single
    failed poll does not stop the stream.

    :param auto_refresh: polling period in seconds; also the length of the queried time window
    :param query: Flux pipeline appended after the ``range`` call, e.g. ``'|> filter(...)'``
    :param sink: streamz ``Stream`` that receives one DataFrame per non-empty poll
    :param bucket: bucket to read from (defaults to ``my-bucket``)
    :param query_client: ``InfluxDBClient`` to query with; defaults to the notebook-global ``client``
    :return: the Rx subscription; call ``.dispose()`` on it to stop polling
    """
    flux = (f'from(bucket: "{bucket}") '
            f'|> range(start: -{auto_refresh}s, stop: now()) '
            f'{query}')
    # Resolved once, at call time: fails fast if no client is available.
    api = (query_client if query_client is not None else client).query_api()

    def _poll(_tick):
        # Errors are handled per tick rather than via on_error: an Rx error terminates
        # the whole subscription, which would freeze the live chart permanently.
        try:
            result = api.query_data_frame(flux, data_frame_index=['_time'])
        except Exception as error:
            print(error)
            return None
        if isinstance(result, list):
            # query_data_frame may return several frames (e.g. differing table schemas)
            result = pd.concat(result) if result else pd.DataFrame()
        # An empty result may lack the 'result'/'table' columns, so tolerate their absence.
        return result.drop(columns=['result', 'table'], errors='ignore')

    return rx.interval(period=timedelta(seconds=auto_refresh)).pipe(
        ops.map(_poll),
        ops.filter(lambda frame: frame is not None and not frame.empty),
    ).subscribe(on_next=sink.emit, on_error=print)

In [None]:
# Connection settings can be overridden via the INFLUXDB_V2_URL / INFLUXDB_V2_TOKEN /
# INFLUXDB_V2_ORG environment variables instead of hard-coding a token in the notebook;
# the defaults are the original demo values.
client = InfluxDBClient(url=os.environ.get('INFLUXDB_V2_URL', 'http://localhost:8086'),
                        token=os.environ.get('INFLUXDB_V2_TOKEN', 'my-token'),
                        org=os.environ.get('INFLUXDB_V2_ORG', 'my-org'))

In [None]:
# User-space CPU usage of the whole machine ("cpu-total"), reduced to time and value.
cpu_query = ('|> filter(fn: (r) => r._measurement == "cpu") '
             '|> filter(fn: (r) => r._field == "usage_user") '
             '|> filter(fn: (r) => r.cpu == "cpu-total") '
             '|> keep(columns: ["_time", "_value"])')

cpu_sink = Stream()
# Empty frame describing the emitted schema. An empty list alone yields an object-dtype
# column, so make the numeric type explicit to match the real `_value` data.
cpu_example = pd.DataFrame({'_value': []}, columns=['_value']).astype('float64')
cpu_df = DataFrame(cpu_sink, example=cpu_example)

source_data(auto_refresh=5, sink=cpu_sink, query=cpu_query)

In [None]:
# Memory statistics scaled to MiB and pivoted into one column per field.
mem_query = ('|> filter(fn: (r) => r._measurement == "mem") '
             '|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "total" or r._field == "used") '
             # The mem fields are presumably integer byte counts; converting to float first
             # avoids Flux integer division truncating the result to whole MiB.
             '|> map(fn: (r) => ({ r with _value: float(v: r._value) / 1024.0 / 1024.0 })) '
             '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
             '|> keep(columns: ["_time", "used", "total", "free", "available"])')

mem_sink = Stream()
# Empty frame describing the emitted schema; float64 matches the MiB values produced above.
mem_example = pd.DataFrame({'used': [], 'total': [], 'free': [], 'available': []},
                           columns=['available', 'free', 'total', 'used']).astype('float64')
mem_df = DataFrame(mem_sink, example=mem_example)

source_data(auto_refresh=5, sink=mem_sink, query=mem_query)

In [None]:
from bokeh.models.formatters import DatetimeTickFormatter

# Label every x tick as wall-clock time, whichever zoom scale Bokeh selects.
TIME_FORMAT = "%H:%M:%S"
TIME_SCALES = ("microseconds", "milliseconds", "seconds", "minsec", "minutes",
               "hourmin", "hours", "days", "months", "years")

try:
    # Bokeh 3.x validates each scale as a single format string
    formatter = DatetimeTickFormatter(**{scale: TIME_FORMAT for scale in TIME_SCALES})
except ValueError:
    # Older Bokeh releases expect a list of format strings per scale
    formatter = DatetimeTickFormatter(**{scale: [TIME_FORMAT] for scale in TIME_SCALES})

(
    cpu_df.hvplot.line(width=450, backlog=50, title='CPU % usage', xlabel='Time', ylabel='%',
                       xformatter=formatter)
    + mem_df.hvplot.line(width=450, backlog=50, title='Memory', xlabel='Time', ylabel='MiB',
                         xformatter=formatter, legend='top_left')
)