File: xr-srv.py

package info (click to toggle)
bitpim 1.0.7%2Bdfsg1-3
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 31,384 kB
  • sloc: python: 267,746; cpp: 2,076; perl: 600; ansic: 409; sh: 226; makefile: 152; sed: 1
file content (123 lines) | stat: -rw-r--r-- 3,608 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# M2Crypto
from M2Crypto import DH, SSL
import SimpleXMLRPCServer
import sys
import base64
import xmlrpclib

class AuthenticationChecker:
    """Mixin adding HTTP Basic authentication to a request handler.

    The class it is mixed into has to supply ``headers``,
    ``client_address``, ``send_response``, ``send_header`` and
    ``end_headers`` - all of them are used below.  Override
    ``check_credentials`` to implement a real policy.
    """

    # realm name announced in the WWW-Authenticate challenge
    realm="XMLRPC"

    def check_auth_headers(self):
        """Validate the Authorization header of the current request.

        Returns True when the header carries well formed Basic
        credentials that check_credentials accepts.  In every other case
        (header missing or empty, other scheme, token missing or not
        decodable, no ':' separator, credentials rejected) the 401
        challenge is sent through auth_failed and False is returned.
        """
        # only needed in this method, so imported locally
        import binascii

        # self.client_address (host, port)
        try:
            cred=self.headers["Authorization"]
        except KeyError:
            cred=None
        # a header container may also hand back None or "" rather than raise
        if not cred:
            return self.auth_failed()

        cred=cred.split()
        # both scheme and token are required - a bare "Basic" must give a
        # 401, not an IndexError
        if len(cred)<2 or cred[0].lower()!="basic":
            return self.auth_failed()

        try:
            v=binascii.a2b_base64(cred[1])
            if not isinstance(v, str):
                # the decoder returned bytes; the rest works on text
                v=v.decode("utf-8")
        except (binascii.Error, ValueError, TypeError):
            # undecodable token is a client error, answer with the challenge
            return self.auth_failed()

        # split on the first ':' only, the password may contain more of them
        p=v.find(':')
        if p<0:
            return self.auth_failed()

        uname=v[:p]
        pwd=v[p+1:]

        if not self.check_credentials(uname, pwd, self.client_address):
            return self.auth_failed()

        return True

    def check_credentials(self, username, password, address):
        """Decide whether username/password from address may proceed.

        NOTE(review): this default accepts everybody and echoes the
        password in clear text to stdout - override it before exposing
        the server to anything but a test setup.
        """
        print("check_credentials %s, %s, %s" % (repr(username), repr(password), repr(address)))
        return True

    def auth_failed(self):
        """Send the 401 challenge for self.realm and return False."""
        self.send_response(401, "Authentication required")
        # the realm value has to be a quoted-string in the challenge
        self.send_header("WWW-Authenticate", 'Basic realm="%s"' % (self.realm,))
        self.end_headers()

        return False
        

if sys.version>="2.3": # Python 2.3 - we also supply doc

    
    import DocXMLRPCServer

    class BADocXMLRPCRequestHandler(DocXMLRPCServer.DocXMLRPCRequestHandler, AuthenticationChecker):
        """Doc-generating XML-RPC request handler with Basic authentication.

        POST requests have to pass check_auth_headers before they reach
        the base class; GET requests are handed to the base class without
        any authentication check.
        """

        def do_POST(self):
            """Process the POST only once authentication has succeeded."""
            # on failure check_auth_headers has already sent the 401 reply
            if self.check_auth_headers():
                DocXMLRPCServer.DocXMLRPCRequestHandler.do_POST(self)

        def do_GET(self):
            """Delegate GET requests to the base class unchanged."""
            base_get=DocXMLRPCServer.DocXMLRPCRequestHandler.do_GET
            base_get(self)

        def finish(self):
            """Shut the SSL connection down and close it."""
            # do proper SSL shutdown sequence
            shutdown_flags=SSL.SSL_RECEIVED_SHUTDOWN | SSL.SSL_SENT_SHUTDOWN
            connection=self.request
            connection.set_shutdown(shutdown_flags)
            connection.close()


    
    class SSLSimpleXMLRPCServer(SSL.SSLServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher, DocXMLRPCServer.XMLRPCDocGenerator):
        """SSL server combined with the XML-RPC dispatcher and doc generator.

        Requests are handled by BADocXMLRPCRequestHandler.
        """

        def __init__(self, addr, ssl_context):
            """Initialise the three base classes one after the other.

            addr and ssl_context are passed on to SSL.SSLServer unchanged.
            """
            # set before any of the base class initialisers run
            self.logRequests=True

            dispatcher=SimpleXMLRPCServer.SimpleXMLRPCDispatcher
            dispatcher.__init__(self)

            SSL.SSLServer.__init__(
                self,
                addr,
                BADocXMLRPCRequestHandler,
                ssl_context
            )

            doc_generator=DocXMLRPCServer.XMLRPCDocGenerator
            doc_generator.__init__(self)


else: # Python 2.2 - no doc

    class SSLSimpleXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler, AuthenticationChecker):
        """XML-RPC request handler with Basic authentication (no doc support)."""

        def do_POST(self):
            """Process the POST only once authentication has succeeded."""
            # on failure check_auth_headers has already sent the 401 reply
            if self.check_auth_headers():
                SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self)

        def finish(self):
            """Print a trace line, then shut the SSL connection down and close it."""
            print("my finish called")
            # do proper SSL shutdown sequence
            shutdown_flags=SSL.SSL_RECEIVED_SHUTDOWN | SSL.SSL_SENT_SHUTDOWN
            connection=self.request
            connection.set_shutdown(shutdown_flags)
            connection.close()

    
    class SSLSimpleXMLRPCServer(SSL.SSLServer):
        """SSL server that keeps its own registry of XML-RPC callables.

        Requests are handled by SSLSimpleXMLRPCRequestHandler.
        """

        def __init__(self, addr, ssl_context):
            """Start with an empty registry, then initialise SSL.SSLServer.

            addr and ssl_context are passed on to SSL.SSLServer unchanged.
            """
            # nothing is registered yet
            self.funcs={}
            self.instance=None
            self.logRequests=True
            SSL.SSLServer.__init__(
                self,
                addr,
                SSLSimpleXMLRPCRequestHandler,
                ssl_context
            )

        def register_instance(self, instance):
            """Remember instance, replacing any instance stored before."""
            self.instance=instance

        def register_function(self, function, name=None):
            """Store function in self.funcs under name.

            Without a name the function's own __name__ is the key.
            """
            if name is not None:
                key=name
            else:
                key=function.__name__
            self.funcs[key]=function


# SSL context handed to the server below.  The two file names are relative;
# presumably they are resolved against the current working directory -
# TODO confirm against M2Crypto.
context=SSL.Context("sslv23")
context.load_cert("host.pem", "privkey.pem")

# SSLSimpleXMLRPCServer is whichever of the two variants the version check
# above defined; ("localhost", 1234) is the address passed on to SSL.SSLServer.
server = SSLSimpleXMLRPCServer(("localhost", 1234), context)

def add(a,b):
    "doc for add"
    # The docstring above is deliberately left as it was: it can be read
    # at runtime through add.__doc__.
    # Sample function that is registered with the server; returns a+b for
    # whatever operand types support "+".
    total=a+b
    return total

# Register add without an explicit name.
server.register_function(add)

# Hand control to the server's request loop; nothing after this line is
# expected to run while the server is up.
server.serve_forever()