File: fakessh_test.py

package info (click to toggle)
python-mitogen 0.3.26-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 6,456 kB
  • sloc: python: 22,134; sh: 183; makefile: 74; perl: 19; ansic: 18
file content (58 lines) | stat: -rw-r--r-- 1,894 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import os
import shutil
import unittest

import mitogen.fakessh

import testlib


@unittest.skip('broken')
class RsyncTest(testlib.DockerMixin, testlib.TestCase):
    """Run rsync through mitogen.fakessh against contexts from DockerMixin.

    The entire class is currently disabled by the ``unittest.skip`` decorator.
    """

    def test_rsync_from_master(self):
        """Rsync the test data directory from the master into /tmp/data."""
        target = self.docker_ssh_any()
        remote_dir = '/tmp/data'

        # Remove leftovers from an earlier run so the existence assertions
        # below can only pass if rsync actually created the files.
        if target.call(os.path.exists, remote_dir):
            target.call(shutil.rmtree, remote_dir)

        rsync_argv = [
            'rsync', '--progress', '-vvva',
            testlib.data_path('.'), 'target:/tmp/data',
        ]
        status = mitogen.fakessh.run(target, self.router, rsync_argv)

        self.assertEqual(status, 0)
        for expected_path in (remote_dir, '/tmp/data/simple_pkg/a.py'):
            self.assertTrue(target.call(os.path.exists, expected_path))

    def test_rsync_between_direct_children(self):
        """Rsync ``.ssh/`` from one child context to a sibling child context.

        The source is the ``mitogen__has_sudo_pubkey`` SSH account.  The
        destination is the ``webapp`` user, reached with sudo via the
        ``mitogen__has_sudo_nopw`` SSH account.  ``mitogen.fakessh.run`` is
        invoked through the source context's ``call``.
        """
        key_file = testlib.data_path('docker/mitogen__has_sudo_pubkey.key')
        source = self.docker_ssh(
            username='mitogen__has_sudo_pubkey',
            identity_file=key_file,
        )

        sudo_gateway = self.docker_ssh(
            username='mitogen__has_sudo_nopw',
            password='has_sudo_nopw_password',
        )
        destination = self.router.sudo(via=sudo_gateway, username='webapp')

        # Resolve ~/.ssh inside the destination context, then clear it so
        # the size comparison below reflects this run only.
        ssh_dir = destination.call(os.path.expanduser, '~/.ssh')
        if destination.call(os.path.exists, ssh_dir):
            destination.call(shutil.rmtree, ssh_dir)

        rsync_argv = [
            'rsync', '--progress', '-vvva', '.ssh/', 'target:' + ssh_dir,
        ]
        status = source.call(mitogen.fakessh.run, destination, args=rsync_argv)

        self.assertEqual(status, 0)

        # The copied authorized_keys must be the same size on both sides.
        source_size = source.call(os.path.getsize, '.ssh/authorized_keys')
        copied_size = destination.call(
            os.path.getsize, ssh_dir + '/authorized_keys'
        )
        self.assertEqual(source_size, copied_size)