File: storageTest.py

package info (click to toggle)
python-soappy 0.11.3-1.7
  • links: PTS
  • area: main
  • in suites: etch, etch-m68k
  • size: 908 kB
  • ctags: 1,617
  • sloc: python: 12,660; makefile: 9
file content (123 lines) | stat: -rwxr-xr-x 4,012 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python

ident = '$Id: storageTest.py,v 1.5 2003/12/18 06:31:50 warnes Exp $'

import os
import re
import signal
import sys
import time

# Put the parent directory on the module search path (at index 1) before
# importing SOAPpy.
sys.path.insert(1, "..")
from SOAPpy import SOAPProxy, SOAPConfig, SOAPServer, SOAPUserAgent, booleanType

# Check for a web proxy definition in environment
def detect_http_proxy(environ=None):
    """Return "host:port" taken from the ``http_proxy`` variable, or None.

    environ -- mapping to read ``http_proxy`` from; defaults to os.environ.

    None is returned when the variable is absent or when it does not
    contain an ``http://host:port`` fragment.  Only those two expected
    conditions are handled; anything else (e.g. KeyboardInterrupt) is
    allowed to propagate instead of being silently swallowed.
    """
    if environ is None:
        environ = os.environ
    try:
        proxy_url = environ['http_proxy']
    except KeyError:
        # No proxy configured.
        return None
    match = re.search('http://([^:]+):([0-9]+)', proxy_url)
    if match is None:
        # Variable is set but not in the http://host:port form.
        return None
    return "%s:%s" % match.group(1, 2)


http_proxy = detect_http_proxy()


# Endpoint handed to SOAPProxy as its ``proxy`` argument.
PROXY = "http://www.soapware.org/xmlStorageSystem"
# Account details sent with the storage calls in this script.
EMAIL = "SOAPpy@actzero.com"
NAME = "test_user"
PASSWORD = "mypasswd"

# Port announced as clientPort / pleaseNotify port and used by the local
# notification server.
MY_PORT = 15600

def resourceChanged(url):
    """Notification callback: announce that a stored resource has changed.

    This function is registered on the local SOAPServer in the child
    process and is named as ``notifyProcedure`` in the pleaseNotify call.

    url -- value supplied by the caller of the notification; it is only
           interpolated into the printed message.

    Returns booleanType(1).  booleanType is provided by the SOAPpy import
    at the top of the file (it was previously referenced without being
    imported, which raised NameError on the first notification).
    """
    print("\n##### NOTIFICATION MESSAGE: Resource %s has changed #####\n" % url)
    return booleanType(1)

def printstatus(cmd, stat):
    """Print a one-line success/failure banner for a storage call.

    cmd  -- label for the operation, shown in the banner.
    stat -- reply object; its ``flError`` attribute selects the banner and
            its ``message`` attribute is included in it.

    Returns True when ``stat.flError`` is false, otherwise False, so the
    result can be used directly in an ``if``.
    """
    # Blank separator line before the banner.
    print("")
    if stat.flError:
        print("### %s failed: %s ###" % (cmd, stat.message))
    else:
        print("### %s successful: %s ###" % (cmd, stat.message))
    return not stat.flError

# Client object through which every storage call in this script is made.
server = SOAPProxy(
    encoding="US-ASCII",
    proxy=PROXY,
    soapaction="/xmlStorageSystem",
    http_proxy=http_proxy,
    # config=SOAPConfig(debug=1)
)

# Register as a new user or update user information
reg = server.registerUser(
    email=EMAIL,
    name=NAME,
    password=PASSWORD,
    clientPort=MY_PORT,
    userAgent=SOAPUserAgent(),
)
printstatus("registerUser", reg)

# See what this server can do
reg = server.getServerCapabilities(email=EMAIL, password=PASSWORD)
if printstatus("getServerCapabilities", reg):
    # (caption, attribute of the reply) pairs, printed in this order.
    for caption, field in (
        ("Legal file extensions: ", "legalFileExtensions"),
        ("Maximum file size: ", "maxFileSize"),
        ("Maximum bytes per user: ", "maxBytesPerUser"),
        ("Number of bytes in use by the indicated user: ", "ctBytesInUse"),
        ("URL of the folder containing your files: ", "yourUpstreamFolderUrl"),
    ):
        print(caption + str(getattr(reg, field)))

# Store some files
reg = server.saveMultipleFiles(
    email=EMAIL,
    password=PASSWORD,
    relativepathList=['index.html', 'again.html'],
    fileTextList=[
        '<html><title>bennett@actzero.com home page</title><body>' +
        '<a href=again.html>Hello Earth</a></body></html>',
        '<html><title>bennett@actzero.com home page</title><body>' +
        '<a href=index.html>Hello Earth Again</a></body></html>',
    ])
if printstatus("saveMultipleFiles", reg):
    print("Files stored:")
    # Loop variable deliberately not called ``file``: that would rebind the
    # builtin of the same name for the rest of the script.
    for stored_url in reg.urlList:
        print("    %s" % stored_url)

    # Save this for call to test pleaseNotify
    mylist = reg.urlList
else:
    # Nothing was stored, so there is nothing to watch later on.
    mylist = []

# Check to see what files are stored
reg = server.getMyDirectory(email=EMAIL, password=PASSWORD)
if printstatus("getMyDirectory", reg):
    # Entries are read from attributes named file00001, file00002, ... on
    # reg.directory; stop at the first number that is missing.
    i = 1
    attr_name = "file%05d" % i
    while hasattr(reg.directory, attr_name):
        d = getattr(reg.directory, attr_name)
        print("Relative Path: %s" % d.relativePath)
        print("Size: %d" % d.size)
        print("Created: %s" % d.whenCreated)
        print("Last Uploaded: %s" % d.whenLastUploaded)
        print("URL: %s" % d.url)
        print("")
        i += 1
        attr_name = "file%05d" % i

# Set up notification: ask the server to call resourceChanged on MY_PORT
# for the URLs stored above.
reg = server.pleaseNotify(
    notifyProcedure="resourceChanged",
    port=MY_PORT,
    path="/",
    protocol="soap",
    urlList=mylist,
)
# Label the status line with the method that was called, as every other
# call in this script does (it previously said "notifyProcedure", which is
# the name of a parameter, not of the operation).
printstatus("pleaseNotify", reg)

# Split into a child that runs the notification server and a parent that
# modifies a stored file.
pid = os.fork()
if pid == 0:
    # I am a child process.  Set up SOAP server to receive notification
    print("")
    print("## Starting notification server ##")

    # SOAPServer comes from the SOAPpy import at the top of the file; it
    # was previously used here without being imported (NameError).
    s = SOAPServer(('localhost', MY_PORT))
    s.registerFunction(resourceChanged)
    s.serve_forever()

else:

    def handler(signum, frame):
        """SIGINT handler for the parent: forward the interrupt to the child."""
        # Kill child process
        print("Killing child process %d" % pid)
        os.kill(pid, signal.SIGINT)

    signal.signal(signal.SIGINT, handler)

    # I am a parent process
    # Change some files
    # Pause before the update; presumably to let the child start its
    # server first -- TODO confirm.
    time.sleep(3)
    reg = server.saveMultipleFiles(
        email=EMAIL,
        password=PASSWORD,
        relativepathList=['index.html'],
        fileTextList=[
            '<html><title>bennett@actzero.com home page</title><body>' +
            '<a href=again.html>Hello Bennett</a></body></html>',
        ])
    if printstatus("saveMultipleFiles", reg):
        print("Files stored:")
        # Not named ``file`` so the builtin is left alone.
        for stored_url in reg.urlList:
            print("    %s" % stored_url)

    # Block until the child exits (it serves forever unless interrupted).
    # NOTE(review): on Python 2 a SIGINT handled during this call may
    # surface as OSError(EINTR) -- confirm whether a retry is wanted.
    os.waitpid(pid, 0)