require 'spec_helper'

describe Mongo::Collection::View::ChangeStream, if: test_change_streams? do

  # User-supplied pipeline stages handed to the change stream; none by default.
  let(:pipeline) { [] }

  # Options passed to the change stream constructor; nested contexts override this.
  let(:options) { {} }

  # Options passed to the collection view the change stream is built on.
  let(:view_options) { {} }

  # Collection view (empty filter) over the authorized test collection.
  let(:view) { Mongo::Collection::View.new(authorized_collection, {}, view_options) }

  # The object under test, built from the view, pipeline and options above.
  let(:change_stream) { described_class.new(view, pipeline, options) }

  # Body of the leading '$changeStream' stage in the stream's (private) pipeline.
  let(:change_stream_document) do
    first_stage = change_stream.send(:pipeline)[0]
    first_stage['$changeStream']
  end

  # Produces a resume token: opens a throwaway change stream, inserts a
  # document and returns the `_id` of the first change document read back.
  # Declared with `let!`, so it is evaluated before every example.
  let!(:sample_resume_token) do
    stream = authorized_collection.watch
    begin
      authorized_collection.insert_one(a: 1)
      stream.to_enum.next[:_id]
    ensure
      # Close the helper stream even when the insert or the read raises,
      # so a failure here does not leave the stream open.
      stream.close
    end
  end

  # The :selector entry of the aggregate command specification.
  let(:command_selector) { command_spec[:selector] }

  # Aggregate specification built by the (private) aggregate_spec method,
  # called with a test double standing in for the session.
  let(:command_spec) { change_stream.send(:aggregate_spec, double('session')) }

  # The change stream's internal @cursor instance variable.
  let(:cursor) { change_stream.instance_variable_get(:@cursor) }

  # Builds the change stream and, if that raises, returns the exception
  # instead of propagating it (otherwise returns the change stream itself).
  # Several nested contexts override this with the error to inject.
  let(:error) do
    begin
      change_stream
    rescue StandardError => raised
      raised
    end
  end

  # Per-example cleanup: empty the collection, then close the stream.
  after do
    authorized_collection.delete_many
    begin
      change_stream.close
    rescue StandardError
      # Best-effort: any error raised while building or closing the stream is ignored.
    end
  end

  # Construction-time behaviour: how options and pipeline stages appear in the
  # '$changeStream' stage and the aggregate selector, how server errors are
  # surfaced, and how a supplied session is treated.
  describe '#initialize' do

    it 'sets the view' do
      expect(change_stream.view).to be(view)
    end

    it 'sets the options' do
      expect(change_stream.options).to eq(options)
    end

    context 'when full_document is provided' do

      context "when the value is 'default'" do

        let(:options) do
          { full_document: 'default' }
        end

        it 'sets the fullDocument value to default' do
          expect(change_stream_document[:fullDocument]).to eq('default')
        end
      end

      context "when the value is 'updateLookup'" do

        let(:options) do
          { full_document: 'updateLookup' }
        end

        it 'sets the fullDocument value to updateLookup' do
          expect(change_stream_document[:fullDocument]).to eq('updateLookup')
        end
      end
    end

    context 'when full_document is not provided' do

      it "defaults to use the 'default' value" do
        expect(change_stream_document[:fullDocument]).to eq('default')
      end
    end

    context 'when resume_after is provided' do

      let(:options) do
        { resume_after: sample_resume_token }
      end

      it 'sets the resumeAfter value to the provided document' do
        expect(change_stream_document[:resumeAfter]).to eq(sample_resume_token)
      end
    end

    context 'when max_await_time_ms is provided' do

      let(:options) do
        { max_await_time_ms: 10 }
      end

      # The option is a plain number, so the description says "value", not "document".
      it 'sets the maxTimeMS value to the provided value' do
        expect(command_selector[:maxTimeMS]).to eq(10)
      end
    end

    context 'when batch_size is provided' do

      let(:options) do
        { batch_size: 5 }
      end

      # The option is a plain number, so the description says "value", not "document".
      it 'sets the batchSize value to the provided value' do
        expect(command_selector[:cursor][:batchSize]).to eq(5)
      end
    end

    context 'when collation is provided'  do

      let(:options) do
        { 'collation' => { locale: 'en_US', strength: 2 } }
      end

      it 'sets the collation value to the provided document' do
        expect(command_selector['collation']).to eq(BSON::Document.new(options['collation']))
      end
    end

    context 'when a changeStream operator is provided by the user as well' do

      let(:pipeline) do
        [ { '$changeStream' => { fullDocument: 'default' } }]
      end

      # `error` is the exception captured while building the change stream.
      it 'raises the error from the server' do
        expect(error).to be_a(Mongo::Error::OperationFailure)
        expect(error.message).to include('$changeStream is only valid as the first stage in a pipeline')
      end
    end

    context 'when the collection has a readConcern' do

      let(:collection) do
        authorized_collection.with(read_concern: { level: 'majority' })
      end

      # Built from view_options, like the top-level view, so that change
      # stream options never leak into the view if a context overrides them.
      let(:view) do
        Mongo::Collection::View.new(collection, {}, view_options)
      end

      it 'uses the read concern of the collection' do
        expect(command_selector[:readConcern]).to eq('level' => 'majority')
      end
    end

    context 'when no pipeline is supplied' do

      # Only the '$changeStream' stage is expected at the head of the pipeline.
      it 'uses an empty pipeline' do
        expect(command_selector[:pipeline][0].keys).to eq(['$changeStream'])
      end
    end

    context 'when other pipeline operators are supplied' do

      context 'when the other pipeline operators are supported' do

        let(:pipeline) do
          [{ '$project' => { '_id' => 0 }}]
        end

        # The user stage is expected right after the '$changeStream' stage.
        it 'uses the pipeline operators' do
          expect(command_selector[:pipeline][1]).to eq(pipeline[0])
        end
      end

      context 'when the other pipeline operators are not supported' do

        let(:pipeline) do
          [{ '$unwind' => '$test' }]
        end

        it 'sends the pipeline to the server without a custom error' do
          expect {
            change_stream
          }.to raise_exception(Mongo::Error::OperationFailure)
        end

        context 'when the operation fails', if: test_change_streams? do

          # Eager lets: capture the session state before the failing operation runs.
          let!(:before_last_use) do
            session.instance_variable_get(:@server_session).last_use
          end

          let!(:before_operation_time) do
            (session.operation_time || 0)
          end

          let(:pipeline) do
            [ { '$invalid' => '$test' }]
          end

          let(:options) do
            { session: session }
          end

          # Builds the change stream, returning the raised exception if any.
          let!(:operation_result) do
            begin; change_stream; rescue => e; e; end
          end

          let(:session) do
            client.start_session
          end

          let(:client) do
            authorized_client
          end

          it 'raises an error' do
            expect(operation_result.class).to eq(Mongo::Error::OperationFailure)
          end

          it 'updates the last use value' do
            expect(session.instance_variable_get(:@server_session).last_use).not_to eq(before_last_use)
          end

          it 'updates the operation time value' do
            expect(session.operation_time).not_to eq(before_operation_time)
          end
        end
      end
    end

    context 'when the initial batch is empty' do

      before do
        change_stream
      end

      it 'does not close the cursor' do
        expect(cursor).to be_a(Mongo::Cursor)
      end
    end

    context 'when provided a session', if: sessions_enabled? && test_change_streams? do

      let(:options) do
        { session: session }
      end

      # Opens the stream, inserts a document and reads the first change.
      let(:operation) do
        change_stream
        authorized_collection.insert_one(a: 1)
        change_stream.to_enum.next
      end

      let(:client) do
        authorized_client
      end

      context 'when the session is created from the same client used for the operation' do

        let(:session) do
          client.start_session
        end

        let(:server_session) do
          session.instance_variable_get(:@server_session)
        end

        # Eager lets: capture the session state before the operation runs.
        let!(:before_last_use) do
          server_session.last_use
        end

        let!(:before_operation_time) do
          (session.operation_time || 0)
        end

        let!(:operation_result) do
          operation
        end

        it 'updates the last use value' do
          expect(server_session.last_use).not_to eq(before_last_use)
        end

        it 'updates the operation time value' do
          expect(session.operation_time).not_to eq(before_operation_time)
        end

        it 'does not close the session when the operation completes' do
          expect(session.ended?).to be(false)
        end
      end

      context 'when a session from another client is provided' do

        let(:session) do
          authorized_client_with_retry_writes.start_session
        end

        let(:operation_result) do
          operation
        end

        it 'raises an exception' do
          expect {
            operation_result
          }.to raise_exception(Mongo::Error::InvalidSession)
        end
      end

      context 'when the session is ended before it is used' do

        let(:session) do
          client.start_session
        end

        before do
          session.end_session
        end

        let(:operation_result) do
          operation
        end

        it 'raises an exception' do
          expect {
            operation_result
          }.to raise_exception(Mongo::Error::InvalidSession)
        end
      end
    end
  end

  # Closing the stream: the internal cursor is dropped and further iteration stops.
  describe '#close' do

    context 'when documents have not been retrieved and the stream is closed' do

      # kill_cursors is stubbed on the cursor (no call-through) before closing.
      before do
        expect(cursor).to receive(:kill_cursors)
        change_stream.close
      end

      it 'closes the cursor' do
        internal_cursor = change_stream.instance_variable_get(:@cursor)
        expect(internal_cursor).to be(nil)
        expect(change_stream.closed?).to be(true)
      end

      it 'raises an error when the stream is attempted to be iterated' do
        expect { change_stream.to_enum.next }.to raise_exception(StopIteration)
      end
    end

    context 'when some documents have been retrieved and the stream is closed before sending getmore' do

      let(:enum) { change_stream.to_enum }

      # Read one change document, then close the stream.
      before do
        change_stream
        authorized_collection.insert_one(a: 1)
        enum.next
        change_stream.close
      end

      it 'raises an error' do
        expect { enum.next }.to raise_exception(StopIteration)
      end
    end
  end

  # #closed? reports whether #close has been called on the stream.
  describe '#closed?' do

    context 'when the change stream has not been closed' do

      it 'returns false' do
        expect(change_stream.closed?).to be(false)
      end
    end

    context 'when the change stream has been closed' do

      before do
        change_stream.close
      end

      # Description corrected: the assertion expects true, not false.
      it 'returns true' do
        expect(change_stream.closed?).to be(true)
      end
    end
  end

  context 'when the first response does not contain the resume token' do

    # Projecting out _id removes the field used as the resume token
    # from the change documents.
    let(:pipeline) { [{ '$project' => { _id: 0 } }] }

    before do
      change_stream
      authorized_collection.insert_one(a: 1)
    end

    it 'raises an exception and closes the cursor' do
      expect(cursor).to receive(:kill_cursors).and_call_original
      expect { change_stream.to_enum.next }.to raise_exception(Mongo::Error::MissingResumeToken)
    end
  end

  # Failures injected on the primary's socket while the initial command is sent.
  context 'when an error is encountered the first time the command is run' do

    # Socket of a connection checked out of (and returned to) the primary's pool.
    let(:primary_socket) do
      primary_server = authorized_collection.client.cluster.servers.find(&:primary?)
      pooled_connection = primary_server.pool.checkout
      primary_server.pool.checkin(pooled_connection)
      pooled_connection.send(:socket)
    end

    context 'when the error is a resumable error' do

      shared_examples_for 'a resumable change stream' do

        let(:document) { change_stream.to_enum.next }

        # The first socket write raises `error`; server selection is expected twice.
        before do
          expect(primary_socket).to receive(:write).and_raise(error).once
          expect(view.send(:server_selector)).to receive(:select_server).twice.and_call_original
          change_stream
          authorized_collection.insert_one(a: 1)
        end

        it 'runs the command again while using the same read preference and caches the resume token' do
          expect(document[:fullDocument][:a]).to eq(1)
          expect(change_stream_document[:resumeAfter]).to eq(document[:_id])
        end

        context 'when provided a session' do

          let(:session) { authorized_client.start_session }

          let(:options) { { session: session } }

          before do
            change_stream.to_enum.next
          end

          it 'does not close the session' do
            expect(session.ended?).to be(false)
          end
        end
      end

      context 'when the error is a SocketError' do

        let(:error) { Mongo::Error::SocketError }

        it_behaves_like 'a resumable change stream'
      end

      context 'when the error is a SocketTimeoutError' do

        let(:error) { Mongo::Error::SocketTimeoutError }

        it_behaves_like 'a resumable change stream'
      end

      context "when the error is a 'not master' error" do

        let(:error) { Mongo::Error::OperationFailure.new('not master') }

        it_behaves_like 'a resumable change stream'
      end

      context "when the error is a 'cursor not found (43)' error" do

        let(:error) { Mongo::Error::OperationFailure.new('cursor not found (43)') }

        it_behaves_like 'a resumable change stream'
      end
    end

    context 'when the error is another server error' do

      before do
        expect(primary_socket).to receive(:write).and_raise(Mongo::Error::OperationFailure)
        # Expect twice because of kill_cursors in the after block.
        expect(view.send(:server_selector)).to receive(:select_server).twice.and_call_original
      end

      it 'does not run the command again and instead raises the error' do
        expect { change_stream }.to raise_exception(Mongo::Error::OperationFailure)
      end

      context 'when provided a session' do

        let(:session) { authorized_client.start_session }

        let(:options) { { session: session } }

        # Build the stream, ignoring the expected failure.
        before do
          begin
            change_stream
          rescue StandardError
          end
        end

        it 'does not close the session' do
          expect(session.ended?).to be(false)
        end
      end
    end
  end

  # Failures injected into the cursor's get_more after one change document
  # has already been read.
  context 'when a server error is encountered during a getmore' do

    context 'when the error is a resumable error' do

      shared_examples_for 'a change stream that encounters an error from a getmore' do

        # Read one change, insert a second document, then make the next
        # get_more raise `error`. The cursor is expected to be killed and an
        # Aggregate operation to be instantiated.
        before do
          change_stream
          authorized_collection.insert_one(a: 1)
          enum.next
          authorized_collection.insert_one(a: 2)
          expect(cursor).to receive(:get_more).once.and_raise(error)
          expect(cursor).to receive(:kill_cursors).and_call_original
          expect(Mongo::Operation::Commands::Aggregate).to receive(:new).and_call_original
        end

        let(:enum) do
          change_stream.to_enum
        end

        let(:document) do
          enum.next
        end

        it 'runs the command again while using the same read preference and caching the resume token' do
          expect(document[:fullDocument][:a]).to eq(2)
          expect(change_stream_document[:resumeAfter]).to eq(document[:_id])
        end

        context 'when provided a session' do

          let(:options) do
            { session: session}
          end

          let(:session) do
            authorized_client.start_session
          end

          before do
            enum.next
          end

          it 'does not close the session' do
            expect(session.ended?).to be(false)
          end
        end
      end

      context 'when the error is a SocketError' do

        let(:error) do
          Mongo::Error::SocketError
        end

        it_behaves_like 'a change stream that encounters an error from a getmore'
      end

      context 'when the error is a SocketTimeoutError' do

        let(:error) do
          Mongo::Error::SocketTimeoutError
        end

        it_behaves_like 'a change stream that encounters an error from a getmore'
      end

      # Title reworded to name the injected 'not master' error.
      context "when the error is a 'not master' error" do

        let(:error) do
          Mongo::Error::OperationFailure.new('not master')
        end

        it_behaves_like 'a change stream that encounters an error from a getmore'
      end

      # Title reworded to name the injected 'cursor not found (43)' error.
      context "when the error is a 'cursor not found (43)' error" do

        let(:error) do
          Mongo::Error::OperationFailure.new('cursor not found (43)')
        end

        it_behaves_like 'a change stream that encounters an error from a getmore'
      end
    end

    context 'when the error is another server error' do

      # Same setup as above, but get_more raises a bare OperationFailure and
      # no Aggregate operation may be instantiated.
      before do
        change_stream
        authorized_collection.insert_one(a: 1)
        enum.next
        authorized_collection.insert_one(a: 2)
        expect(cursor).to receive(:get_more).and_raise(Mongo::Error::OperationFailure)
        expect(cursor).to receive(:kill_cursors).and_call_original
        expect(Mongo::Operation::Commands::Aggregate).not_to receive(:new)
      end

      let(:enum) do
        change_stream.to_enum
      end

      it 'does not run the command again and instead raises the error' do
        expect {
          enum.next
        }.to raise_exception(Mongo::Error::OperationFailure)
      end

      context 'when provided a session' do

        let(:options) do
          { session: session}
        end

        let(:session) do
          authorized_client.start_session
        end

        # Trigger the failing get_more, ignoring the expected error.
        before do
          begin; enum.next; rescue; end
        end

        it 'does not close the session' do
          expect(session.ended?).to be(false)
        end
      end
    end
  end

  # A get_more fails, and the subsequent send_initial_query on the change
  # stream fails as well.
  context 'when a server error is encountered during the command following an error during getmore' do

    context 'when the error is a resumable error' do

      shared_examples_for 'a change stream that sent getmores, that then encounters an error when resuming' do

        # Read one change, insert a second document, then make both the next
        # get_more and the following send_initial_query raise `error`.
        before do
          change_stream
          authorized_collection.insert_one(a: 1)
          enum.next
          authorized_collection.insert_one(a: 2)
          expect(cursor).to receive(:get_more).and_raise(error)
          expect(cursor).to receive(:kill_cursors).and_call_original
          expect(change_stream).to receive(:send_initial_query).and_raise(error).once.ordered
        end

        let(:enum) do
          change_stream.to_enum
        end

        let(:document) do
          enum.next
        end

        it 'raises the error' do
          expect {
            document
          }.to raise_exception(error)
        end

        context 'when provided a session' do

          let(:options) do
            { session: session}
          end

          let(:session) do
            authorized_client.start_session
          end

          # Trigger the failure, ignoring the expected error.
          before do
            begin; document; rescue; end
          end

          it 'does not close the session' do
            expect(session.ended?).to be(false)
          end
        end
      end

      context 'when the error is a SocketError' do

        let(:error) do
          Mongo::Error::SocketError
        end

        it_behaves_like 'a change stream that sent getmores, that then encounters an error when resuming'
      end

      context 'when the error is a SocketTimeoutError' do

        let(:error) do
          Mongo::Error::SocketTimeoutError
        end

        it_behaves_like 'a change stream that sent getmores, that then encounters an error when resuming'
      end

      # Title reworded to name the injected 'not master' error.
      context "when the error is a 'not master' error" do

        let(:error) do
          Mongo::Error::OperationFailure.new('not master')
        end

        it_behaves_like 'a change stream that sent getmores, that then encounters an error when resuming'
      end

      # Title reworded to name the injected 'cursor not found (43)' error.
      context "when the error is a 'cursor not found (43)' error" do

        let(:error) do
          Mongo::Error::OperationFailure.new('cursor not found (43)')
        end

        it_behaves_like 'a change stream that sent getmores, that then encounters an error when resuming'
      end
    end

    context 'when the error is another server error' do

      # get_more raises a 'not master' failure; the following
      # send_initial_query then raises a bare OperationFailure.
      before do
        change_stream
        authorized_collection.insert_one(a: 1)
        enum.next
        authorized_collection.insert_one(a: 2)
        expect(cursor).to receive(:get_more).and_raise(Mongo::Error::OperationFailure.new('not master'))
        expect(cursor).to receive(:kill_cursors).and_call_original
        expect(change_stream).to receive(:send_initial_query).and_raise(Mongo::Error::OperationFailure).once.ordered
      end

      let(:enum) do
        change_stream.to_enum
      end

      it 'does not run the command again and instead raises the error' do
        expect {
          enum.next
        }.to raise_exception(Mongo::Error::OperationFailure)
      end

      context 'when provided a session' do

        let(:options) do
          { session: session}
        end

        let(:session) do
          authorized_client.start_session
        end

        # Trigger the failure, ignoring the expected error.
        before do
          begin; enum.next; rescue; end
        end

        it 'does not close the session' do
          expect(session.ended?).to be(false)
        end
      end
    end
  end

  # #inspect output is expected to mention the object id, the options and the pipeline.
  describe '#inspect' do

    it 'includes the Ruby object_id in the formatted string' do
      object_id_text = change_stream.object_id.to_s
      expect(change_stream.inspect).to include(object_id_text)
    end

    context 'when resume_after is provided' do

      let(:options) { { resume_after: sample_resume_token } }

      it 'includes resume_after value in the formatted string' do
        token_text = sample_resume_token.to_s
        expect(change_stream.inspect).to include(token_text)
      end
    end

    context 'when max_await_time_ms is provided' do

      let(:options) { { max_await_time_ms: 10 } }

      it 'includes the max_await_time value in the formatted string' do
        expected_text = { max_await_time_ms: 10 }.to_s
        expect(change_stream.inspect).to include(expected_text)
      end
    end

    context 'when batch_size is provided' do

      let(:options) { { batch_size: 5 } }

      it 'includes the batch_size value in the formatted string' do
        expected_text = { batch_size: 5 }.to_s
        expect(change_stream.inspect).to include(expected_text)
      end
    end

    context 'when collation is provided'  do

      let(:options) { { 'collation' => { locale: 'en_US', strength: 2 } } }

      it 'includes the collation value in the formatted string' do
        expected_text = { 'collation' => { locale: 'en_US', strength: 2 } }.to_s
        expect(change_stream.inspect).to include(expected_text)
      end
    end

    context 'when pipeline operators are provided' do

      let(:pipeline) { [{ '$project' => { '_id' => 0 } }] }

      it 'includes the filters in the formatted string' do
        expected_text = [{ '$project' => { '_id' => 0 } }].to_s
        expect(change_stream.inspect).to include(expected_text)
      end
    end
  end
end
