# frozen_string_literal: true
# rubocop:todo all

require 'spec_helper'

describe Mongo::Cursor do
  let(:authorized_collection) do
    authorized_client['cursor_spec_collection']
  end

  let(:context) do
    Mongo::Operation::Context.new(client: authorized_client)
  end

  before do
    authorized_collection.drop
  end

  describe '#initialize' do
    let(:server) do
      view.send(:server_selector).select_server(authorized_client.cluster)
    end

    let(:reply) do
      view.send(:send_initial_query, server, context)
    end

    let(:cursor) do
      described_class.new(view, reply, server)
    end

    before do
      documents = [{test: 1}] * 10
      authorized_collection.insert_many(documents)
    end

    shared_context 'with initialized pool' do
      before do
        ClientRegistry.instance.close_all_clients

        # These tests really like creating pools (and thus scheduling
        # the pools' finalizers) when querying collections.
        # Deal with this by pre-creating pools for all known servers.
        cluster = authorized_collection.client.cluster
        cluster.next_primary
        cluster.servers.each do |server|
          reset_pool(server)
        end
      end

      after do
        authorized_client.cluster.servers.each do |server|
          if pool = server.pool_internal
            pool.close
          end
        end
      end
    end

    context 'cursor exhausted by initial result' do
      include_context 'with initialized pool'
      require_no_linting

      let(:view) do
        Mongo::Collection::View.new(authorized_collection)
      end

      it 'does not schedule the finalizer' do
        # Due to https://jira.mongodb.org/browse/RUBY-1772, restrict
        # the scope of the assertion
        RSpec::Mocks.with_temporary_scope do
          expect(ObjectSpace).not_to receive(:define_finalizer)
          cursor
        end
      end
    end

    context 'cursor not exhausted by initial result' do
      include_context 'with initialized pool'
      require_no_linting

      let(:view) do
        Mongo::Collection::View.new(authorized_collection, {}, batch_size: 2)
      end

      it 'schedules the finalizer' do
        # Due to https://jira.mongodb.org/browse/RUBY-1772, restrict
        # the scope of the assertion
        RSpec::Mocks.with_temporary_scope do
          expect(ObjectSpace).to receive(:define_finalizer)
          cursor
        end
      end
    end

    context 'server is unknown' do
      require_topology :single, :replica_set, :sharded

      let(:server) do
        view.send(:server_selector).select_server(authorized_client.cluster).tap do |server|
          authorized_client.cluster.close
          server.unknown!
        end
      end

      let(:view) do
        Mongo::Collection::View.new(authorized_collection)
      end

      it 'raises ServerNotUsable' do
        lambda do
          cursor
        end.should raise_error(Mongo::Error::ServerNotUsable)
      end
    end
  end

  describe '#each' do

    let(:server) do
      view.send(:server_selector).select_server(authorized_client.cluster)
    end

    let(:reply) do
      view.send(:send_initial_query, server, context)
    end

    let(:cursor) do
      described_class.new(view, reply, server)
    end

    context 'when no options are provided to the view' do

      let(:view) do
        Mongo::Collection::View.new(authorized_collection)
      end

      context 'when the initial query retrieves all documents' do

        let(:documents) do
          (1..10).map{ |i| { field: "test#{i}" }}
        end

        before do
          authorized_collection.insert_many(documents)
        end

        it 'returns the correct amount' do
          expect(cursor.to_a.count).to eq(10)
        end

        it 'iterates the documents' do
          cursor.each do |doc|
            expect(doc).to have_key('field')
          end
        end
      end

      context 'when the initial query does not retrieve all documents' do

        let(:documents) do
          (1..102).map{ |i| { field: "test#{i}" }}
        end

        before do
          authorized_collection.insert_many(documents)
        end

        context 'when a getMore gets a socket error' do

          let(:op) do
            double('operation')
          end

          before do
            expect(cursor).to receive(:get_more_operation).and_return(op).ordered
            if SpecConfig.instance.connect_options[:connect] == :load_balanced
              expect(op).to receive(:execute_with_connection).and_raise(Mongo::Error::SocketError).ordered
            else
              expect(op).to receive(:execute).and_raise(Mongo::Error::SocketError).ordered
            end
          end

          it 'raises the error' do
            expect do
              cursor.each do |doc|
              end
            end.to raise_error(Mongo::Error::SocketError)
          end
        end

        context 'when no errors occur' do

          it 'returns the correct amount' do
            expect(cursor.to_a.count).to eq(102)
          end

          it 'iterates the documents' do
            cursor.each do |doc|
              expect(doc).to have_key('field')
            end
          end
        end
      end
    end

    context 'when options are provided to the view' do

      let(:documents) do
        (1..10).map{ |i| { field: "test#{i}" }}
      end

      before do
        authorized_collection.drop
        authorized_collection.insert_many(documents)
      end

      context 'when a limit is provided' do

        context 'when no batch size is provided' do

          context 'when the limit is positive' do

            let(:view) do
              Mongo::Collection::View.new(authorized_collection, {}, :limit => 2)
            end

            it 'returns the correct amount' do
              expect(cursor.to_a.count).to eq(2)
            end

            it 'iterates the documents' do
              cursor.each do |doc|
                expect(doc).to have_key('field')
              end
            end
          end

          context 'when the limit is negative' do

            let(:view) do
              Mongo::Collection::View.new(authorized_collection, {}, :limit => -2)
            end

            it 'returns the positive number of documents' do
              expect(cursor.to_a.count).to eq(2)
            end

            it 'iterates the documents' do
              cursor.each do |doc|
                expect(doc).to have_key('field')
              end
            end
          end

          context 'when the limit is zero' do

            let(:view) do
              Mongo::Collection::View.new(authorized_collection, {}, :limit => 0)
            end

            it 'returns all documents' do
              expect(cursor.to_a.count).to eq(10)
            end

            it 'iterates the documents' do
              cursor.each do |doc|
                expect(doc).to have_key('field')
              end
            end
          end
        end

        context 'when a batch size is provided' do

          context 'when the batch size is less than the limit' do

            let(:view) do
              Mongo::Collection::View.new(
                authorized_collection,
                {},
                :limit => 5, :batch_size => 3
              )
            end

            it 'returns the limited number of documents' do
              expect(cursor.to_a.count).to eq(5)
            end

            it 'iterates the documents' do
              cursor.each do |doc|
                expect(doc).to have_key('field')
              end
            end
          end

          context 'when the batch size is more than the limit' do

            let(:view) do
              Mongo::Collection::View.new(
                authorized_collection,
                {},
                :limit => 5, :batch_size => 7
              )
            end

            it 'returns the limited number of documents' do
              expect(cursor.to_a.count).to eq(5)
            end

            it 'iterates the documents' do
              cursor.each do |doc|
                expect(doc).to have_key('field')
              end
            end
          end

          context 'when the batch size is the same as the limit' do

            let(:view) do
              Mongo::Collection::View.new(
                authorized_collection,
                {},
                :limit => 5, :batch_size => 5
              )
            end

            it 'returns the limited number of documents' do
              expect(cursor.to_a.count).to eq(5)
            end

            it 'iterates the documents' do
              cursor.each do |doc|
                expect(doc).to have_key('field')
              end
            end
          end
        end
      end
    end

    context 'when the cursor is not fully iterated and is garbage collected' do

      let(:documents) do
        (1..6).map{ |i| { field: "test#{i}" }}
      end

      let(:cluster) do
        authorized_client.cluster
      end

      before do
        authorized_collection.insert_many(documents)
        cluster.schedule_kill_cursor(
          cursor.kill_spec(cursor.instance_variable_get(:@server))
        )
      end

      let(:view) do
        Mongo::Collection::View.new(
            authorized_collection,
            {},
            :batch_size => 2,
        )
      end

      let!(:cursor) do
        view.to_enum.next
        view.instance_variable_get(:@cursor)
      end

      it 'schedules a kill cursors op' do
        cluster.instance_variable_get(:@periodic_executor).flush
        expect do
          cursor.to_a
        # Mongo::Error::SessionEnded is raised here because the periodic executor
        # called above kills the cursor and closes the session.
        # This code is normally scheduled in cursor finalizer, so the cursor object
        # is garbage collected when the code is executed. So, a user won't get
        # this exception.
        end.to raise_exception(Mongo::Error::SessionEnded)
      end

      context 'when the cursor is unregistered before the kill cursors operations are executed' do
        # Sometimes JRuby yields 4 documents even though we are allowing
        # repeated cursor iteration below
        fails_on_jruby

        it 'does not send a kill cursors operation for the unregistered cursor' do
          # We need to verify that the cursor was able to retrieve more documents
          # from the server so that more than one batch is successfully received

          cluster.unregister_cursor(cursor.id)

          # The initial read is done on an enum obtained from the cursor.
          # The read below is done directly on the cursor. These are two
          # different objects. In MRI, iterating like this yields all of the
          # documents, hence we retrieved one document in the setup and
          # we expect to retrieve the remaining 5 here. In JRuby it appears that
          # the enum may buffers the first batch, such that the second document
          # sometimes is lost to the iteration and we retrieve 4 documents below.
          # But sometimes we get all 5 documents. In either case, all of the
          # documents are retrieved via two batches thus fulfilling the
          # requirement of the test to continue iterating the cursor.

=begin When repeated iteration of cursors is prohibited, these are the expectations
          if BSON::Environment.jruby?
            expected_counts = [4, 5]
          else
            expected_counts = [5]
          end
=end

          # Since currently repeated iteration of cursors is allowed, calling
          # to_a on the cursor would perform such an iteration and return
          # all documents of the initial read.
          expected_counts = [6]

          expect(expected_counts).to include(cursor.to_a.size)
        end
      end
    end

    context 'when the cursor is fully iterated' do

      let(:documents) do
        (1..3).map{ |i| { field: "test#{i}" }}
      end

      before do
        authorized_collection.delete_many
        authorized_collection.insert_many(documents)
      end

      let(:view) do
        authorized_collection.find({}, batch_size: 2)
      end

      let(:cursor) do
        view.instance_variable_get(:@cursor)
      end

      let!(:cursor_id) do
        enum.next
        enum.next
        cursor.id
      end

      let(:enum) do
        view.to_enum
      end

      let(:cursor_reaper) do
        authorized_collection.client.cluster.instance_variable_get(:@cursor_reaper)
      end

      it 'removes the cursor id from the active cursors tracked by the cluster cursor manager' do
        enum.next
        expect(cursor_reaper.instance_variable_get(:@active_cursor_ids)).not_to include(cursor_id)
      end
    end
  end

  context 'when an implicit session is used' do
    min_server_fcv '3.6'

    let(:subscriber) { Mrss::EventSubscriber.new }

    let(:subscribed_client) do
      authorized_client.tap do |client|
        client.subscribe(Mongo::Monitoring::COMMAND, subscriber)
      end
    end

    let(:collection) do
      subscribed_client[TEST_COLL]
    end

    before do
      collection.delete_many
      collection.insert_many(documents)
    end

    let(:cursor) do
      view.instance_variable_get(:@cursor)
    end

    let(:enum) do
      view.to_enum
    end

    let(:session_pool_ids) do
      queue = view.client.cluster.session_pool.instance_variable_get(:@queue)
      queue.collect { |s| s.session_id }
    end

    let(:find_events) do
      subscriber.started_events.select { |e| e.command_name == "find" }
    end

    context 'when all results are retrieved in the first response' do

      let(:documents) do
        (1..2).map{ |i| { field: "test#{i}" }}
      end

      let(:view) do
        collection.find
      end

      it 'returns the session to the cluster session pool' do
        1.times { enum.next }
        expect(find_events.collect { |event| event.command['lsid'] }.uniq.size).to eq(1)
        expect(session_pool_ids).to include(find_events.collect { |event| event.command['lsid'] }.uniq.first)
      end
    end

    context 'when a getMore is needed to retrieve all results' do
      min_server_fcv '3.6'
      require_topology :single, :replica_set

      let(:documents) do
        (1..4).map{ |i| { field: "test#{i}" }}
      end

      let(:view) do
        collection.find({}, batch_size: 2, limit: 4)
      end


      context 'when result set is not iterated fully but the known # of documents is retrieved' do
        # These tests set up a collection with 4 documents and find all
        # of them but, instead of iterating the result set to completion,
        # manually retrieve the 4 documents that are expected to exist.
        # On 4.9 and lower servers, the server closes the cursor after
        # retrieving the 4 documents.
        # On 5.0, the server does not close the cursor after the 4 documents
        # have been retrieved, and the client must attempt to retrieve the
        # next batch (which would be empty) for the server to realize that
        # the result set is fully iterated and close the cursor.
        max_server_version '4.9'

        context 'when not all documents are iterated' do

          it 'returns the session to the cluster session pool' do
            3.times { enum.next }
            expect(find_events.collect { |event| event.command['lsid'] }.uniq.size).to eq(1)
            expect(session_pool_ids).to include(find_events.collect { |event| event.command['lsid'] }.uniq.first)
          end
        end

        context 'when the same number of documents is iterated as # in the collection' do

          it 'returns the session to the cluster session pool' do
            4.times { enum.next }
            expect(find_events.collect { |event| event.command['lsid'] }.uniq.size).to eq(1)
            expect(session_pool_ids).to include(find_events.collect { |event| event.command['lsid'] }.uniq.first)
          end
        end
      end

      context 'when result set is iterated fully' do

        it 'returns the session to the cluster session pool' do
          # Iterate fully and assert that there are 4 documents total
          enum.to_a.length.should == 4
          expect(find_events.collect { |event| event.command['lsid'] }.uniq.size).to eq(1)
          expect(session_pool_ids).to include(find_events.collect { |event| event.command['lsid'] }.uniq.first)
        end
      end
    end

    context 'when the result set is iterated fully and the cursor id is non-zero' do
      min_server_fcv '5.0'

      let(:documents) do
        (1..5).map{ |i| { field: "test#{i}" }}
      end

      let(:view) { collection.find(field:{'$gte'=>BSON::MinKey.new}).sort(field:1).limit(5).batch_size(4) }

      before do
        view.to_a
      end

      it 'schedules a get more command' do
        get_more_commands = subscriber.started_events.select { |e| e.command_name == 'getMore' }
        expect(get_more_commands.length).to be 1
      end

      it 'has a non-zero cursor id on successful get more' do
        get_more_commands = subscriber.succeeded_events.select { |e| e.command_name == 'getMore' }
        expect(get_more_commands.length).to be 1
        expect(get_more_commands[0].reply['cursor']['id']).to_not be 0
      end

      it 'schedules a kill cursors command' do
        get_more_commands = subscriber.started_events.select { |e| e.command_name == 'killCursors' }
        expect(get_more_commands.length).to be 1
      end
    end
  end

  describe '#inspect' do

    let(:view) do
      Mongo::Collection::View.new(authorized_collection)
    end

    let(:query_spec) do
      { selector: {}, options: {},
        db_name: SpecConfig.instance.test_db, coll_name: TEST_COLL }
    end

    let(:conn_desc) do
      double('connection description').tap do |cd|
        allow(cd).to receive(:service_id).and_return(nil)
      end
    end

    let(:reply) do
      double('reply').tap do |reply|
        allow(reply).to receive(:is_a?).with(Mongo::Operation::Result).and_return(true)
        allow(reply).to receive(:namespace)
        allow(reply).to receive(:connection_description).and_return(conn_desc)
        allow(reply).to receive(:cursor_id).and_return(42)
        allow(reply).to receive(:connection_global_id).and_return(1)
        if SpecConfig.instance.connect_options[:connect] == :load_balanced
          allow(reply).to receive(:connection).and_return(nil)
        end
      end
    end

    let(:cursor) do
      described_class.new(view, reply, authorized_primary)
    end

    it 'returns a string' do
      expect(cursor.inspect).to be_a(String)
    end

    it 'returns a string containing the collection view inspect string' do
      expect(cursor.inspect).to match(/.*#{view.inspect}.*/)
    end
  end

  describe '#to_a' do

    let(:view) do
      Mongo::Collection::View.new(authorized_collection, {}, batch_size: 10)
    end

    let(:query_spec) do
      { :selector => {}, :options => {}, :db_name => SpecConfig.instance.test_db,
        :coll_name => authorized_collection.name }
    end

    let(:reply) do
      view.send(:send_initial_query, authorized_primary, context)
    end

    let(:cursor) do
      described_class.new(view, reply, authorized_primary)
    end

    context 'after partially iterating the cursor' do
      before do
        authorized_collection.drop
        docs = []
        100.times do |i|
          docs << {a: i}
        end
        authorized_collection.insert_many(docs)
      end

      context 'after #each was called once' do
        before do
          cursor.each do |doc|
            break
          end
        end

        it 'iterates from the beginning of the view' do
          expect(cursor.to_a.map { |doc| doc['a'] }).to eq((0..99).to_a)
        end
      end

      context 'after exactly one batch was iterated' do
        before do
          cursor.each_with_index do |doc, i|
            break if i == 9
          end
        end

        it 'iterates from the beginning of the view' do
          expect(cursor.to_a.map { |doc| doc['a'] }).to eq((0..99).to_a)
        end
      end

      context 'after two batches were iterated' do
        before do
          cursor.each_with_index do |doc, i|
            break if i == 19
          end
        end

=begin Behavior of pre-2.10 driver:
        it 'skips the second batch' do
          expect(cursor.to_a.map { |doc| doc['a'] }).to eq((0..9).to_a + (20..99).to_a)
        end
=end
        it 'raises InvalidCursorOperation' do
          expect do
            cursor.to_a
          end.to raise_error(Mongo::Error::InvalidCursorOperation, 'Cannot restart iteration of a cursor which issued a getMore')
        end
      end
    end
  end

  describe '#close' do
    let(:view) do
      Mongo::Collection::View.new(
        authorized_collection,
        {},
        batch_size: 2,
      )
    end

    let(:server) do
      view.send(:server_selector).select_server(authorized_client.cluster)
    end

    let(:reply) do
      view.send(:send_initial_query, server, context)
    end

    let(:cursor) do
      described_class.new(view, reply, server)
    end

    let(:documents) do
      (1..10).map{ |i| { field: "test#{i}" }}
    end

    before do
      authorized_collection.drop
      authorized_collection.insert_many(documents)
    end

    it 'closes' do
      expect(cursor).not_to be_closed
      cursor.close
      expect(cursor).to be_closed
    end

    context 'when closed from another thread' do
      it 'raises an error' do
        Thread.new do
          cursor.close
        end
        sleep(1)
        expect(cursor).to be_closed
        expect do
          cursor.to_a
        end.to raise_error Mongo::Error::InvalidCursorOperation
      end
    end

    context 'when there is a socket error during close' do
      clean_slate
      require_no_linting

      before do
        reset_pool(server)
      end

      after do
        server.pool.close
      end

      it 'does not raise an error' do
        cursor
        if SpecConfig.instance.connect_options[:connect] == :load_balanced
          expect(cursor.connection).to receive(:deliver)
              .at_least(:once)
              .and_raise(Mongo::Error::SocketError, "test error")
        else
          server.with_connection do |conn|
            expect(conn).to receive(:deliver)
              .at_least(:once)
              .and_raise(Mongo::Error::SocketError, "test error")
          end
        end
        expect do
          cursor.close
        end.not_to raise_error
      end
    end
  end

  describe '#batch_size' do
    let(:subscriber) { Mrss::EventSubscriber.new }

    let(:subscribed_client) do
      authorized_client.tap do |client|
        client.subscribe(Mongo::Monitoring::COMMAND, subscriber)
      end
    end

    let(:collection) do
      subscribed_client[TEST_COLL]
    end

    let(:view) do
      collection.find({}, limit: limit)
    end

    before do
      collection.drop
      collection.insert_many([].fill({ "bar": "baz" }, 0, 102))
    end

    context 'when limit is 0 and batch_size is not set' do
      let(:limit) do
        0
      end

      it 'does not set batch_size' do
        view.to_a
        get_more_commands = subscriber.started_events.select { |e| e.command_name == 'getMore' }
        expect(get_more_commands.length).to eq(1)
        expect(get_more_commands.first.command.keys).not_to include('batchSize')
      end
    end

    context 'when limit is not zero and batch_size is not set' do
      let(:limit) do
        1000
      end

      it 'sets batch_size' do
        view.to_a
        get_more_commands = subscriber.started_events.select { |e| e.command_name == 'getMore' }

        expect(get_more_commands.length).to eq(1)
        expect(get_more_commands.first.command.keys).to include('batchSize')
      end
    end
  end
end
