1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
|
"""
A bucket is a named location where time series data is stored.
All buckets have a retention policy, a duration of time that each data point persists.
A bucket belongs to an organization.
"""
import warnings
from influxdb_client import BucketsService, Bucket, PostBucketRequest, PatchBucketRequest
from influxdb_client.client.util.helpers import get_org_query_param
class BucketsApi(object):
    """Implementation for '/api/v2/buckets' endpoint.

    Every method delegates to a ``BucketsService`` built from the client's ``api_client``.
    """

    def __init__(self, influxdb_client):
        """Initialize defaults.

        :param influxdb_client: client whose ``api_client`` backs the underlying ``BucketsService``;
                                it is also handed to ``get_org_query_param`` when creating buckets
        """
        self._influxdb_client = influxdb_client
        self._buckets_service = BucketsService(influxdb_client.api_client)

    def create_bucket(self, bucket=None, bucket_name=None, org_id=None, retention_rules=None,
                      description=None, org=None) -> Bucket:
        """Create a bucket.

        :param Bucket|PostBucketRequest bucket: bucket to create; when given it is posted as-is and
                                               the remaining arguments are not used to build a request
        :param bucket_name: bucket name
        :param org_id: deprecated, use ``org``; only consulted when ``org`` is ``None``
        :param retention_rules: retention rules array or single BucketRetentionRules
        :param description: bucket description
        :param str, Organization org: specifies the organization for create the bucket;
                                      Take the ``ID``, ``Name`` or ``Organization``.
                                      If not specified the default value from ``InfluxDBClient.org`` is used.
        :return: Bucket
                 If the method is called asynchronously,
                 returns the request thread.
        """
        if org_id is not None:
            # stacklevel=2 attributes the warning to the caller that passed ``org_id``
            # rather than to this line inside the library.
            warnings.warn("org_id is deprecated; use org", DeprecationWarning, stacklevel=2)

        if bucket is None:
            # Normalize ``retention_rules`` to a fresh list: nothing, a list, or a single rule.
            if retention_rules is None:
                rules = []
            elif isinstance(retention_rules, list):
                rules = list(retention_rules)
            else:
                rules = [retention_rules]

            # ``org`` takes precedence; the deprecated ``org_id`` is only a fallback.
            bucket = PostBucketRequest(name=bucket_name,
                                       retention_rules=rules,
                                       description=description,
                                       org_id=get_org_query_param(org=(org_id if org is None else org),
                                                                  client=self._influxdb_client,
                                                                  required_id=True))

        return self._buckets_service.post_buckets(post_bucket_request=bucket)

    def update_bucket(self, bucket: Bucket) -> Bucket:
        """Update a bucket.

        Only ``name``, ``description`` and ``retention_rules`` are copied into the patch request;
        the bucket to patch is addressed by ``bucket.id``.

        :param bucket: Bucket update to apply (required)
        :return: Bucket
        """
        request = PatchBucketRequest(name=bucket.name,
                                     description=bucket.description,
                                     retention_rules=bucket.retention_rules)

        return self._buckets_service.patch_buckets_id(bucket_id=bucket.id, patch_bucket_request=request)

    def delete_bucket(self, bucket):
        """Delete a bucket.

        :param bucket: bucket id or Bucket
        :return: the result of ``BucketsService.delete_buckets_id``
                 (NOTE(review): previously documented as ``Bucket`` - confirm against the service)
        """
        # Accept either a Bucket instance or a bare bucket ID.
        bucket_id = bucket.id if isinstance(bucket, Bucket) else bucket

        return self._buckets_service.delete_buckets_id(bucket_id=bucket_id)

    def find_bucket_by_id(self, id):
        """Find bucket by ID.

        :param id: ID of the bucket to look up
        :return: the result of ``BucketsService.get_buckets_id`` (presumably a Bucket - confirm)
        """
        return self._buckets_service.get_buckets_id(id)

    def find_bucket_by_name(self, bucket_name):
        """Find bucket by name.

        :param bucket_name: bucket name
        :return: the first bucket returned for that name, or ``None`` when there is none
        """
        matches = self._buckets_service.get_buckets(name=bucket_name).buckets

        # An empty list and a missing (``None``) list both mean "no such bucket".
        return matches[0] if matches else None

    def find_buckets(self, **kwargs):
        """List buckets.

        All keyword arguments are forwarded unchanged to ``BucketsService.get_buckets``.

        :key int offset: Offset for pagination
        :key int limit: Limit for pagination
        :key str after: The last resource ID from which to seek from (but not including).
                        This is to be used instead of `offset`.
        :key str org: The organization name.
        :key str org_id: The organization ID.
        :key str name: Only returns buckets with a specific name.
        :return: Buckets
        """
        return self._buckets_service.get_buckets(**kwargs)
|