1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
import datetime
import boto3
from moto import mock_aws
@mock_aws
def test_create_node_from_template_job() -> None:
# Given
panorama_client = boto3.client("panorama", "us-east-1")
# When
response = panorama_client.create_node_from_template_job(
JobTags=[
{"ResourceType": "PACKAGE", "Tags": {"key": "value"}},
],
NodeDescription="a description",
NodeName="a-camera-node",
OutputPackageName="an-output-package-name",
OutputPackageVersion="1.0",
TemplateParameters={
"Username": "a-user-name",
"Password": "a-password",
"StreamUrl": "rtsp://",
},
TemplateType="RTSP_CAMERA_STREAM",
)
# Then
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
assert "JobId" in response
assert isinstance(response["JobId"], str)
@mock_aws
def test_describe_node_from_template_job() -> None:
# Given
panorama_client = boto3.client("panorama", "us-east-1")
# When
response = panorama_client.create_node_from_template_job(
JobTags=[
{"ResourceType": "PACKAGE", "Tags": {"key": "value"}},
],
NodeDescription="a description",
NodeName="not-a-camera-node",
OutputPackageName="not-an-output-package-name",
OutputPackageVersion="1.0",
TemplateParameters={
"Username": "not-a-user-name",
"Password": "not-a-password",
"StreamUrl": "rtsp://192.168.0.201/1",
},
TemplateType="RTSP_CAMERA_STREAM",
)
describe_response = panorama_client.describe_node_from_template_job(
JobId=response["JobId"]
)
# Then
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
assert "JobId" in response
assert isinstance(response["JobId"], str)
assert isinstance(describe_response["CreatedTime"], datetime.datetime)
assert describe_response["JobId"] == response["JobId"]
assert describe_response["JobTags"] == [
{"ResourceType": "PACKAGE", "Tags": {"key": "value"}}
]
assert isinstance(describe_response["LastUpdatedTime"], datetime.datetime)
# Weird, seems to be a bug in panorama
assert describe_response["NodeDescription"] == "not-a-camera-node"
assert describe_response["NodeName"] == "not-a-camera-node"
assert describe_response["OutputPackageName"] == "not-an-output-package-name"
assert describe_response["OutputPackageVersion"] == "1.0"
assert describe_response["Status"] == "PENDING"
assert describe_response["TemplateParameters"] == {
"Username": "SAVED_AS_SECRET",
"Password": "SAVED_AS_SECRET",
"StreamUrl": "rtsp://192.168.0.201/1",
}
assert describe_response["TemplateType"] == "RTSP_CAMERA_STREAM"
@mock_aws
def test_list_nodes() -> None:
# Given
panorama_client = boto3.client("panorama", "us-east-1")
panorama_client.create_node_from_template_job(
JobTags=[
{"ResourceType": "PACKAGE", "Tags": {"key": "value"}},
],
NodeDescription="a description",
NodeName="not-a-camera-node",
OutputPackageName="not-an-output-package-name",
OutputPackageVersion="1.0",
TemplateParameters={
"Username": "not-a-user-name",
"Password": "not-a-password",
"StreamUrl": "rtsp://",
},
TemplateType="RTSP_CAMERA_STREAM",
)
panorama_client.create_node_from_template_job(
JobTags=[
{"ResourceType": "PACKAGE", "Tags": {"key": "value"}},
],
NodeDescription="a description",
NodeName="not-another-camera-node",
OutputPackageName="not-another-output-package-name",
OutputPackageVersion="1.0",
TemplateParameters={
"Username": "not-another-user-name",
"Password": "not-another-password",
"StreamUrl": "rtsp://",
},
TemplateType="RTSP_CAMERA_STREAM",
)
given_category = "MEDIA_SOURCE"
# When
response_page_1 = panorama_client.list_nodes(Category=given_category, MaxResults=1)
response_page_2 = panorama_client.list_nodes(
Category=given_category, MaxResults=1, NextToken=response_page_1["NextToken"]
)
# Then
assert response_page_1["ResponseMetadata"]["HTTPStatusCode"] == 200
assert "Nodes" in response_page_1
assert isinstance(response_page_1["Nodes"], list)
assert len(response_page_1["Nodes"]) == 1
assert "NextToken" in response_page_1
assert isinstance(response_page_1["NextToken"], str)
node = response_page_1["Nodes"][0]
assert node["Category"] == given_category
assert isinstance(node["CreatedTime"], datetime.datetime)
assert node["Description"] == "a description"
assert node["Name"] == "not-a-camera-node"
assert node["OwnerAccount"] == "123456789012"
assert (
node["PackageArn"]
== "arn:aws:panorama:us-east-1:123456789012:package/package--Y6-ycqU-ogoBTHuibMzGg"
)
assert node["PackageId"] == "package--Y6-ycqU-ogoBTHuibMzGg"
assert node["PackageName"] == "not-an-output-package-name"
assert node["PackageVersion"] == "1.0"
assert (
node["PatchVersion"]
== "y6ycquogobthuibmzggy6ycquogobthuibmzggy6ycquogobthuibmzggy6ycquo"
)
assert response_page_2["ResponseMetadata"]["HTTPStatusCode"] == 200
assert "Nodes" in response_page_2
assert isinstance(response_page_2["Nodes"], list)
assert len(response_page_2["Nodes"]) == 1
assert "NextToken" not in response_page_2
|