File: python-krbV-test.py

package info (click to toggle)
python-krbv 1.0.90-1
  • links: PTS, VCS
  • area: main
  • in suites: buster, jessie, jessie-kfreebsd, stretch, wheezy
  • size: 1,456 kB
  • ctags: 189
  • sloc: sh: 10,115; ansic: 3,559; python: 326; makefile: 41; awk: 10
file content (202 lines) | stat: -rw-r--r-- 8,067 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#!/usr/bin/python

# Simple test script to exercise a few code paths in python-krbV
# Mike Bonnet <mikeb@redhat.com>, 2010-07-12

import sys
import optparse
import socket
import select
import krbV

def handle_tcp(opts, sock):
    conn, addr = sock.accept()
    ctx = krbV.default_context()
    sprinc = krbV.Principal(name=opts.principal, context=ctx)
    keytab = krbV.Keytab(name=opts.keytab, context=ctx)
    ac, cprinc = ctx.recvauth(conn, '1.0',
                              options=krbV.AP_OPTS_MUTUAL_REQUIRED,
                              server=sprinc, keytab=keytab)
    print 'Successfully authenticated via tcp: %s' % cprinc.name
    ac.flags = krbV.KRB5_AUTH_CONTEXT_DO_SEQUENCE|krbV.KRB5_AUTH_CONTEXT_DO_TIME
    ac.genaddrs(conn,
                krbV.KRB5_AUTH_CONTEXT_GENERATE_LOCAL_FULL_ADDR|
                krbV.KRB5_AUTH_CONTEXT_GENERATE_REMOTE_FULL_ADDR)
    msg_enc = conn.recv(4096)
    msg = ac.rd_priv(msg_enc)
    print '  Received: %s' % msg
    resp_enc = ac.mk_priv(msg)
    conn.send(resp_enc)
    conn.close()

def handle_udp(opts, sock):
    data, addr = sock.recvfrom(4096)
    ctx = krbV.default_context()
    sprinc = krbV.Principal(name=opts.principal, context=ctx)
    keytab = krbV.Keytab(name=opts.keytab, context=ctx)
    ac = krbV.AuthContext(context=ctx)
    ac.flags = krbV.KRB5_AUTH_CONTEXT_DO_SEQUENCE|krbV.KRB5_AUTH_CONTEXT_DO_TIME
    ac, options, sprinc, ccreds = ctx.rd_req(data, server=sprinc, keytab=keytab,
                                             auth_context=ac,
                                             options=krbV.AP_OPTS_MUTUAL_REQUIRED)
    cprinc = ccreds[2]
    print 'Successfully authenticated via udp: %s' % cprinc.name
    rep = ctx.mk_rep(auth_context=ac)
    sock.sendto(rep, addr)
    msg_enc, addr = sock.recvfrom(4096)
    print 'Using addresses: %s' % str((opts.serveraddr[0], opts.serveraddr[1], addr[0], addr[1]))
    ac.addrs = (opts.serveraddr[0], opts.serveraddr[1], addr[0], addr[1])
    msg = ac.rd_priv(msg_enc)
    print '  Received: %s' % msg
    resp_enc = ac.mk_priv(msg)
    sock.sendto(resp_enc, addr)

def handle_connections(opts, socklist):
    while True:
        try:
            rd, wr, ex = select.select(socklist, [], [], 60)
            for sock in rd:
                if sock.type == socket.SOCK_STREAM:
                    handle_tcp(opts, sock)
                elif sock.type == socket.SOCK_DGRAM:
                    handle_udp(opts, sock)
                else:
                    raise ValueError, 'unknown socket type: %s' % sock.type
        except krbV.Krb5Error, e:
            print >> sys.stderr, 'krbV.Krb5Error:', e
        except socket.timeout:
            pass
        except KeyboardInterrupt:
            break

def server(opts):
    print 'Binding to: %s' % str(opts.serveraddr)
    tcpsock = socket.socket(opts.addr_family, socket.SOCK_STREAM)
    tcpsock.settimeout(15)
    tcpsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    tcpsock.bind(opts.serveraddr)
    tcpsock.listen(5)
    udpsock = socket.socket(opts.addr_family, socket.SOCK_DGRAM)
    udpsock.settimeout(15)
    udpsock.bind(opts.serveraddr)

    try:
        handle_connections(opts, [tcpsock, udpsock])
    finally:
        tcpsock.close()
        udpsock.close()

def tcp_client(opts, conn):
    ctx = krbV.default_context()
    if opts.ccache:
        ccache = krbV.CCache(name='FILE:' + opts.ccache, context=ctx)
    else:
        ccache = ctx.default_ccache()
    cprinc = ccache.principal()
    sprinc = krbV.Principal(name=opts.principal, context=ctx)
    ac = ctx.sendauth(conn, '1.0',
                      options=krbV.AP_OPTS_MUTUAL_REQUIRED,
                      server=sprinc, client=cprinc,
                      ccache=ccache, data='authtest')
    print 'Successfully authenticated via tcp to service: %s' % sprinc.name
    ac.flags = krbV.KRB5_AUTH_CONTEXT_DO_SEQUENCE|krbV.KRB5_AUTH_CONTEXT_DO_TIME
    ac.rcache = ctx.default_rcache()
    ac.genaddrs(conn,
                krbV.KRB5_AUTH_CONTEXT_GENERATE_LOCAL_FULL_ADDR|
                krbV.KRB5_AUTH_CONTEXT_GENERATE_REMOTE_FULL_ADDR)
    enc_msg = ac.mk_priv(opts.message)
    conn.send(enc_msg)
    enc_resp = conn.recv(4096)
    resp = ac.rd_priv(enc_resp)
    if resp == opts.message:
        print '  Exchanging encrypted messages succeeded'
    conn.close()

def gai_error(opts, addrtype, addr, e):
    af = 'IPv4'
    if opts.addr_family == socket.AF_INET6:
        af = 'IPv6'
    print >> sys.stderr, 'error: Could not get %s address for %s hostname %s' % (af, addrtype, addr)
    print >> sys.stderr, e
    sys.exit(1)

def udp_client(opts, sock, addr):
    ctx = krbV.default_context()
    if opts.ccache:
        ccache = krbV.CCache(name='FILE:' + opts.ccache, context=ctx)
    else:
        ccache = ctx.default_ccache()
    cprinc = ccache.principal()
    sprinc = krbV.Principal(name=opts.principal, context=ctx)
    ac = krbV.AuthContext(context=ctx)
    ac.flags = krbV.KRB5_AUTH_CONTEXT_DO_SEQUENCE|krbV.KRB5_AUTH_CONTEXT_DO_TIME
    ac.rcache = ctx.default_rcache()
    ac, req = ctx.mk_req(server=sprinc, client=cprinc,
                         auth_context=ac, ccache=ccache,
                         options=krbV.AP_OPTS_MUTUAL_REQUIRED)
    sock.sendto(req, addr)
    rep, saddr = sock.recvfrom(4096)
    rep_tup = ctx.rd_rep(rep, auth_context=ac)
    print 'Successfully authenticated via udp to service: %s' % sprinc.name
    try:
        addrinfo = socket.getaddrinfo(socket.gethostname(), sock.getsockname()[1],
                                      opts.addr_family)
        localaddr = addrinfo[0][4]
    except socket.gaierror, e:
        gai_error(opts, 'local', socket.gethostname(), e)
    print 'Using addresses: %s' % str((localaddr[0], localaddr[1], addr[0], addr[1]))
    ac.addrs = (localaddr[0], localaddr[1], addr[0], addr[1])
    msg_enc = ac.mk_priv(opts.message)
    sock.sendto(msg_enc, addr)
    resp_enc, saddr = sock.recvfrom(4096)
    resp = ac.rd_priv(resp_enc)
    if resp == opts.message:
        print '  Exchanging encrypted messages succeeded'

def client(opts):
    print 'Connecting to: %s' % str(opts.serveraddr)

    tcpsock = socket.socket(opts.addr_family, socket.SOCK_STREAM)
    tcpsock.connect(opts.serveraddr)
    tcp_client(opts, tcpsock)

    udpsock = socket.socket(opts.addr_family, socket.SOCK_DGRAM)
    udpsock.settimeout(15)
    udp_client(opts, udpsock, opts.serveraddr)


if __name__ == '__main__':
    parser = optparse.OptionParser()
    parser.add_option('-s', '--server', action='store_true', help='Run in server mode')
    parser.add_option('-p', '--port', type='int', default=11234, help='Port to use for running the test.  The server will bind to this port, and the client will connect to it.')
    parser.add_option('-6', '--ipv6', action='store_true', help='Use IPv6')
    parser.add_option('-a', '--address', default='localhost', help='The address to bind the sockets to in server mode, or the host to connect to in client mode')
    parser.add_option('-P', '--principal', help='The server principal')
    parser.add_option('-k', '--keytab', help='Service keytab')
    parser.add_option('-c', '--ccache', help='Location of the credentials cache')
    parser.add_option('-m', '--message', help='Message to encrypt and send from the client to the server', default='Kerberos is working')

    opts, args = parser.parse_args()

    if not opts.principal:
        parser.error('You must specify the server principal')

    if opts.ipv6:
        opts.addr_family = socket.AF_INET6
    else:
        opts.addr_family = socket.AF_INET

    try:
        addrinfo = socket.getaddrinfo(opts.address, opts.port, opts.addr_family)
        opts.serveraddr = addrinfo[0][4]
    except socket.gaierror, e:
        gai_error(opts, 'server', opts.address, e)

    if opts.server:
        if not opts.keytab:
            parser.error('You must specify a keytab in server mode')
        server(opts)
    else:
        if opts.keytab:
            parser.error('You may only specify a keytab in server mode')
        client(opts)