File: cluster.py

package info (click to toggle)
moosefs 4.58.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 9,644 kB
  • sloc: ansic: 129,204; python: 12,419; sh: 5,874; javascript: 2,273; makefile: 838
file content (440 lines) | stat: -rw-r--r-- 16,573 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
from common.constants import *
from common.utils import *
from common.conn import *
from common.models import *

# # called asynchronously after receiving the answer to the info command from each master server
# # it should decode the answer and built the master server object
# def process_info_answer(host,port,data,length,error):
# 	# print("process_info_answer data: %s\n\n" % data)
# 	version = (0,0,0)
# 	workingstate = STATE_DUMMY
# 	statestr = ""
# 	statecolor = 1
# 	memusage = 0
# 	syscpu = 0
# 	usercpu = 0
# 	lastsuccessfulstore = 0
# 	lastsaveseconds = 0
# 	lastsavestatus = 0
# 	metaversion = 0
# 	exportschecksum = None
# 	metaid = None
# 	lastsavemetaversion = None
# 	lastsavemetachecksum = None
# 	usectime = None
# 	chlogtime = 0
# 	try:
# 		if length==52:
# 			version = (1,4,0)
# 		elif length==60:
# 			version = (1,5,0)
# 		elif length==68 or length==76 or length==101: #(size < 121 => version < 2.0.0)
# 			version = struct.unpack(">HBB",data[:4])
# 		# (size = 121,version >= 2.0.0) (size = 129,version >= 3.0.32) (size = 173,version >= 4.4.0)
# 		elif length==121 or length==129 or length==137 or length==149 or length==173 or length==181 or length==193 or length==205:
# 			offset = 8 if (length>=137 and length!=173) else 0
# 			version = struct.unpack(">HBB",data[:4])
# 			memusage,syscpu,usercpu = struct.unpack(">QQQ",data[4:28])
# 			syscpu/=10000000.0
# 			usercpu/=10000000.0
# 			if length==205:
# 				offset += 4
# 			lastsuccessfulstore,lastsaveseconds,lastsavestatus = struct.unpack(">LLB",data[offset+92:offset+101])
# 			if version>(2,0,14):
# 				lastsaveseconds = lastsaveseconds / 1000.0
# 			workingstate,nextstate,stablestate,sync,leaderip,changetime,metaversion = struct.unpack(">BBBBLLQ",data[offset+101:offset+121])
# 			if length>=129:
# 				exportschecksum = struct.unpack(">Q",data[offset+121:offset+129])[0]
# 			if length>=173:
# 				metaid,lastsavemetaversion,lastsavemetachecksum = struct.unpack(">QQL",data[offset+129:offset+149])
# 			if length==149 or length==193 or length==205:
# 				usectime,chlogtime = struct.unpack(">QL",data[length-12:length])
# 			if workingstate==STATE_MASTERCE and nextstate==STATE_MASTERCE and stablestate==STATE_MASTERCE and sync==STATE_MASTERCE:
# 				statestr = state_name(workingstate) # "MASTER"
# 				statecolor = 0
# 			elif stablestate==0 or workingstate!=nextstate:
# 				statestr = "transition %s -> %s" % (state_name(workingstate),state_name(nextstate))
# 				statecolor = 8
# 			else:
# 				statestr = state_name(workingstate)
# 				statecolor = state_color(workingstate,sync)
# 		else:
# 			version = (0,0,0) # unknown version
# 		# except Exception:
# 		# 	statestr = STATE_STR_BUSY
# 		# 	statecolor = 7
# 	except Exception:
# 		workingstate = STATE_UNREACHABLE
# 		statestr = STATE_STR_UNREACHABLE
# 	ms = MasterServer(host,port,version,workingstate,sync,statestr,statecolor,metaversion,memusage,syscpu,usercpu,lastsuccessfulstore,lastsaveseconds,lastsavestatus,exportschecksum,metaid,lastsavemetaversion,lastsavemetachecksum,usectime,chlogtime)
# 	return ms

# Represents state of master servers in the cluster: leader, elect etc.
class Cluster:
	# possible_leader_ip is a hint for possible leading master IP address (the previously known leader)
	def __init__(self, masterhost, masterport, possible_leader_ip=None):
		self.masterhost = masterhost
		self.masterport = masterport

		# Don't use the following underscored variables directly, use the methods instead
		self._masterconn = None
		self._masterlist = None
		self._masterinfo = None
		self._master_exportschecksum = 0
		self._master_metaid = 0
		self._leader_usectime = None

		self._leaderfound = 0
		self._electfound = 0
		self._usurperfound = 0
		self._followerfound = 0
		self._deputyfound = 0

		# Resolve master hostname to all IP addresses
		self.addresses = []
		for mhost in self.masterhost.replace(';',' ').replace(',',' ').split():
			try:
				for i in socket.getaddrinfo(mhost,self.masterport,socket.AF_INET,socket.SOCK_STREAM,socket.SOL_TCP):
					if i[0]==socket.AF_INET and i[1]==socket.SOCK_STREAM and i[2]==socket.SOL_TCP:
						self.addresses.append(i[4])
						# print("Master address: %s:%u" % i[4])
			except Exception:
				pass

		# self.find_all_masters()
		
		if possible_leader_ip:
			# put possible leader to the beginning of the list
			self.addresses.sort(key=lambda x: x[0]!=possible_leader_ip)

		# Find master servers in the cluster, find them all if there is no hint for possible leading master IP address
		self.find_masters(find_all=(possible_leader_ip==None))

	# Returns True if any master server is found in the cluster
	def anyfound(self):
		return self._leaderfound or self._electfound or self._usurperfound or self._followerfound or self._deputyfound
	
	# Returns True if any leading master server is found in the cluster (leader, elect, usurper, or deputy)
	def leadingfound(self):
		return self._leaderfound or self._electfound or self._usurperfound or self._deputyfound

	# Return the current leading master server (connection)
	def master(self):
		return self._masterconn

	def leaderfound(self):
		return self._leaderfound
	def followerfound(self):
		return self._followerfound
	def electfound(self):
		return self._electfound
	def usurperfound(self):
		return self._usurperfound
	def deputyfound(self):
		return self._deputyfound

	def masterinfo(self):
		return self._masterinfo
	def master_metaid(self):
		return self._master_metaid
	def master_exportschecksum(self):
		return self._master_exportschecksum


	# def find_all_masters(self):
	# 	multiconn = MFSMultiConn(0.5,self.addresses)
	# 	answers=multiconn.command(CLTOMA_INFO,MATOCL_INFO,None,process_info_answer) 



	def find_masters(self, find_all):
		# Don't use the following variables directly, use the methods instead
		self._masterconn = None
		self._masterlist = None
		self._masterinfo = None
		self._master_exportschecksum = 0
		self._master_metaid = 0
		self._leader_usectime = None

		self._leaderfound = 0
		self._electfound = 0
		self._usurperfound = 0
		self._followerfound = 0
		self._deputyfound = 0

		leaderinfo = None
		leader_exportschecksum = None
		leader_metaid = None
		leaderconn = None

		electinfo = None
		elect_exportschecksum = None
		elect_metaid = None
		electconn = None

		usurperinfo = None
		usurper_exportschecksum = None
		usurper_metaid = None
		usurperconn = None

		followerinfo = None
		follower_exportschecksum = None
		follower_metaid = None
		followerconn = None

		if find_all:
			self._masterlist = []

		# find leader
		for mhost,mport in self.addresses:
			conn = None
			version = (0,0,0)
			workingstate = STATE_DUMMY
			sync = 0
			statestr = "???"
			statecolor = 1
			memusage = 0
			syscpu = 0
			usercpu = 0
			lastsuccessfulstore = 0
			lastsaveseconds = 0
			lastsavestatus = 0
			metaversion = 0
			exportschecksum = None
			metaid = None
			lastsavemetaversion = None
			lastsavemetachecksum = None
			usectime = None
			chlogtime = 0
			try:
				conn = MasterConn(mhost,mport)
				try:
					data,length = conn.command(CLTOMA_INFO,MATOCL_INFO)
					if length==52:
						version = (1,4,0)
						conn.set_version(version)
						if self._leaderfound==0:
							leaderconn = conn
							leaderinfo = data
							self._leaderfound = 1
						statestr = "OLD MASTER (LEADER ONLY)"
						statecolor = 0
					elif length==60:
						version = (1,5,0)
						conn.set_version(version)
						if self._leaderfound==0:
							leaderconn = conn
							leaderinfo = data
							self._leaderfound = 1
						statestr = "OLD MASTER (LEADER ONLY)"
						statecolor = 0
					elif length==68 or length==76 or length==101:
						version = struct.unpack(">HBB",data[:4])
						conn.set_version(version)
						if self._leaderfound==0 and version<(1,7,0):
							leaderconn = conn
							leaderinfo = data
							self._leaderfound = 1
						if length==76:
							memusage = struct.unpack(">Q",data[4:12])[0]
						if length==101:
							memusage,syscpu,usercpu = struct.unpack(">QQQ",data[4:28])
							syscpu/=10000000.0
							usercpu/=10000000.0
							lastsuccessfulstore,lastsaveseconds,lastsavestatus = struct.unpack(">LLB",data[92:101])
						if version<(1,7,0):
							statestr = "OLD MASTER (LEADER ONLY)"
							statecolor = 0
						else:
							statestr = "UPGRADE THIS UNIT!"
							statecolor = 2
					elif length==121 or length==129 or length==137 or length==149 or length==173 or length==181 or length==193 or length==205:
						offset = 8 if (length>=137 and length!=173) else 0
						version = struct.unpack(">HBB",data[:4])
						conn.set_version(version)
						memusage,syscpu,usercpu = struct.unpack(">QQQ",data[4:28])
						syscpu/=10000000.0
						usercpu/=10000000.0
						if length==205:
							offset += 4
						lastsuccessfulstore,lastsaveseconds,lastsavestatus = struct.unpack(">LLB",data[offset+92:offset+101])
						if conn.version_at_least(2,0,14):
							lastsaveseconds = lastsaveseconds / 1000.0
						workingstate,nextstate,stablestate,sync,leaderip,changetime,metaversion = struct.unpack(">BBBBLLQ",data[offset+101:offset+121])
						if length>=129:
							exportschecksum = struct.unpack(">Q",data[offset+121:offset+129])[0]
						if length>=173:
							metaid,lastsavemetaversion,lastsavemetachecksum = struct.unpack(">QQL",data[offset+129:offset+149])
						if length==149 or length==193 or length==205:
							usectime,chlogtime = struct.unpack(">QL",data[length-12:length])
						if workingstate==STATE_MASTERCE and nextstate==STATE_MASTERCE and stablestate==STATE_MASTERCE and sync==0xFF:
							if self._leaderfound==0:
								leaderconn = conn
								leaderinfo = data
								self._leaderfound = 1
								leader_exportschecksum = exportschecksum
								leader_metaid = metaid
								self._leader_usectime = usectime
							statestr = state_name(workingstate) # "MASTER"
							statecolor = 0
						elif stablestate==0 or workingstate!=nextstate:
							statestr = "transition %s -> %s" % (state_name(workingstate),state_name(nextstate))
							statecolor = 8
						else:
							statestr = state_name(workingstate)
							statecolor = state_color(workingstate,sync)
							if workingstate==STATE_FOLLOWER:
								# if sync==0:
								# 	statestr += " (DESYNC)"
								# if sync==2:
								# 	statestr += " (DELAYED)"
								# if sync==3:
								# 	statestr += " (INIT)"
								self._followerfound = 1
								followerconn = conn
								followerinfo = data
								follower_exportschecksum = exportschecksum
								follower_metaid = metaid
							if workingstate==STATE_USURPER and self._usurperfound==0:
								self._usurperfound = 1
								usurperconn = conn
								usurperinfo = data
								usurper_exportschecksum = exportschecksum
								usurper_metaid = metaid
							if workingstate==STATE_ELECT and self._electfound==0:
								self._electfound = 1
								electconn = conn
								electinfo = data
								elect_exportschecksum = exportschecksum
								elect_metaid = metaid
							if (workingstate==STATE_LEADER or workingstate==STATE_DEPUTY) and self._leaderfound==0:
								leaderconn = conn
								leaderinfo = data
								self._leaderfound = 1
								if (workingstate==STATE_DEPUTY):
									self._deputyfound = 1
								leader_exportschecksum = exportschecksum
								leader_metaid = metaid
								self._leader_usectime = usectime
				except Exception:
					statestr = STATE_STR_BUSY
					statecolor = 7
			except Exception:
				workingstate = STATE_UNREACHABLE
				statestr = STATE_STR_UNREACHABLE

			if conn and conn!=leaderconn and conn!=electconn and conn!=usurperconn and conn!=followerconn:
				del conn
			if find_all:
				self._masterlist.append(MasterServer(mhost,mport,version,workingstate,sync,statestr,statecolor,metaversion,memusage,syscpu,usercpu,lastsuccessfulstore,lastsaveseconds,lastsavestatus,exportschecksum,metaid,lastsavemetaversion,lastsavemetachecksum,usectime,chlogtime))

			if self._leaderfound:
				self._masterconn = leaderconn
				self._masterinfo = leaderinfo
				self._master_exportschecksum = leader_exportschecksum
				self._master_metaid = leader_metaid
				if not find_all:
					break # don't need to find other masters if leader is found
			elif self._electfound:
				self._masterconn = electconn
				self._masterinfo = electinfo
				self._master_exportschecksum = elect_exportschecksum
				self._master_metaid = elect_metaid
			elif self._usurperfound:
				self._masterconn = usurperconn
				self._masterinfo = usurperinfo
				self._master_exportschecksum = usurper_exportschecksum
				self._master_metaid = usurper_metaid
			elif self._followerfound:
				self._masterconn = followerconn
				self._masterinfo = followerinfo
				self._master_exportschecksum = follower_exportschecksum
				self._master_metaid = follower_metaid



	# Returns a list of all master servers in the cluster along with its highest metaversion and checksum (tuple)
	# Sorts if IMorder - sort order is provided, IMrev - reverse order
	def get_masterservers(self, IMorder=0, IMrev=False):
		if self._masterlist==None: # _masterlist could not be initialized if only the leader was found on initialization
			self.find_masters(True)
			# self.find_all_masters(None, True)

		if IMorder==None:
			return self._masterlist
		masterservers = []
		for ms in self._masterlist:
			if   IMorder==0:	key=lambda ms: (ms.sortip, ms.port)
			elif IMorder==1:	key=lambda ms: (ms.sortip)
			elif IMorder==2:	key=lambda ms: (ms.sortver)
			elif IMorder==3:	key=lambda ms: (ms.statecolor)
			elif IMorder==4:	key=lambda ms: (ms.usectime if ms.usectime!=None else 0)
			elif IMorder==5:	key=lambda ms: (ms.metaversion)
			elif IMorder==6:	key=lambda ms: (ms.metaid)
			elif IMorder==7:	key=lambda ms: (ms.chlogtime if ms.chlogtime!=None else 0)
			elif IMorder==8:	key=lambda ms: (ms.memusage)
			elif IMorder==9:	key=lambda ms: (ms.syscpu+ms.usercpu)
			elif IMorder==10:	key=lambda ms: (ms.lastsuccessfulstore)
			elif IMorder==11:	key=lambda ms: (ms.lastsaveseconds)
			elif IMorder==12:	key=lambda ms: (ms.lastsavestatus)
			elif IMorder==13:	key=lambda ms: (ms.lastsavemetaversion)
			elif IMorder==14:	key=lambda ms: (ms.lastsavemetachecksum)
			elif IMorder==15:	key=lambda ms: (ms.exportschecksum)
			else:				      key=lambda ms: (0)
			masterservers.append(ms)
		masterservers.sort(key=key)
		if IMrev:
			masterservers.reverse()
		return masterservers

	# Returns a list of all master servers in the cluster along with secdelta and metadelay
	# Sorts if IMorder - sort order is provided, IMrev - reverse order
	def update_masterservers_delays(self, IMorder=0, IMrev=False):
		mservers = self.get_masterservers(IMorder, IMrev)
		# Calculate necessary metrics from all metadata servers
		highest_saved_metaversion = 0
		highest_metaversion_checksum = 0
		master_minusectime = None
		master_maxusectime = None
		master_deltausectime = None
		for ms in mservers:
			if ms.lastsavemetaversion!=None and ms.lastsavemetachecksum!=None:
				if ms.lastsavemetaversion>highest_saved_metaversion:
					highest_saved_metaversion = ms.lastsavemetaversion
					highest_metaversion_checksum = ms.lastsavemetachecksum
				elif ms.lastsavemetaversion==highest_saved_metaversion:
					highest_metaversion_checksum |= ms.lastsavemetachecksum
			if self._leader_usectime==None or self._leader_usectime==0:
				if ms.usectime!=None and ms.usectime>0:
					if master_minusectime==None or ms.usectime<master_minusectime:
						master_minusectime = ms.usectime
					if master_maxusectime==None or ms.usectime>master_maxusectime:
						master_maxusectime = ms.usectime
				if master_maxusectime and master_minusectime:
					master_deltausectime = master_maxusectime - master_minusectime
		updated_masterlist = []
		for ms in mservers:
			secdelta = 0.0
			if ms.usectime!=None and ms.usectime!=0:
				if self._leader_usectime==None or self._leader_usectime==0:
					if master_deltausectime!=None:
						secdelta = (master_deltausectime) / 1000000.0
				else:
					if self._leader_usectime > ms.usectime:
						secdelta = (self._leader_usectime - ms.usectime) / 1000000.0
					else:
						secdelta = (ms.usectime - self._leader_usectime) / 1000000.0
			if ms.chlogtime==None or ms.chlogtime==0 or self._leader_usectime==None or self._leader_usectime==0:
				metadelay = None
			else:
				metadelay = self._leader_usectime/1000000.0 - ms.chlogtime
				if metadelay>1.0:
					metadelay-=1.0
				else:
					metadelay=0.0
			ms.secdelta = secdelta
			ms.metadelay = metadelay
			updated_masterlist.append(ms)
		self._masterlist = updated_masterlist
		return highest_saved_metaversion, highest_metaversion_checksum