File: events.cc

package info (click to toggle)
c%2B%2B-annotations 10.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 10,536 kB
  • ctags: 3,247
  • sloc: cpp: 19,157; makefile: 1,521; ansic: 165; sh: 128; perl: 90
file content (147 lines) | stat: -rw-r--r-- 3,507 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// g++ --std=c++0x -pthread events.cc

#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <iostream>
#include <chrono>

    class Semaphore
    {
//data
mutable std::mutex d_mutex;
std::condition_variable d_condition;
size_t d_available;
//=
        public:
            Semaphore(size_t available)
            :
                d_available(available)
            {}

            void wait();
            void notify_all();
            size_t size() const;                 // returning d_available
    };

size_t Semaphore::size() const
{
    std::lock_guard<std::mutex> lk(d_mutex);
    return d_available;
}

//notify_all
void Semaphore::notify_all()
{
    std::lock_guard<std::mutex> lk(d_mutex);    // get the lock
    if (d_available++ == 0)
        d_condition.notify_all();   // use notify_one to notify one other
                                    // thread
}   // the lock is released
//=

//wait
void Semaphore::wait()
{
    std::unique_lock<std::mutex> lk(d_mutex);   // get the lock
    while (d_available == 0)
        d_condition.wait(lk);   // internally releases the lock
                                // and waits, on exit
                                // acquires the lock again
    --d_available;              // dec. available
}   // the lock is released
//=


    using namespace std;

    Semaphore     g_available(5);
    Semaphore     g_filled(0);

    mutex g_qMutex;
    queue<size_t> g_queue;

    struct Producer
    {
        size_t d_trials;
        size_t d_item;

        Producer(size_t trials)
        :
            d_trials(trials),
            d_item(0)
        {}

        void operator()()
        {
            for (size_t run = 0; run != d_trials; ++run)
            {
                ++d_item;
                cout << "Produced item " << d_item << endl;
                g_available.wait();
                {
                    lock_guard<mutex> lg(g_qMutex);
                    g_queue.push(d_item);
                }
                g_filled.notify_all();
            }
        }
    };

    struct Consumer
    {
        size_t d_trials;
        int d_nr;

        Consumer(size_t trials, int nr)
        :
            d_trials(trials),
            d_nr(nr)
        {}

        void operator()()
        {
            for (size_t run = 0; run != d_trials; ++run)
            {
                g_filled.wait();
                size_t d_item;
                {
                    lock_guard<mutex> lg(g_qMutex);
                    d_item = g_queue.front();
                    this_thread::sleep_for(chrono::milliseconds(10));
                    g_queue.pop();
                }
                g_available.notify_all();
                cout << "\t\tConsumer " << d_nr << " got item " <<
                                                        d_item << endl;
            }
        }
    };


    int main(int argc, char **argv)
    {
        if (argc == 1)
        {
            cerr << "Need nTrials argument\n";
            return 1;
        }
        cerr << "Go!\n";
        size_t trials = stoul(argv[1]) + 2 / 3 * 3;

        Producer prod(trials); //  << 1);
        Consumer cons1(trials / 3, 1);
        Consumer cons2(trials / 3, 2);
        Consumer cons3(trials / 3, 3);

        thread consume1(cons1);
        thread consume2(cons2);
        thread consume3(cons3);
        thread produce(prod);

        produce.join();
        consume1.join();
        consume2.join();
        consume3.join();
    }