File: abs.py

package info (click to toggle)
python-papermill 2.6.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,220 kB
  • sloc: python: 4,977; makefile: 17; sh: 5
file content (69 lines) | stat: -rw-r--r-- 2,615 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""Utilities for working with Azure blob storage"""
import io
import re

from azure.identity import EnvironmentCredential
from azure.storage.blob import BlobServiceClient


class AzureBlobStore:
    """
    Represents a Blob of storage on Azure

    Methods
    -------
    The following are wrapped utilities for Azure storage:
        - read
        - listdir
        - write
    """

    def _blob_service_client(self, account_name, sas_token=None):
        blob_service_client = BlobServiceClient(
            account_url=f"{account_name}.blob.core.windows.net",
            credential=sas_token or EnvironmentCredential(),
        )

        return blob_service_client

    @classmethod
    def _split_url(self, url):
        """
        see: https://docs.microsoft.com/en-us/azure/storage/common/storage-dotnet-shared-access-signature-part-1
        abs://myaccount.blob.core.windows.net/sascontainer/sasblob.txt?sastoken
        """
        match = re.match(r"abs://(.*)\.blob\.core\.windows\.net\/(.*?)\/([^\?]*)\??(.*)$", url)
        if not match:
            raise Exception(f"Invalid azure blob url '{url}'")
        else:
            params = {
                "account": match.group(1),
                "container": match.group(2),
                "blob": match.group(3),
                "sas_token": match.group(4),
            }
            return params

    def read(self, url):
        """Read storage at a given url"""
        params = self._split_url(url)
        output_stream = io.BytesIO()
        blob_service_client = self._blob_service_client(params["account"], params["sas_token"])
        blob_client = blob_service_client.get_blob_client(params['container'], params['blob'])
        blob_client.download_blob().readinto(output_stream)
        output_stream.seek(0)
        return [line.decode("utf-8") for line in output_stream]

    def listdir(self, url):
        """Returns a list of the files under the specified path"""
        params = self._split_url(url)
        blob_service_client = self._blob_service_client(params["account"], params["sas_token"])
        container_client = blob_service_client.get_container_client(params["container"])
        return list(container_client.list_blobs(params["blob"]))

    def write(self, buf, url):
        """Write buffer to storage at a given url"""
        params = self._split_url(url)
        blob_service_client = self._blob_service_client(params["account"], params["sas_token"])
        blob_client = blob_service_client.get_blob_client(params['container'], params['blob'])
        blob_client.upload_blob(data=buf, overwrite=True)