1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
|
"""
CQS
Used to download stump (TV streaming) data
(RemoteTVInput ServiceChannel on Smartglass)
"""
from xbox.webapi.api.provider.baseprovider import BaseProvider
from xbox.webapi.api.provider.cqs.models import (
CqsChannelListResponse,
CqsScheduleResponse,
)
class CQSProvider(BaseProvider):
    """Provider for the CQS endpoints that serve stump (TV streaming) data."""

    # Base address that every CQS endpoint path below is appended to.
    CQS_URL = "https://cqs.xboxlive.com"

    # Header set passed with every request issued by this provider.
    HEADERS_CQS = {
        "Cache-Control": "no-cache",
        "Accept": "application/json",
        "Pragma": "no-cache",
        "x-xbl-client-type": "Companion",
        "x-xbl-client-version": "2.0",
        "x-xbl-contract-version": "1.b",
        "x-xbl-device-type": "WindowsPhone",
        "x-xbl-isautomated-client": "true",
    }

    async def get_channel_list(
        self, locale_info: str, headend_id: str, **kwargs
    ) -> CqsChannelListResponse:
        """
        Download the stump channel list of a lineup.

        Args:
            locale_info: Locale string (format: "en-US")
            headend_id: Headend id
            **kwargs: Extra keyword arguments, forwarded unchanged to the
                session's ``get`` call

        Returns:
            :class:`CqsChannelListResponse`: Channel List Response
        """
        endpoint = f"/epg/{locale_info}/lineups/{headend_id}/channels"
        query = {"desired": "vesper_mobile_lineup"}

        response = await self.client.session.get(
            self.CQS_URL + endpoint,
            params=query,
            headers=self.HEADERS_CQS,
            **kwargs,
        )
        # raise_for_status() runs before the body is parsed, so a failure it
        # reports is not masked by a later parsing error.
        response.raise_for_status()

        payload = response.json()
        return CqsChannelListResponse(**payload)

    async def get_schedule(
        self,
        locale_info: str,
        headend_id: str,
        start_date: str,
        duration_minutes: int,
        channel_skip: int,
        channel_count: int,
        **kwargs,
    ) -> CqsScheduleResponse:
        """
        Download stump EPG (schedule) data of a lineup.

        Args:
            locale_info: Locale string (format: "en-US")
            headend_id: Headend id
            start_date: Start date (format: 2016-07-11T21:50:00.000Z)
            duration_minutes: Schedule duration to download
            channel_skip: Count of channels to skip
            channel_count: Count of channels to get data for
            **kwargs: Extra keyword arguments, forwarded unchanged to the
                session's ``get`` call

        Returns:
            :class:`CqsScheduleResponse`: Schedule Response
        """
        endpoint = f"/epg/{locale_info}/lineups/{headend_id}/programs"
        # Keyword names are the literal query-string keys sent to the service.
        query = dict(
            startDate=start_date,
            durationMinutes=duration_minutes,
            channelSkip=channel_skip,
            channelCount=channel_count,
            desired="vesper_mobile_schedule",
        )

        response = await self.client.session.get(
            self.CQS_URL + endpoint,
            params=query,
            headers=self.HEADERS_CQS,
            **kwargs,
        )
        # raise_for_status() runs before the body is parsed, so a failure it
        # reports is not masked by a later parsing error.
        response.raise_for_status()

        payload = response.json()
        return CqsScheduleResponse(**payload)
|