File: CVBuffer.java

package info (click to toggle)
concurrent-dfsg 1.3.4-6
  • links: PTS
  • area: main
  • in suites: bookworm
  • size: 976 kB
  • sloc: java: 10,704; xml: 49; makefile: 12
file content (123 lines) | stat: -rw-r--r-- 2,537 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

package EDU.oswego.cs.dl.util.concurrent.misc;
import  EDU.oswego.cs.dl.util.concurrent.*;


/**
 * A bounded first-in/first-out buffer of objects, stored in a fixed-size
 * circular array.
 * <p>
 * Every access to the buffer state happens between <code>mutex.acquire()</code>
 * and <code>mutex.release()</code>. Two condition variables created on that
 * mutex are used for coordination: <code>notFull</code> is signalled after an
 * element is removed, and <code>notEmpty</code> is signalled after an element
 * is added.
 * <p>
 * NOTE(review): <code>poll</code> and <code>peek</code> return
 * <code>null</code> when no element is available, and nothing here rejects a
 * <code>null</code> argument to <code>put</code>/<code>offer</code>, so a
 * stored <code>null</code> cannot be told apart from "no element". Callers
 * should presumably avoid inserting <code>null</code> -- confirm against the
 * <code>BoundedChannel</code> contract.
 */
public class CVBuffer implements BoundedChannel {
  /** Lock held while reading or writing any of the fields below. */
  private final Mutex mutex;
  /** Waited on by producers while the array is full; signalled on removal. */
  private final CondVar notFull;
  /** Waited on by consumers while the array is empty; signalled on insertion. */
  private final CondVar notEmpty;
  /** Number of elements currently stored. */
  private int count = 0;
  /** Index of the oldest stored element (next one to be removed). */
  private int takePtr = 0;
  /** Index of the next free slot (where the next element is written). */
  private int putPtr = 0;
  /** Circular storage; its length is the capacity. */
  private final Object[] array;

  /**
   * Creates a buffer holding at most <code>cap</code> elements.
   * <p>
   * With <code>cap == 0</code> the buffer is simultaneously "full" and
   * "empty": <code>put</code> and <code>take</code> would wait indefinitely
   * and <code>offer</code>/<code>poll</code> could never succeed, so callers
   * should pass a positive capacity.
   *
   * @param cap the capacity; a negative value makes the array allocation fail
   */
  public CVBuffer(int cap) {
    array = new Object[cap];
    mutex = new Mutex();
    notFull = new CondVar(mutex);
    notEmpty = new CondVar(mutex);
  }

  /**
   * Creates a buffer whose capacity is the value returned by
   * <code>DefaultChannelCapacity.get()</code>.
   */
  public CVBuffer() {
    this(DefaultChannelCapacity.get());
  }

  /**
   * Returns the maximum number of elements this buffer can hold.
   *
   * @return the length of the backing array
   */
  public int capacity() { return array.length; }

  /**
   * Appends <code>x</code> to the buffer, waiting on <code>notFull</code>
   * for as long as the buffer is full.
   *
   * @param x the element to add
   * @throws InterruptedException if acquiring the mutex or waiting for space
   *         is interrupted
   */
  public void put(Object x) throws InterruptedException {
    mutex.acquire();
    try {
      // Re-check after every wakeup: another producer may have taken the slot.
      while (count == array.length) {
        notFull.await();
      }
      insert(x);
    }
    finally {
      mutex.release();
    }
  }

  /**
   * Removes and returns the oldest element, waiting on <code>notEmpty</code>
   * for as long as the buffer is empty.
   *
   * @return the element that was removed
   * @throws InterruptedException if acquiring the mutex or waiting for an
   *         element is interrupted
   */
  public Object take() throws InterruptedException {
    mutex.acquire();
    try {
      // Re-check after every wakeup: another consumer may have taken the element.
      while (count == 0) {
        notEmpty.await();
      }
      return extract();
    }
    finally {
      mutex.release();
    }
  }

  /**
   * Appends <code>x</code> if space is, or becomes, available within roughly
   * <code>msecs</code> milliseconds.
   * <p>
   * If the buffer is still full after a wakeup (for example because another
   * producer filled the freed slot first), the method keeps waiting for the
   * remainder of the timeout instead of giving up early.
   *
   * @param x the element to add
   * @param msecs the maximum time to wait, in milliseconds; with a value of
   *        zero or less and a full buffer the method returns
   *        <code>false</code> after a single <code>timedwait</code> call
   * @return <code>true</code> if the element was added, <code>false</code> if
   *         the buffer remained full for the whole timeout
   * @throws InterruptedException if acquiring the mutex or waiting for space
   *         is interrupted
   */
  public boolean offer(Object x, long msecs) throws InterruptedException {
    mutex.acquire();
    try {
      if (count == array.length) {
        // Track elapsed time rather than an absolute deadline so that a very
        // large msecs cannot overflow.
        long start = System.currentTimeMillis();
        long remaining = msecs;
        for (;;) {
          notFull.timedwait(remaining);
          if (count < array.length)
            break;
          remaining = msecs - (System.currentTimeMillis() - start);
          if (remaining <= 0)
            return false;
        }
      }
      insert(x);
      return true;
    }
    finally {
      mutex.release();
    }
  }

  /**
   * Removes and returns the oldest element if one is, or becomes, available
   * within roughly <code>msecs</code> milliseconds.
   * <p>
   * If the buffer is still empty after a wakeup (for example because another
   * consumer removed the new element first), the method keeps waiting for the
   * remainder of the timeout instead of giving up early.
   *
   * @param msecs the maximum time to wait, in milliseconds; with a value of
   *        zero or less and an empty buffer the method returns
   *        <code>null</code> after a single <code>timedwait</code> call
   * @return the removed element, or <code>null</code> if the buffer remained
   *         empty for the whole timeout
   * @throws InterruptedException if acquiring the mutex or waiting for an
   *         element is interrupted
   */
  public Object poll(long msecs) throws InterruptedException {
    mutex.acquire();
    try {
      if (count == 0) {
        // Same elapsed-time bookkeeping as in offer().
        long start = System.currentTimeMillis();
        long remaining = msecs;
        for (;;) {
          notEmpty.timedwait(remaining);
          if (count > 0)
            break;
          remaining = msecs - (System.currentTimeMillis() - start);
          if (remaining <= 0)
            return null;
        }
      }
      return extract();
    }
    finally {
      mutex.release();
    }
  }

  /**
   * Returns the oldest element without removing it.
   * <p>
   * If acquiring the mutex is interrupted, the thread's interrupt status is
   * set again and <code>null</code> is returned.
   *
   * @return the oldest element, or <code>null</code> if the buffer is empty
   *         or the calling thread was interrupted
   */
  public Object peek() {
    try {
      mutex.acquire();
      try {
        if (count == 0)
          return null;
        else
          return array[takePtr];
      }
      finally {
        mutex.release();
      }
    }
    catch (InterruptedException ex) {
      // This method cannot throw InterruptedException, so preserve the
      // interrupt for the caller to observe.
      Thread.currentThread().interrupt();
      return null;
    }
  }

  /**
   * Stores <code>x</code> in the next free slot and signals
   * <code>notEmpty</code>. The caller must hold <code>mutex</code> and must
   * have checked that <code>count &lt; array.length</code>.
   */
  private void insert(Object x) {
    array[putPtr] = x;
    putPtr = (putPtr + 1) % array.length;
    ++count;
    notEmpty.signal();
  }

  /**
   * Removes and returns the oldest element and signals <code>notFull</code>.
   * The caller must hold <code>mutex</code> and must have checked that
   * <code>count &gt; 0</code>.
   */
  private Object extract() {
    Object x = array[takePtr];
    array[takePtr] = null;  // drop the reference so the buffer does not retain it
    takePtr = (takePtr + 1) % array.length;
    --count;
    notFull.signal();
    return x;
  }

}