File: zeroconf-thread.cpp

package info (click to toggle)
aseba 1.6.0-5
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 18,300 kB
  • sloc: cpp: 44,647; ansic: 5,686; python: 1,455; java: 1,136; sh: 393; xml: 202; makefile: 10
file content (158 lines) | stat: -rw-r--r-- 4,524 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/*
	Aseba - an event-based framework for distributed robot control
	Copyright (C) 2007--2016:
		Stephane Magnenat <stephane at magnenat dot net>
		(http://stephane.magnenat.net)
		and other contributors, see authors.txt for details

	This program is free software: you can redistribute it and/or modify
	it under the terms of the GNU Lesser General Public License as published
	by the Free Software Foundation, version 3 of the License.

	This program is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
	GNU Lesser General Public License for more details.

	You should have received a copy of the GNU Lesser General Public License
	along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "../utils/utils.h"
#include "../utils/FormatableString.h"
#include <dashel/dashel.h>
#include "zeroconf-thread.h"
#include "dns_sd.h"
#ifdef WIN32
#include <winsock2.h>
#endif

using namespace std;

namespace Aseba
{
	using namespace Dashel;

	//! Destructor: stop the watcher thread, clear all targets, and deallocate
	//! every service reference that is still registered or pending release.
	ThreadZeroconf::~ThreadZeroconf()
	{
		{
			// Change the wait predicate while holding the lock the watcher waits on.
			// Otherwise the watcher could evaluate the predicate, we could flip the flag
			// and notify, and only then would the watcher block: the wakeup would be lost
			// and the join below would never return.
			std::lock_guard<std::recursive_mutex> locker(watcherLock);
			running = false;
		}
		threadWait.notify_one(); // tell watcher to stop
		// run() may already have joined the watcher; joining a non-joinable thread
		// throws std::system_error, which would terminate the program from a destructor.
		if (watcher.joinable())
			watcher.join();
		// clear all targets
		targets.clear();
		// deallocate remaining service references
		for (auto serviceRef: serviceRefs)
			DNSServiceRefDeallocate(serviceRef);
		for (auto serviceRef: pendingReleaseServiceRefs)
			DNSServiceRefDeallocate(serviceRef);
	}

	//! Block until the watcher thread has completed, then rethrow the exception
	//! it stored in watcherException, if any.
	//! Calling this again after the watcher has been joined does not join a second
	//! time; it only rethrows the stored exception (if there is one).
	void ThreadZeroconf::run()
	{
		// join() on an already-joined thread throws std::system_error, so guard it
		if (watcher.joinable())
			watcher.join();
		if (watcherException)
			std::rethrow_exception(watcherException);
	}

	//! Register the provided service reference in serviceRefs (under watcherLock),
	//! then wake up the watcher thread, which waits until serviceRefs is not empty.
	void ThreadZeroconf::processServiceRef(DNSServiceRef serviceRef)
	{
		assert(serviceRef);

		std::unique_lock<std::recursive_mutex> guard(watcherLock);
		serviceRefs.insert(serviceRef);
		// drop the lock before notifying, so the woken thread can acquire it right away
		guard.unlock();

		threadWait.notify_one();
	}

	//! Schedule the provided service reference for release: under watcherLock, take it out
	//! of serviceRefs and put it into pendingReleaseServiceRefs. The actual deallocation is
	//! done by the watcher loop once its current call to select has completed.
	//! A null reference is ignored.
	void ThreadZeroconf::releaseServiceRef(DNSServiceRef serviceRef)
	{
		if (serviceRef)
		{
			std::lock_guard<std::recursive_mutex> guard(watcherLock);
			serviceRefs.erase(serviceRef);
			pendingReleaseServiceRefs.insert(serviceRef);
		}
	}

	//! When serviceRefs is not empty, for each service reference collect their associated
	//! file descriptor and wait on them for activity using select.
	void ThreadZeroconf::handleDnsServiceEvents()
	{
		struct timeval tv{1,0}; //!< maximum time to learn about a new service (1 sec)

		while (true)
		{
			{// lock
				std::unique_lock<std::recursive_mutex> locker(watcherLock);
				// we use a condition variable to avoid infinite loops here
				threadWait.wait(locker, [&] { return !serviceRefs.empty() || !running; });
			}// unlock
			if (!running)
				break;
			fd_set fds;
			int max_fds(0);
			FD_ZERO(&fds);
			std::map<DNSServiceRef,int> serviceFd;

			int fd_count(0);

			{// lock
				std::lock_guard<std::recursive_mutex> locker(watcherLock);
				for (auto serviceRef: serviceRefs)
				{
					int fd = DNSServiceRefSockFD(serviceRef);
					if (fd != -1)
					{
						max_fds = max_fds > fd ? max_fds : fd;
						FD_SET(fd, &fds);
						serviceFd[serviceRef] = fd;
						fd_count++;
					}
				}
			}// unlock
			int result = select(max_fds+1, &fds, (fd_set*)nullptr, (fd_set*)nullptr, &tv);
			try {
				if (result >= 0)
				{
					// lock
					std::lock_guard<std::recursive_mutex> locker(watcherLock);

					// release old service refs
					for (auto serviceRef: pendingReleaseServiceRefs)
						DNSServiceRefDeallocate(serviceRef);
					pendingReleaseServiceRefs.clear();

					// check for activity
					if (result > 0)
					{
						// no timeout
						for (auto serviceRef: serviceRefs)
						{
							auto fdIt(serviceFd.find(serviceRef));
							if (fdIt != serviceFd.end())
							{
								if (FD_ISSET(fdIt->second, &fds))
									DNSServiceProcessResult(serviceRef);
							}
						}
					}
					else
						; // timeout

					// unlock
				}
				else
					throw Zeroconf::Error(FormatableString("handleDnsServiceEvents: select returned %0 errno %1").arg(result).arg(errno));
			}
			catch (...) {
				watcherException = std::current_exception();
				break;
			}
		}
	}
} // namespace Aseba