File: test_async.py

package info (click to toggle)
mozjs78 78.15.0-7
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 739,892 kB
  • sloc: javascript: 1,344,214; cpp: 1,215,708; python: 526,544; ansic: 433,835; xml: 118,736; sh: 26,176; asm: 16,664; makefile: 11,537; yacc: 4,486; perl: 2,564; ada: 1,681; lex: 1,414; pascal: 1,139; cs: 879; exp: 499; java: 164; ruby: 68; sql: 45; csh: 35; sed: 18; lisp: 2
file content (63 lines) | stat: -rw-r--r-- 2,301 bytes parent folder | download | duplicates (12)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from __future__ import division, print_function, absolute_import
import unittest
import datetime
import os

import asyncio

import base
import taskcluster.aio.auth as subjectAsync


@unittest.skipIf(os.environ.get('NO_TESTS_OVER_WIRE'), "Skipping tests over wire")
class TestAuthenticationAsync(base.TCTest):

    def test_async_works_with_permanent_credentials(self):
        """we can call methods which require authentication with valid
        permacreds"""

        loop = asyncio.get_event_loop()

        async def x():
            async with subjectAsync.createSession(loop=loop) as session:
                client = subjectAsync.Auth({
                    'rootUrl': self.real_root_url,
                    'credentials': {
                        'clientId': 'tester',
                        'accessToken': 'no-secret',
                    },
                }, session=session)
                result = await client.testAuthenticate({
                    'clientScopes': ['test:a'],
                    'requiredScopes': ['test:a'],
                })
                self.assertEqual(result, {'scopes': ['test:a'], 'clientId': 'tester'})

        loop.run_until_complete(x())

    def test_async_works_with_temporary_credentials(self):
        """we can call methods which require authentication with temporary
        credentials generated by python client"""
        loop = asyncio.get_event_loop()

        async def x():
            async with subjectAsync.createSession(loop=loop) as session:
                tempCred = subjectAsync.createTemporaryCredentials(
                    'tester',
                    'no-secret',
                    datetime.datetime.utcnow(),
                    datetime.datetime.utcnow() + datetime.timedelta(hours=1),
                    ['test:xyz'],
                )
                client = subjectAsync.Auth({
                    'rootUrl': self.real_root_url,
                    'credentials': tempCred,
                }, session=session)

                result = await client.testAuthenticate({
                    'clientScopes': ['test:*'],
                    'requiredScopes': ['test:xyz'],
                })
                self.assertEqual(result, {'scopes': ['test:xyz'], 'clientId': 'tester'})

        loop.run_until_complete(x())