1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
from __future__ import print_function
import time
import threading
import sys
import socket
import zlib
import Pyro4
import serpent
# On Python 2, bind the name ``input`` to raw_input so the prompts in this
# script return the typed text as-is (Python 2's own input() evaluates it).
if sys.version_info[0] < 3:
    input = raw_input
def regular_pyro(uri):
    """Fetch blobs with plain Pyro remote calls and print the achieved throughput.

    Opens a proxy to *uri*, calls ``get_with_pyro(blobsize)`` ten times with a
    requested size of 10*1024*1024 bytes, and adds up the length of every
    returned blob. The elapsed time covers proxy setup and all calls.

    :param uri: Pyro URI of the file transfer server object.
    :raises AssertionError: if the summed size differs from blobsize*num_blobs.
    """
    blobsize = 10*1024*1024
    num_blobs = 10
    total_size = 0
    start = time.time()
    # current_thread() is the supported spelling; the camelCase alias
    # currentThread() emits a DeprecationWarning on Python 3.10+.
    name = threading.current_thread().name
    with Pyro4.core.Proxy(uri) as p:
        for _ in range(num_blobs):
            print("thread {0} getting a blob using regular Pyro call...".format(name))
            data = p.get_with_pyro(blobsize)
            data = serpent.tobytes(data)   # in case of serpent encoded bytes
            total_size += len(data)
    # sanity check: every requested byte must have arrived
    assert total_size == blobsize*num_blobs
    duration = time.time() - start
    print("thread {0} done, {1:.2f} Mb/sec.".format(name, total_size/1024.0/1024.0/duration))
def via_iterator(uri):
    """Fetch blobs chunk-by-chunk through a remote iterator and print the throughput.

    Opens a proxy to *uri* and, ten times over, iterates ``iterator(blobsize)``
    with a requested size of 10*1024*1024 bytes, adding up the length of every
    chunk the iteration yields. The elapsed time covers proxy setup and all
    iterations.

    :param uri: Pyro URI of the file transfer server object.
    :raises AssertionError: if the summed size differs from blobsize*num_blobs.
    """
    blobsize = 10*1024*1024
    num_blobs = 10
    total_size = 0
    start = time.time()
    # current_thread() is the supported spelling; the camelCase alias
    # currentThread() emits a DeprecationWarning on Python 3.10+.
    name = threading.current_thread().name
    with Pyro4.core.Proxy(uri) as p:
        for _ in range(num_blobs):
            print("thread {0} getting a blob using remote iterators...".format(name))
            for chunk in p.iterator(blobsize):
                chunk = serpent.tobytes(chunk)   # in case of serpent encoded bytes
                total_size += len(chunk)
    # sanity check: every requested byte must have arrived
    assert total_size == blobsize*num_blobs
    duration = time.time() - start
    print("thread {0} done, {1:.2f} Mb/sec.".format(name, total_size/1024.0/1024.0/duration))
def via_annotation_stream(uri):
    """Download data carried in Pyro response annotations and print the throughput.

    Iterates ``annotation_stream(perform_checksum)`` on the proxy for *uri*.
    Each iteration yields a ``(progress, checksum)`` pair, while the chunk
    itself is read from the ``"FDAT"`` response annotation of the current
    Pyro context. Checksum verification is switched off here
    (``perform_checksum = False``).

    :param uri: Pyro URI of the file transfer server object.
    :raises ValueError: if checksums are enabled and a chunk's CRC32 mismatches.
    :raises AssertionError: if the reported progress differs from the bytes received so far.
    """
    # current_thread() is the supported spelling; the camelCase alias
    # currentThread() emits a DeprecationWarning on Python 3.10+.
    name = threading.current_thread().name
    start = time.time()
    total_size = 0
    print("thread {0} downloading via annotation stream...".format(name))
    with Pyro4.core.Proxy(uri) as p:
        perform_checksum = False
        for progress, checksum in p.annotation_stream(perform_checksum):
            chunk = Pyro4.current_context.response_annotations["FDAT"]
            if perform_checksum and zlib.crc32(chunk) != checksum:
                raise ValueError("checksum error")
            total_size += len(chunk)
            # the progress value sent along must equal the running byte count
            assert progress == total_size
            Pyro4.current_context.response_annotations.clear()  # clean them up once we're done with them
    duration = time.time() - start
    print("thread {0} done, {1:.2f} Mb/sec.".format(name, total_size/1024.0/1024.0/duration))
def raw_socket(uri):
    """Download prepared blobs over plain TCP sockets and print the throughput.

    First asks the server, via ``prepare_file_blob(blobsize)`` on the proxy
    for *uri*, to prepare ten blobs of 40*1024*1024 bytes; every call returns
    a ``(file_id, blob_address)`` pair. Then, for each blob, connects a TCP
    socket to its address, sends the encoded file id and reads until the peer
    closes the connection. Only the socket downloads are timed, not the
    preparation calls.

    :param uri: Pyro URI of the file transfer server object.
    :raises AssertionError: if a blob, or the total, has an unexpected size.
    """
    blobsize = 40*1024*1024
    num_blobs = 10
    total_size = 0
    # current_thread() is the supported spelling; the camelCase alias
    # currentThread() emits a DeprecationWarning on Python 3.10+.
    name = threading.current_thread().name
    with Pyro4.core.Proxy(uri) as p:
        print("thread {0} preparing {1} blobs of size {2} Mb".format(name, num_blobs, blobsize/1024.0/1024.0))
        blobs = {}
        for _ in range(num_blobs):
            file_id, blob_address = p.prepare_file_blob(blobsize)
            blobs[file_id] = blob_address
        start = time.time()
        for file_id, blob_address in blobs.items():
            print("thread {0} retrieving blob using raw socket...".format(name))
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # try/finally so the socket is released even when connecting,
            # sending or receiving fails part-way through
            try:
                sock.connect(tuple(blob_address))
                sock.sendall(file_id.encode())
                size = 0
                while True:
                    chunk = sock.recv(60000)
                    if not chunk:
                        # empty read: the peer closed the connection
                        break
                    size += len(chunk)
            finally:
                sock.close()
            assert size == blobsize
            total_size += size
        duration = time.time() - start
        assert total_size == blobsize * num_blobs
        print("thread {0} done, {1:.2f} Mb/sec.".format(name, total_size/1024.0/1024.0/duration))
if __name__ == "__main__":
    uri = input("Uri of filetransfer server? ").strip()
    # One row per benchmark: banner printed first, transfer function run in
    # two concurrent threads, prompt shown once both threads have finished.
    scenarios = (
        ("\n\n**** regular pyro calls ****\n", regular_pyro, "enter to continue:"),
        ("\n\n**** transfer via iterators ****\n", via_iterator, "enter to continue:"),
        ("\n\n**** transfer via annotation stream ****\n", via_annotation_stream, "enter to continue:"),
        ("\n\n**** raw socket transfers ****\n", raw_socket, "enter to exit:"),
    )
    for banner, transfer, prompt in scenarios:
        print(banner)
        workers = [threading.Thread(target=transfer, args=(uri, )) for _ in range(2)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        input(prompt)
|