File: mc-centralized-mutex.py

package info (click to toggle)
simgrid 4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 39,192 kB
  • sloc: cpp: 124,913; ansic: 66,744; python: 8,560; java: 6,773; fortran: 6,079; f90: 5,123; xml: 4,587; sh: 2,194; perl: 1,436; makefile: 111; lisp: 49; javascript: 7; sed: 6
file content (88 lines) | stat: -rw-r--r-- 3,256 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# Copyright (c) 2010-2025. The SimGrid Team. All rights reserved.

# This program is free software; you can redistribute it and/or modify it
# under the terms of the license (GNU LGPL) which comes with this package.

# **************** Centralized Mutual Exclusion Algorithm ******************
# This example implements a centralized mutual exclusion algorithm.        
# There is no bug in it; it is just provided to test the state space       
# reduction of DPOR.                                                       
# **************************************************************************

import sys
from enum import Enum
from simgrid import Actor, Engine, Host, Semaphore, NetZone, Mailbox, this_actor, MC_assert

# Number of critical-section rounds each client performs.
CS_PER_PROCESS = 1
# Number of client actors started; also the number of FINISH messages the
# coordinator waits for before stopping.
PROCESS_AMOUNT = 2


class MsgKind(Enum):
  """Kinds of messages exchanged between the clients and the coordinator."""
  GRANT = 1
  REQUEST = 2
  RELEASE = 3
  FINISH = 4

class Message:
  """Payload exchanged through the mailboxes of this example.

  Attributes:
    kind: the MsgKind member telling what the message means.
    return_mailbox: the mailbox attached to the message by its sender.
  """
  # Class-level fallbacks; __init__ overrides both on every instance.
  return_mailbox = None
  kind = MsgKind.GRANT

  def __init__(self, kind, mbox):
    """Store the message kind and the mailbox that travels with it."""
    self.kind, self.return_mailbox = kind, mbox

def coordinator():
  """Arbitrate access to the critical section (CS) between the clients.

  Reads messages from the "coordinator" mailbox until PROCESS_AMOUNT FINISH
  messages have been received:
    - REQUEST: granted at once if the CS is idle, queued otherwise;
    - FINISH:  one fewer client to wait for;
    - anything else is handled as a RELEASE: the CS is handed over to the
      oldest queued requester, or marked idle when nobody is waiting.
  """
  waiting = []  # return mailboxes of the clients waiting for the CS, oldest first
  mbox = Mailbox.by_name("coordinator")

  critical_section_used = False # initially the critical section is idle
  active_clients = PROCESS_AMOUNT

  while active_clients > 0:
    this_actor.info(f"Still {active_clients} clients")
    m = mbox.get()
    if m.kind == MsgKind.REQUEST:
      if critical_section_used: # CS busy: the requester has to wait for its turn
        this_actor.info("CS already used. Queue the request")
        waiting.append(m.return_mailbox)
      else: # can serve it immediately
        this_actor.info(f"CS idle. Grant immediately to {m.return_mailbox.name}")
        m.return_mailbox.put(Message(MsgKind.GRANT, mbox), 1000)
        critical_section_used = True
    elif m.kind == MsgKind.FINISH:
      active_clients -= 1
    else: # that's a release. Check if someone was waiting for the lock
      if waiting:
        this_actor.info(f"CS release. Grant to queued requests (queue size: {len(waiting)})")
        # Serve the OLDEST request first (FIFO). Taking the newest one instead
        # would let late requesters overtake early ones, which can starve a
        # client once more than two of them compete. The queue never holds
        # more than PROCESS_AMOUNT entries, so pop(0) is cheap enough here.
        # The CS stays marked as used: it goes straight to the next client.
        next_in_line = waiting.pop(0)
        next_in_line.put(Message(MsgKind.GRANT, mbox), 1000)
      else: # nobody wants it
        this_actor.info("CS release. resource now idle")
        critical_section_used = False

  this_actor.info("Received all releases, quit now")

def client():
  """Client actor: acquire and release the critical section CS_PER_PROCESS times.

  Each round sends a REQUEST to the "coordinator" mailbox, waits for a message
  on the mailbox named after this actor's PID, sleeps, then sends a RELEASE.
  Once all rounds are done, a FINISH message is sent to the coordinator.
  """
  pid = this_actor.get_pid()
  reply_box = Mailbox.by_name(str(pid))

  def tell_coordinator(kind, size):
    # Look the coordinator mailbox up on every send, then post a message that
    # carries our own mailbox as return address.
    Mailbox.by_name("coordinator").put(Message(kind, reply_box), size)

  # request the CS several times, sleeping a bit in between
  for i in range(CS_PER_PROCESS):
    this_actor.info(f"Ask the request for the {i}th time of {CS_PER_PROCESS} times")
    tell_coordinator(MsgKind.REQUEST, 1000)
    # wait for the answer; only its arrival matters, the payload is not used
    reply_box.get()
    this_actor.info("got the answer. Sleep a bit and release it")
    this_actor.sleep_for(1)

    tell_coordinator(MsgKind.RELEASE, 1000)
    this_actor.sleep_for(pid)

  this_actor.info("Got all the CS I wanted, quit now")
  tell_coordinator(MsgKind.FINISH, 1)

if __name__ == '__main__':
    # Build the engine from the command line; the first positional argument
    # is the platform file to load.
    engine = Engine(sys.argv)
    engine.load_platform(sys.argv[1])

    # One coordinator on host "Tremblay" ...
    engine.host_by_name("Tremblay").add_actor("coordinator", coordinator)

    # ... and PROCESS_AMOUNT clients, all on host "Fafard".
    for _ in range(PROCESS_AMOUNT):
        engine.host_by_name("Fafard").add_actor("client", client)

    engine.run()