1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
|
"""
Store Catalog - Lookup Product Information
"""
from typing import List
from xbox.webapi.api.provider.baseprovider import BaseProvider
from xbox.webapi.api.provider.catalog.models import (
AlternateIdType,
CatalogResponse,
CatalogSearchResponse,
FieldsTemplate,
PlatformType,
)
class CatalogProvider(BaseProvider):
    """Look up and search products through the display catalog service.

    Every request is issued via ``self.client.session.get`` with
    ``include_auth=False``; the ``languages`` and ``market`` query values
    are read from ``self.client.language``.
    """

    # Base address shared by every catalog endpoint below.
    CATALOG_URL = "https://displaycatalog.mp.microsoft.com"
    # Delimiter used to pack several Big IDs into a single query value.
    # The attribute name (including its spelling) is kept as-is for callers.
    SEPERATOR = ","

    async def get_products(
        self,
        big_ids: List[str],
        fields: FieldsTemplate = FieldsTemplate.DETAILS,
        **kwargs,
    ) -> CatalogResponse:
        """Fetch catalog entries for one or more Big IDs.

        Args:
            big_ids: Big IDs to look up; they are joined with ``SEPERATOR``
                into one ``bigIds`` query value.
            fields: Fields template whose ``value`` is sent as
                ``fieldsTemplate``.
            **kwargs: Forwarded unchanged to ``self.client.session.get``.

        Returns:
            A ``CatalogResponse`` built from the decoded JSON body.

        Raises:
            Whatever ``raise_for_status()`` on the response raises for an
            unsuccessful status.
        """
        joined_ids = self.SEPERATOR.join(big_ids)
        query_params = {
            "actionFilter": "Browse",
            "bigIds": joined_ids,
            "fieldsTemplate": fields.value,
            "languages": self.client.language.locale,
            "market": self.client.language.short_id,
        }
        endpoint = f"{self.CATALOG_URL}/v7.0/products"

        response = await self.client.session.get(
            endpoint, params=query_params, include_auth=False, **kwargs
        )
        response.raise_for_status()
        payload = response.json()
        return CatalogResponse(**payload)

    async def get_product_from_alternate_id(
        self,
        id: str,
        id_type: AlternateIdType,
        fields: FieldsTemplate = FieldsTemplate.DETAILS,
        top: int = 25,
        **kwargs,
    ) -> CatalogResponse:
        """Fetch catalog entries matching an alternate identifier.

        Args:
            id: The identifier, sent as the ``value`` query parameter.
            id_type: Kind of alternate ID; its ``value`` is sent as
                ``alternateId``.
            fields: Fields template whose ``value`` is sent as
                ``fieldsTemplate``.
            top: Sent as the ``top`` query parameter (presumably a cap on
                the number of results; confirm against the service docs).
            **kwargs: Forwarded unchanged to ``self.client.session.get``.

        Returns:
            A ``CatalogResponse`` built from the decoded JSON body.

        Raises:
            Whatever ``raise_for_status()`` on the response raises for an
            unsuccessful status.
        """
        query_params = {
            "top": top,
            "alternateId": id_type.value,
            "fieldsTemplate": fields.value,
            "languages": self.client.language.locale,
            "market": self.client.language.short_id,
            "value": id,
        }
        endpoint = f"{self.CATALOG_URL}/v7.0/products/lookup"

        response = await self.client.session.get(
            endpoint, params=query_params, include_auth=False, **kwargs
        )
        response.raise_for_status()
        payload = response.json()
        return CatalogResponse(**payload)

    async def product_search(
        self,
        query: str,
        platform: PlatformType = PlatformType.XBOX,
        top: int = 5,
        **kwargs,
    ) -> CatalogSearchResponse:
        """Query the autosuggest endpoint for products matching a text query.

        The product families are fixed to ``"Games,Apps"``.

        Args:
            query: Search text, sent as the ``query`` parameter.
            platform: Platform whose ``value`` is sent as
                ``platformdependencyname``.
            top: Sent as the ``topProducts`` query parameter.
            **kwargs: Forwarded unchanged to ``self.client.session.get``.

        Returns:
            A ``CatalogSearchResponse`` built from the decoded JSON body.

        Raises:
            Whatever ``raise_for_status()`` on the response raises for an
            unsuccessful status.
        """
        query_params = {
            "languages": self.client.language.locale,
            "market": self.client.language.short_id,
            "platformdependencyname": platform.value,
            "productFamilyNames": "Games,Apps",
            "query": query,
            "topProducts": top,
        }
        endpoint = f"{self.CATALOG_URL}/v7.0/productFamilies/autosuggest"

        response = await self.client.session.get(
            endpoint, params=query_params, include_auth=False, **kwargs
        )
        response.raise_for_status()
        payload = response.json()
        return CatalogSearchResponse(**payload)
|