File: bucket_api.py

package info (click to toggle)
python-influxdb-client 1.40.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 7,216 kB
  • sloc: python: 60,236; sh: 64; makefile: 53
file content (119 lines) | stat: -rw-r--r-- 4,394 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""
A bucket is a named location where time series data is stored.

All buckets have a retention policy, a duration of time that each data point persists.
A bucket belongs to an organization.
"""
import warnings

from influxdb_client import BucketsService, Bucket, PostBucketRequest, PatchBucketRequest
from influxdb_client.client.util.helpers import get_org_query_param


class BucketsApi(object):
    """Implementation for '/api/v2/buckets' endpoint."""

    def __init__(self, influxdb_client):
        """Initialize defaults."""
        self._influxdb_client = influxdb_client
        self._buckets_service = BucketsService(influxdb_client.api_client)

    def create_bucket(self, bucket=None, bucket_name=None, org_id=None, retention_rules=None,
                      description=None, org=None) -> Bucket:
        """Create a bucket.

        :param Bucket|PostBucketRequest bucket: bucket to create
        :param bucket_name: bucket name
        :param description: bucket description
        :param org_id: org_id
        :param bucket_name: bucket name
        :param retention_rules: retention rules array or single BucketRetentionRules
        :param str, Organization org: specifies the organization for create the bucket;
                                      Take the ``ID``, ``Name`` or ``Organization``.
                                      If not specified the default value from ``InfluxDBClient.org`` is used.
        :return: Bucket
                 If the method is called asynchronously,
                 returns the request thread.
        """
        if retention_rules is None:
            retention_rules = []

        rules = []

        if isinstance(retention_rules, list):
            rules.extend(retention_rules)
        else:
            rules.append(retention_rules)

        if org_id is not None:
            warnings.warn("org_id is deprecated; use org", DeprecationWarning)

        if bucket is None:
            bucket = PostBucketRequest(name=bucket_name,
                                       retention_rules=rules,
                                       description=description,
                                       org_id=get_org_query_param(org=(org_id if org is None else org),
                                                                  client=self._influxdb_client,
                                                                  required_id=True))

        return self._buckets_service.post_buckets(post_bucket_request=bucket)

    def update_bucket(self, bucket: Bucket) -> Bucket:
        """Update a bucket.

        :param bucket: Bucket update to apply (required)
        :return: Bucket
        """
        request = PatchBucketRequest(name=bucket.name,
                                     description=bucket.description,
                                     retention_rules=bucket.retention_rules)

        return self._buckets_service.patch_buckets_id(bucket_id=bucket.id, patch_bucket_request=request)

    def delete_bucket(self, bucket):
        """Delete a bucket.

        :param bucket: bucket id or Bucket
        :return: Bucket
        """
        if isinstance(bucket, Bucket):
            bucket_id = bucket.id
        else:
            bucket_id = bucket

        return self._buckets_service.delete_buckets_id(bucket_id=bucket_id)

    def find_bucket_by_id(self, id):
        """Find bucket by ID.

        :param id:
        :return:
        """
        return self._buckets_service.get_buckets_id(id)

    def find_bucket_by_name(self, bucket_name):
        """Find bucket by name.

        :param bucket_name: bucket name
        :return: Bucket
        """
        buckets = self._buckets_service.get_buckets(name=bucket_name)

        if len(buckets.buckets) > 0:
            return buckets.buckets[0]
        else:
            return None

    def find_buckets(self, **kwargs):
        """List buckets.

        :key int offset: Offset for pagination
        :key int limit: Limit for pagination
        :key str after: The last resource ID from which to seek from (but not including).
                        This is to be used instead of `offset`.
        :key str org: The organization name.
        :key str org_id: The organization ID.
        :key str name: Only returns buckets with a specific name.
        :return: Buckets
        """
        return self._buckets_service.get_buckets(**kwargs)