# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'runners/crud/operation'
require 'runners/crud/test_base'
require 'runners/change_streams/outcome'

module Mongo
  module ChangeStreams

    class ChangeStreamsTest < Mongo::CRUD::CRUDTestBase

      def initialize(crud_spec, test, collection_name, collection2_name, database_name, database2_name)
        @spec = crud_spec
        @description = test['description']

        @fail_point_command = test['failPoint']

        @min_server_version = test['minServerVersion']
        @max_server_version = test['maxServerVersion']
        @target_type = test['target']
        @topologies = test['topology'].map do |topology|
          {'single' => :single, 'replicaset' => :replica_set, 'sharded' => :sharded}[topology]
        end
        @pipeline = test['changeStreamPipeline'] || []
        @options = test['changeStreamOptions'] || {}

        @operations = test['operations'].map do |op|
          Mongo::CRUD::Operation.new(self, op)
        end

        @expectations = test['expectations'] &&
          BSON::ExtJSON.parse_obj(test['expectations'], mode: :bson)

        @result = BSON::ExtJSON.parse_obj(test['result'], mode: :bson)
        @collection_name = collection_name
        @collection2_name = collection2_name
        @database_name = database_name
        @database2_name = database2_name

        @outcome = Outcome.new(test.fetch('result'))
      end

      attr_reader :topologies

      attr_reader :outcome

      attr_reader :result

      def setup_test
        clear_fail_point(global_client)

        @database = global_client.use(@database_name).database.tap(&:drop)
        if @database2_name
          @database2 = global_client.use(@database2_name).database.tap(&:drop)
        end

        # Work around https://jira.mongodb.org/browse/SERVER-17397
        if ClusterConfig.instance.server_version < '4.4' &&
          global_client.cluster.servers.length > 1
        then
          ::Utils.mongos_each_direct_client do |client|
            client.database.command(flushRouterConfig: 1)
          end
        end

        @database[@collection_name].create
        if @collection2_name
          @database2[@collection2_name].create
        end

        client = ClientRegistry.instance.global_client('root_authorized').with(
          database: @database_name,
          app_name: 'this is used solely to force the new client to create its own cluster')

        setup_fail_point(client)

        @subscriber = Mrss::EventSubscriber.new
        client.subscribe(Mongo::Monitoring::COMMAND, @subscriber)

        @target = case @target_type
                 when 'client'
                   client
                 when 'database'
                   client.database
                 when 'collection'
                   client[@collection_name]
                 end
      end

      def teardown_test
        if @fail_point_command
          clear_fail_point(global_client)
        end
      end

      def run
        change_stream = begin
          @target.watch(@pipeline, ::Utils.snakeize_hash(@options))
        rescue Mongo::Error::OperationFailure::Family => e
          return {
            result: {
              error: {
                code: e.code,
                labels: e.labels,
              },
            },
            events: events,
          }
        end

        # JRuby must iterate the same object, not switch from
        # enum to change stream
        enum = change_stream.to_enum

        @operations.each do |op|
          db = case op.spec['database']
            when @database_name
              @database
            when @database2_name
              @database2
            else
              raise "Unknown database name #{op.spec['database']}"
            end
          collection = db[op.spec['collection']]
          op.execute(collection)
        end

        changes = []

        # attempt first next call (catch NonResumableChangeStreamError errors)
        begin
          change = enum.next
          changes << change
        rescue Mongo::Error::OperationFailure::Family => e
          return {
            result: {
              error: {
                code: e.code,
                labels: e.labels,
              },
            },
            events: events,
          }
        end

        # continue until changeStream has received as many changes as there
        # are in result.success
        if @result['success'] && changes.length < @result['success'].length
          while changes.length < @result['success'].length
            changes << enum.next
          end
        end

        change_stream.close

        {
          result: { 'success' => changes },
          events: events,
        }
      end

      def server_version_satisfied?(client)
        lower_bound_satisfied?(client) && upper_bound_satisfied?(client)
      end

      private

      IGNORE_COMMANDS = %w(saslStart saslContinue killCursors)

      def global_client
        @global_client ||= ClientRegistry.instance.global_client('root_authorized').use('admin')
      end

      def events
        @subscriber.started_events.reduce([]) do |evs, e|
          next evs if IGNORE_COMMANDS.include?(e.command_name)

          command = e.command.dup
          if command['aggregate'] && command['pipeline']
            command['pipeline'] = command['pipeline'].map do |stage|
              if stage['$changeStream']
                cs = stage['$changeStream'].dup
                cs.delete('resumeAfter')
                stage.merge('$changeStream' => cs)
              else
                stage
              end
            end
          end

          evs << {
            'command_started_event' => {
              'command' => command,
              'command_name' => e.command_name.to_s,
              'database_name' => e.database_name,
            }
          }
        end
      end

      def server_version(client)
        @server_version ||= client.database.command(buildInfo: 1).first['version']
      end

      def upper_bound_satisfied?(client)
        return true unless @max_server_version
        ClusterConfig.instance.server_version <= @max_server_version
      end

      def lower_bound_satisfied?(client)
        return true unless @min_server_version
        #@min_server_version <= server_version(client)
        @min_server_version <= ClusterConfig.instance.fcv_ish
      end
    end
  end
end
