From eab3baaba75c8c9e549aea54d3b356ab287a57b0 Mon Sep 17 00:00:00 2001
From: Patrick Reynolds <patrick.reynolds@github.com>
Date: Tue, 11 Mar 2014 16:01:25 -0500
Subject: [PATCH 1/3] use ruby select api with expandable fd sets

Conflicts:
	ext/em.h
---
 ext/em.cpp             | 54 +++++++++++++++++++++++++-------------------------
 ext/em.h               | 10 +++++-----
 tests/test_many_fds.rb | 22 ++++++++++++++++++++
 3 files changed, 54 insertions(+), 32 deletions(-)
 create mode 100644 tests/test_many_fds.rb

diff --git a/ext/em.cpp b/ext/em.cpp
index 670da31..6a3a2ef 100644
--- a/ext/em.cpp
+++ b/ext/em.cpp
@@ -524,12 +524,12 @@ void EventMachine_t::_RunEpollOnce()
 	#ifdef HAVE_RB_WAIT_FOR_SINGLE_FD
 	if ((ret = rb_wait_for_single_fd(epfd, RB_WAITFD_IN|RB_WAITFD_PRI, &tv)) < 1) {
 	#else
-	fd_set fdreads;
+	rb_fdset_t fdreads;
 
-	FD_ZERO(&fdreads);
-	FD_SET(epfd, &fdreads);
+	rb_fd_init(&fdreads);
+	rb_fd_set(epfd, &fdreads);
 
-	if ((ret = rb_thread_select(epfd + 1, &fdreads, NULL, NULL, &tv)) < 1) {
+	if ((ret = rb_thread_fd_select(epfd + 1, &fdreads, NULL, NULL, &tv)) < 1) {
 	#endif
 		if (ret == -1) {
 			assert(errno != EINVAL);
@@ -601,12 +601,12 @@ void EventMachine_t::_RunKqueueOnce()
 	#ifdef HAVE_RB_WAIT_FOR_SINGLE_FD
 	if ((ret = rb_wait_for_single_fd(kqfd, RB_WAITFD_IN|RB_WAITFD_PRI, &tv)) < 1) {
 	#else
-	fd_set fdreads;
+	rb_fdset_t fdreads;
 
-	FD_ZERO(&fdreads);
-	FD_SET(kqfd, &fdreads);
+	rb_fd_init(&fdreads);
+	rb_fd_set(kqfd, &fdreads);
 
-	if ((ret = rb_thread_select(kqfd + 1, &fdreads, NULL, NULL, &tv)) < 1) {
+	if ((ret = rb_thread_fd_select(kqfd + 1, &fdreads, NULL, NULL, &tv)) < 1) {
 	#endif
 		if (ret == -1) {
 			assert(errno != EINVAL);
@@ -792,9 +792,9 @@ SelectData_t::SelectData_t
 SelectData_t::SelectData_t()
 {
 	maxsocket = 0;
-	FD_ZERO (&fdreads);
-	FD_ZERO (&fdwrites);
-	FD_ZERO (&fderrors);
+	rb_fd_init (&fdreads);
+	rb_fd_init (&fdwrites);
+	rb_fd_init (&fderrors);
 }
 
 
@@ -807,7 +807,7 @@ _SelectDataSelect
 static VALUE _SelectDataSelect (void *v)
 {
 	SelectData_t *sd = (SelectData_t*)v;
-	sd->nSockets = select (sd->maxsocket+1, &(sd->fdreads), &(sd->fdwrites), &(sd->fderrors), &(sd->tv));
+	sd->nSockets = rb_fd_select (sd->maxsocket+1, &(sd->fdreads), &(sd->fdwrites), &(sd->fderrors), &(sd->tv));
 	return Qnil;
 }
 #endif
@@ -848,9 +848,9 @@ void EventMachine_t::_RunSelectOnce()
 
 	SelectData_t SelectData;
 	/*
-	fd_set fdreads, fdwrites;
-	FD_ZERO (&fdreads);
-	FD_ZERO (&fdwrites);
+	rb_fdset_t fdreads, fdwrites;
+	rb_fd_init (&fdreads);
+	rb_fd_init (&fdwrites);
 
 	int maxsocket = 0;
 	*/
@@ -860,7 +860,7 @@ void EventMachine_t::_RunSelectOnce()
 	// running on localhost with a randomly-chosen port. (*Puke*)
 	// Windows has a version of the Unix pipe() library function, but it doesn't
 	// give you back descriptors that are selectable.
-	FD_SET (LoopBreakerReader, &(SelectData.fdreads));
+	rb_fd_set (LoopBreakerReader, &(SelectData.fdreads));
 	if (SelectData.maxsocket < LoopBreakerReader)
 		SelectData.maxsocket = LoopBreakerReader;
 
@@ -875,15 +875,15 @@ void EventMachine_t::_RunSelectOnce()
 		assert (sd != INVALID_SOCKET);
 
 		if (ed->SelectForRead())
-			FD_SET (sd, &(SelectData.fdreads));
+			rb_fd_set (sd, &(SelectData.fdreads));
 		if (ed->SelectForWrite())
-			FD_SET (sd, &(SelectData.fdwrites));
+			rb_fd_set (sd, &(SelectData.fdwrites));
 
 		#ifdef OS_WIN32
 		/* 21Sep09: on windows, a non-blocking connect() that fails does not come up as writable.
 		   Instead, it is added to the error set. See http://www.mail-archive.com/openssl-users@openssl.org/msg58500.html
 		*/
-		FD_SET (sd, &(SelectData.fderrors));
+		rb_fd_set (sd, &(SelectData.fderrors));
 		#endif
 
 		if (SelectData.maxsocket < sd)
@@ -918,15 +918,15 @@ void EventMachine_t::_RunSelectOnce()
 					continue;
 				assert (sd != INVALID_SOCKET);
 
-				if (FD_ISSET (sd, &(SelectData.fdwrites)))
+				if (rb_fd_isset (sd, &(SelectData.fdwrites)))
 					ed->Write();
-				if (FD_ISSET (sd, &(SelectData.fdreads)))
+				if (rb_fd_isset (sd, &(SelectData.fdreads)))
 					ed->Read();
-				if (FD_ISSET (sd, &(SelectData.fderrors)))
+				if (rb_fd_isset (sd, &(SelectData.fderrors)))
 					ed->HandleError();
 			}
 
-			if (FD_ISSET (LoopBreakerReader, &(SelectData.fdreads)))
+			if (rb_fd_isset (LoopBreakerReader, &(SelectData.fdreads)))
 				_ReadLoopBreaker();
 		}
 		else if (s < 0) {
@@ -964,11 +964,11 @@ void EventMachine_t::_CleanBadDescriptors()
 		tv.tv_sec = 0;
 		tv.tv_usec = 0;
 
-		fd_set fds;
-		FD_ZERO(&fds);
-		FD_SET(sd, &fds);
+		rb_fdset_t fds;
+		rb_fd_init(&fds);
+		rb_fd_set(sd, &fds);
 
-		int ret = select(sd + 1, &fds, NULL, NULL, &tv);
+		int ret = rb_fd_select(sd + 1, &fds, NULL, NULL, &tv);
 
 		if (ret == -1) {
 			if (errno == EBADF)
diff --git a/ext/em.h b/ext/em.h
index df68ec1..ff7c26c 100644
--- a/ext/em.h
+++ b/ext/em.h
@@ -22,7 +22,7 @@ See the file COPYING for complete licensing information.
 
 #ifdef BUILD_FOR_RUBY
   #include <ruby.h>
-  #define EmSelect rb_thread_select
+  #define EmSelect rb_thread_fd_select
 
   #ifdef HAVE_RB_WAIT_FOR_SINGLE_FD
     #include <ruby/io.h>
@@ -57,7 +57,7 @@ See the file COPYING for complete licensing information.
     #define RSTRING_LENINT(str) RSTRING_LEN(str)
   #endif
 #else
-  #define EmSelect select
+  #define EmSelect rb_fd_select
 #endif
 
 class EventableDescriptor;
@@ -233,9 +233,9 @@ struct SelectData_t
 	int _Select();
 
 	int maxsocket;
-	fd_set fdreads;
-	fd_set fdwrites;
-	fd_set fderrors;
+	rb_fdset_t fdreads;
+	rb_fdset_t fdwrites;
+	rb_fdset_t fderrors;
 	timeval tv;
 	int nSockets;
 };
diff --git a/tests/test_many_fds.rb b/tests/test_many_fds.rb
new file mode 100644
index 0000000..74dc926
--- /dev/null
+++ b/tests/test_many_fds.rb
@@ -0,0 +1,22 @@
+require 'em_test_helper'
+require 'socket'
+
+class TestManyFDs < Test::Unit::TestCase
+  def setup
+    @port = next_port
+  end
+
+  def test_connection_class_cache
+    mod = Module.new
+    a = nil
+    Process.setrlimit(Process::RLIMIT_NOFILE,4096);
+    EM.run {
+      EM.start_server '127.0.0.1', @port, mod
+      1100.times do
+        a = EM.connect '127.0.0.1', @port, mod
+        assert_kind_of EM::Connection, a
+      end
+      EM.stop
+    }
+  end
+end
-- 
2.1.4

