"""Client that downloads blobs from a Pyro file transfer server.

Four transfer methods are exercised (regular Pyro calls, remote iterators,
an annotation stream, and raw sockets), each from two concurrent threads,
and the achieved throughput is printed per thread.
"""
import time
import threading
import socket
import zlib
import sys
import serpent
from Pyro5.api import Proxy, current_context
def regular_pyro(uri):
    """Fetch a series of blobs with plain Pyro method calls and report throughput.

    Every blob is obtained through one ``get_with_pyro`` call on the remote
    object at *uri*. The number of bytes received is checked against the
    expected total before the transfer rate is printed.
    """
    blob_bytes = 10 * 1024 * 1024
    blob_count = 10
    expected_total = blob_bytes * blob_count
    received = 0
    started = time.time()
    thread_name = threading.current_thread().name
    with Proxy(uri) as remote:
        for _ in range(blob_count):
            print("thread {0} getting a blob using regular Pyro call...".format(thread_name))
            # tobytes() handles the case where the bytes arrive serpent-encoded
            blob = serpent.tobytes(remote.get_with_pyro(blob_bytes))
            received += len(blob)
    assert received == expected_total
    elapsed = time.time() - started
    megabytes = received / 1024.0 / 1024.0
    print("thread {0} done, {1:.2f} Mb/sec.".format(thread_name, megabytes / elapsed))
def via_iterator(uri):
    """Fetch a series of blobs chunk by chunk via remote iterators and report throughput.

    For each blob, ``iterator`` is called on the remote object at *uri* and the
    chunks it produces are consumed one at a time. The number of bytes received
    is checked against the expected total before the transfer rate is printed.
    """
    blob_bytes = 10 * 1024 * 1024
    blob_count = 10
    expected_total = blob_bytes * blob_count
    received = 0
    started = time.time()
    thread_name = threading.current_thread().name
    with Proxy(uri) as remote:
        for _ in range(blob_count):
            print("thread {0} getting a blob using remote iterators...".format(thread_name))
            # tobytes() handles the case where a chunk arrives serpent-encoded
            received += sum(
                len(serpent.tobytes(piece))
                for piece in remote.iterator(blob_bytes)
            )
    assert received == expected_total
    elapsed = time.time() - started
    megabytes = received / 1024.0 / 1024.0
    print("thread {0} done, {1:.2f} Mb/sec.".format(thread_name, megabytes / elapsed))
def via_annotation_stream(uri):
    """Download data delivered in response annotations and report throughput.

    The remote ``annotation_stream`` call yields ``(progress, checksum)`` pairs;
    the data chunk that belongs to each pair is read from the ``"FDAT"``
    response annotation. ``progress`` is asserted to equal the running total
    of bytes received so far.

    Raises:
        ValueError: if checksum verification is enabled and a chunk's CRC32
            does not match the checksum supplied with it.
    """
    thread_name = threading.current_thread().name
    started = time.time()
    received = 0
    print("thread {0} downloading via annotation stream...".format(thread_name))
    with Proxy(uri) as remote:
        verify = False  # checksum verification is switched off here
        for progress, checksum in remote.annotation_stream(verify):
            chunk = current_context.response_annotations["FDAT"]
            if verify:
                if zlib.crc32(chunk) != checksum:
                    raise ValueError("checksum error")
            received += len(chunk)
            assert progress == received
            # drop the annotations once this chunk has been processed
            current_context.response_annotations.clear()
    elapsed = time.time() - started
    megabytes = received / 1024.0 / 1024.0
    print("thread {0} done, {1:.2f} Mb/sec.".format(thread_name, megabytes / elapsed))
def raw_socket(uri):
    """Download blobs over plain TCP sockets and report throughput.

    The remote object at *uri* is first asked (``prepare_file_blob``) to prepare
    a number of blobs; each call returns a file id and the address to fetch it
    from. Every blob is then retrieved by connecting to that address, sending
    the file id, and reading until the peer closes the connection. Only the
    retrieval phase is timed.

    Raises:
        AssertionError: if a blob, or the grand total, does not have the
            expected size.
        OSError: if connecting to, sending to or receiving from a blob
            address fails.
    """
    blobsize = 40*1024*1024
    num_blobs = 10
    total_size = 0
    name = threading.current_thread().name
    with Proxy(uri) as p:
        print("thread {0} preparing {1} blobs of size {2} Mb".format(name, num_blobs, blobsize/1024.0/1024.0))
        blobs = {}
        for _ in range(num_blobs):
            file_id, blob_address = p.prepare_file_blob(blobsize)
            blobs[file_id] = blob_address
        start = time.time()
        for file_id, blob_address in blobs.items():
            print("thread {0} retrieving blob using raw socket...".format(name))
            size = 0
            # The context manager closes the socket even when connect/sendall/recv
            # raises, so a failed transfer no longer leaks the descriptor.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                # connect() needs a tuple; presumably the address arrives as a
                # list after serialization -- TODO confirm
                sock.connect(tuple(blob_address))
                sock.sendall(file_id.encode())
                while True:
                    chunk = sock.recv(60000)
                    if not chunk:
                        # empty read: the peer closed the connection
                        break
                    size += len(chunk)
            assert size == blobsize
            total_size += size
        duration = time.time() - start
        assert total_size == blobsize * num_blobs
        print("thread {0} done, {1:.2f} Mb/sec.".format(name, total_size/1024.0/1024.0/duration))
if __name__ == "__main__":
    # Ask for the server uri, then run each transfer method in turn.
    # Every method is executed by two client threads at the same time, and the
    # user is prompted before moving on to the next one.
    uri = input("Uri of filetransfer server? ").strip()
    scenarios = [
        ("\n\n**** regular pyro calls ****\n", regular_pyro, "enter to continue:"),
        ("\n\n**** transfer via iterators ****\n", via_iterator, "enter to continue:"),
        ("\n\n**** transfer via annotation stream ****\n", via_annotation_stream, "enter to continue:"),
        ("\n\n**** raw socket transfers ****\n", raw_socket, "enter to exit:"),
    ]
    for banner, worker, prompt in scenarios:
        print(banner)
        workers = [threading.Thread(target=worker, args=(uri, )) for _ in range(2)]
        for thread in workers:
            thread.start()
        for thread in workers:
            thread.join()
        input(prompt)
|