"""Unit tests for the ``LogStreams`` management client."""
import unittest
from unittest import mock
from ...management.log_streams import LogStreams
class TestLogStreams(unittest.TestCase):
    """Tests for ``LogStreams``: constructor options and per-call URL/payload.

    Every test except the constructor one patches ``RestClient`` inside
    ``auth0.management.log_streams`` and then inspects the arguments that the
    patched client instance was called with.
    """

    def test_init_with_optionals(self):
        # Built against the unpatched client so its options and headers can be
        # read back from the resulting object.
        streams = LogStreams(
            domain="domain",
            token="jwttoken",
            telemetry=False,
            timeout=(10, 2),
        )

        self.assertEqual(streams.client.options.timeout, (10, 2))

        # With telemetry=False the "Auth0-Client" header is expected to be absent.
        self.assertEqual(streams.client.base_headers.get("Auth0-Client", None), None)

    @mock.patch("auth0.management.log_streams.RestClient")
    def test_list(self, mock_rc):
        rest_client = mock_rc.return_value

        LogStreams(domain="domain", token="jwttoken").list()

        # call_args[0] holds the positional arguments of the last call.
        positional = rest_client.get.call_args[0]
        self.assertEqual("https://domain/api/v2/log-streams", positional[0])

    @mock.patch("auth0.management.log_streams.RestClient")
    def test_get(self, mock_rc):
        rest_client = mock_rc.return_value

        LogStreams(domain="domain", token="jwttoken").get("an-id")

        positional = rest_client.get.call_args[0]
        self.assertEqual("https://domain/api/v2/log-streams/an-id", positional[0])

    @mock.patch("auth0.management.log_streams.RestClient")
    def test_create(self, mock_rc):
        rest_client = mock_rc.return_value

        # Sample data belongs to an `http` stream
        sink = {
            "httpEndpoint": "string",
            "httpContentType": "string",
            "httpContentFormat": "JSONLINES|JSONARRAY",
            "httpAuthorization": "string",
        }
        log_stream_data = {"name": "string", "type": "http", "sink": sink}

        LogStreams(domain="domain", token="jwttoken").create(log_stream_data)

        # call_args is a (positional, keyword) pair for the last call.
        last_call = rest_client.post.call_args
        self.assertEqual("https://domain/api/v2/log-streams", last_call[0][0])
        self.assertEqual(last_call[1]["data"], log_stream_data)

    @mock.patch("auth0.management.log_streams.RestClient")
    def test_delete(self, mock_rc):
        rest_client = mock_rc.return_value

        LogStreams(domain="domain", token="jwttoken").delete("an-id")

        expected_url = "https://domain/api/v2/log-streams/an-id"
        rest_client.delete.assert_called_with(expected_url)

    @mock.patch("auth0.management.log_streams.RestClient")
    def test_update(self, mock_rc):
        rest_client = mock_rc.return_value
        log_stream_update = {"name": "string"}

        LogStreams(domain="domain", token="jwttoken").update(
            "an-id", log_stream_update
        )

        last_call = rest_client.patch.call_args
        self.assertEqual("https://domain/api/v2/log-streams/an-id", last_call[0][0])
        self.assertEqual(last_call[1]["data"], log_stream_update)
|