# frozen_string_literal: true
# rubocop:todo all

require 'runners/crud/requirement'
require 'runners/unified/ambiguous_operations'
require 'runners/unified/client_side_encryption_operations'
require 'runners/unified/crud_operations'
require 'runners/unified/grid_fs_operations'
require 'runners/unified/ddl_operations'
require 'runners/unified/change_stream_operations'
require 'runners/unified/support_operations'
require 'runners/unified/thread_operations'
require 'runners/unified/search_index_operations'
require 'runners/unified/assertions'
require 'support/utils'
require 'support/crypt'

module Unified

  class Test
    include AmbiguousOperations
    include ClientSideEncryptionOperations
    include CrudOperations
    include GridFsOperations
    include DdlOperations
    include ChangeStreamOperations
    include SupportOperations
    include ThreadOperations
    include SearchIndexOperations
    include Assertions
    include RSpec::Core::Pending

    def initialize(spec, **opts)
      @spec = spec
      @entities = EntityMap.new
      @test_spec = UsingHash[@spec.fetch('test')]
      @description = @test_spec.use('description')
      @outcome = @test_spec.use('outcome')
      @expected_events = @test_spec.use('expectEvents')
      @skip_reason = @test_spec.use('skipReason')
      if req = @test_spec.use('runOnRequirements')
        @reqs = req.map { |r| Mongo::CRUD::Requirement.new(r) }
      end
      if req = @spec['group_runOnRequirements']
        @group_reqs = req.map { |r| Mongo::CRUD::Requirement.new(r) }
      end
      if @spec['createEntities']
        mongoses = @spec['createEntities'].select do |spec|
          spec['client']
        end.map do |spec|
          spec['client']['useMultipleMongoses']
        end.compact.uniq
        @multiple_mongoses = mongoses.any? { |v| v }
      end
      @test_spec.freeze
      @subscribers = {}
      @observe_sensitive = {}
      @options = opts
    end

    attr_reader :test_spec
    attr_reader :description
    attr_reader :outcome
    attr_reader :skip_reason
    attr_reader :reqs, :group_reqs
    attr_reader :options

    def retry?
      @description =~ /KMS/i
    end

    def skip?
      !!@skip_reason
    end

    def require_multiple_mongoses?
      @multiple_mongoses == true
    end

    def require_single_mongos?
      @multiple_mongoses == false
    end

    attr_reader :entities

    def create_spec_entities
      return if @entities_created
      generate_entities(@spec['createEntities'])
    end

    def generate_entities(es)
      return if es.nil?

      es.each do |entity_spec|
        unless entity_spec.keys.length == 1
          raise NotImplementedError, "Entity must have exactly one key"
        end

        type, spec = entity_spec.first
        spec = UsingHash[spec]
        id = spec.use!('id')

        entity = case type
        when 'client'
          if smc_opts = spec.use('uriOptions')
            opts = Mongo::URI::OptionsMapper.new.smc_to_ruby(smc_opts)
          else
            opts = {}
          end

          # max_pool_size gets automatically set to 3 if not explicitly set by
          # the test, therefore, if min_pool_size is set, make sure to set the
          # max_pool_size as well to something greater.
          if !opts.key?('max_pool_size') && min_pool_size = opts[:min_pool_size]
            opts[:max_pool_size] = min_pool_size + 3
          end

          if spec.use('useMultipleMongoses')
            if ClusterConfig.instance.topology == :sharded
              unless SpecConfig.instance.addresses.length > 1
                raise "useMultipleMongoses requires more than one address in MONGODB_URI"
              end
            end
          else
            # If useMultipleMongoses isn't true, truncate the address
            # list to the first address.
            # This works OK in replica sets because the driver will discover
            # the other set members, in standalone deployments because
            # there is only one server, but changes behavior in
            # sharded clusters compared to how the test suite is configured.
            options[:single_address] = true
          end

          if store_events = spec.use('storeEventsAsEntities')
            store_event_names = {}
            store_events.each do |spec|
              entity_name = spec['id']
              event_names = spec['events']
              event_names.each do |event_name|
                store_event_names[event_name] = entity_name
              end
            end
            store_event_names.values.uniq.each do |entity_name|
              entities.set(:event_list, entity_name, [])
            end
            subscriber = StoringEventSubscriber.new do |payload|
              if entity_name = store_event_names[payload['name']]
                entities.get(:event_list, entity_name) << payload
              end
            end
            opts[:sdam_proc] = lambda do |client|
              client.subscribe(Mongo::Monitoring::COMMAND, subscriber)
              client.subscribe(Mongo::Monitoring::CONNECTION_POOL, subscriber)
            end
          end

          if server_api = spec.use('serverApi')
            server_api = ::Utils.underscore_hash(server_api)
            opts[:server_api] = server_api
          end

          observe_events = spec.use('observeEvents')
          subscriber = EventSubscriber.new
          current_proc = opts[:sdam_proc]
          opts[:sdam_proc] = lambda do |client|
            current_proc.call(client) if current_proc
            if oe = observe_events
              oe.each do |event|
                case event
                when 'commandStartedEvent', 'commandSucceededEvent', 'commandFailedEvent'
                  unless client.send(:monitoring).subscribers[Mongo::Monitoring::COMMAND].include?(subscriber)
                    client.subscribe(Mongo::Monitoring::COMMAND, subscriber)
                  end
                  kind = event.sub('command', '').sub('Event', '').downcase.to_sym
                  subscriber.add_wanted_events(kind)
                  if ignore_events = spec.use('ignoreCommandMonitoringEvents')
                    subscriber.ignore_commands(ignore_events)
                  end
                when /\A(?:pool|connection)/
                  unless client.send(:monitoring).subscribers[Mongo::Monitoring::CONNECTION_POOL]&.include?(subscriber)
                    client.subscribe(Mongo::Monitoring::CONNECTION_POOL, subscriber)
                  end
                  kind = event.sub('Event', '').gsub(/([A-Z])/) { "_#{$1}" }.sub('pool', 'Pool').downcase.to_sym
                  subscriber.add_wanted_events(kind)
                when 'serverDescriptionChangedEvent'
                  unless client.send(:monitoring).subscribers[Mongo::Monitoring::SERVER_DESCRIPTION_CHANGED]&.include?(subscriber)
                    client.subscribe(Mongo::Monitoring::SERVER_DESCRIPTION_CHANGED, subscriber)
                  end
                  kind = event.sub('Event', '').gsub(/([A-Z])/) { "_#{$1}" }.downcase.to_sym
                  subscriber.add_wanted_events(kind)
                else
                  raise NotImplementedError, "Unknown event #{event}"
                end
              end
            end
          end

          create_client(**opts).tap do |client|
            @observe_sensitive[id] = spec.use('observeSensitiveCommands')
            @subscribers[client] ||= subscriber
          end
        when 'database'
          client = entities.get(:client, spec.use!('client'))
          opts = Utils.snakeize_hash(spec.use('databaseOptions') || {})
            .merge(database: spec.use!('databaseName'))
          if opts.key?(:read_preference)
            opts[:read] = opts.delete(:read_preference)
            if opts[:read].key?(:max_staleness_seconds)
              opts[:read][:max_staleness] = opts[:read].delete(:max_staleness_seconds)
            end
          end
          client.with(opts).database
        when 'collection'
          database = entities.get(:database, spec.use!('database'))
          # TODO verify
          opts = Utils.snakeize_hash(spec.use('collectionOptions') || {})
          if opts.key?(:read_preference)
            opts[:read] = opts.delete(:read_preference)
            if opts[:read].key?(:max_staleness_seconds)
              opts[:read][:max_staleness] = opts[:read].delete(:max_staleness_seconds)
            end
          end
          database[spec.use!('collectionName'), opts]
        when 'bucket'
          database = entities.get(:database, spec.use!('database'))
          database.fs
        when 'session'
          client = entities.get(:client, spec.use!('client'))

          if smc_opts = spec.use('sessionOptions')
            opts = ::Utils.underscore_hash(smc_opts)
          else
            opts = {}
          end

          client.start_session(**opts).tap do |session|
            session.advance_cluster_time(@cluster_time)
          end
        when 'clientEncryption'
          client_encryption_opts = spec.use!('clientEncryptionOpts')
          key_vault_client = entities.get(:client, client_encryption_opts['keyVaultClient'])
          opts = {
            key_vault_namespace: client_encryption_opts['keyVaultNamespace'],
            kms_providers: Utils.snakeize_hash(client_encryption_opts['kmsProviders']),
            kms_tls_options: {
              kmip: {
                ssl_cert: SpecConfig.instance.fle_kmip_tls_certificate_key_file,
                ssl_key: SpecConfig.instance.fle_kmip_tls_certificate_key_file,
                ssl_ca_cert: SpecConfig.instance.fle_kmip_tls_ca_file
              }
            }
          }
          opts[:kms_providers] = opts[:kms_providers].map do |provider, options|
            converted_options = options.map do |key, value|
              converted_value = if value == { '$$placeholder'.to_sym => 1 }
                case provider
                when :aws
                  case key
                  when :access_key_id then SpecConfig.instance.fle_aws_key
                  when :secret_access_key then SpecConfig.instance.fle_aws_secret
                  end
                when :azure
                  case key
                  when :tenant_id then SpecConfig.instance.fle_azure_tenant_id
                  when :client_id then SpecConfig.instance.fle_azure_client_id
                  when :client_secret then SpecConfig.instance.fle_azure_client_secret
                  end
                when :gcp
                  case key
                  when :email then SpecConfig.instance.fle_gcp_email
                  when :private_key then SpecConfig.instance.fle_gcp_private_key
                  end
                when :kmip
                  case key
                  when :endpoint then SpecConfig.instance.fle_kmip_endpoint
                  end
                when :local
                  case key
                  when :key then Crypt::LOCAL_MASTER_KEY
                  end
                end
              else
                value
              end
              [key, converted_value]
            end.to_h
            [provider, converted_options]
          end.to_h

          Mongo::ClientEncryption.new(
            key_vault_client,
            opts
          )
        when 'thread'
          thread_context = ThreadContext.new
          thread = Thread.new do
            loop do
              begin
                op_spec = thread_context.operations.pop(true)
                execute_operation(op_spec)
              rescue ThreadError
                # Queue is empty
              end
              if thread_context.stop?
                break
              else
                sleep 0.1
              end
            end
          end
          class << thread
            attr_accessor :context
          end
          thread.context = thread_context
          thread
        else
          raise NotImplementedError, "Unknown type #{type}"
        end
        unless spec.empty?
          raise NotImplementedError, "Unhandled spec keys: #{spec}"
        end
        entities.set(type.to_sym, id, entity)
      end
      @entities_created = true
    end

    def set_initial_data
      @spec['initialData']&.each do |entity_spec|
        spec = UsingHash[entity_spec]
        collection = root_authorized_client.with(write_concern: {w: :majority}).
          use(spec.use!('databaseName'))[spec.use!('collectionName')]
        collection.drop
        create_options = spec.use('createOptions') || {}
        docs = spec.use!('documents')
        begin
          collection.create(create_options)
        rescue Mongo::Error => e
          if Mongo::Error::OperationFailure::Family === e && (
              e.code == 48 || e.message =~ /collection already exists/
          )
            # Already exists
          else
            raise
          end
        end
        if docs.any?
          collection.insert_many(docs)
        end
        unless spec.empty?
          raise NotImplementedError, "Unhandled spec keys: #{spec}"
        end
      end

      # the cluster time is used to advance the cluster time of any
      # sessions created during this test.
      # -> see DRIVERS-2816
      @cluster_time = root_authorized_client.command(ping: 1).cluster_time
    end

    def run
      kill_sessions

      test_spec = UsingHash[self.test_spec]
      ops = test_spec.use!('operations')
      execute_operations(ops)
      unless test_spec.empty?
        raise NotImplementedError, "Unhandled spec keys: #{test_spec}"
      end
    ensure
      disable_fail_points
    end

    def stop!
      @stop = true
    end

    def stop?
      !!@stop
    end

    def cleanup
      if $kill_transactions || true
        kill_sessions
        $kill_transactions = nil
      end

      entities[:client]&.each do |id, client|
        client.close
      end
    end

    private

    def execute_operations(ops)
      ops.each do |op|
        execute_operation(op)
      end
    end

    def execute_operation(op)
      use_all(op, 'operation', op) do |op|
        name = Utils.underscore(op.use!('name'))
        method_name = name
        if name.to_s == 'loop'
          method_name = "_#{name}"
        end

        if ["modify_collection", "list_index_names"].include?(name.to_s)
          skip "Mongo Ruby Driver does not support #{name.to_s}"
        end

        if expected_error = op.use('expectError')
          begin
            unless respond_to?(method_name)
              raise Error::UnsupportedOperation, "Mongo Ruby Driver does not support #{name.to_s}"
            end

            public_send(method_name, op)
          rescue Mongo::Error, bson_error, Mongo::Auth::Unauthorized, ArgumentError => e
            if expected_error.use('isTimeoutError')
              unless Mongo::Error::TimeoutError === e
                raise e
                raise Error::ErrorMismatch, %Q,Expected TimeoutError ("isTimeoutError") but got #{e},
              end
            end
            if expected_error.use('isClientError')
              # isClientError doesn't actually mean a client error.
              # It means anything other than OperationFailure. DRIVERS-1799
              if Mongo::Error::OperationFailure::Family === e
                raise Error::ErrorMismatch, %Q,Expected not OperationFailure ("isClientError") but got #{e},
              end
            end
            if code = expected_error.use('errorCode')
              unless e.code == code
                raise Error::ErrorMismatch, "Expected #{code} code but had #{e.code}"
              end
            end
            if code_name = expected_error.use('errorCodeName')
              unless e.code_name == code_name
                raise Error::ErrorMismatch, "Expected #{code_name} code name but had #{e.code_name}"
              end
            end
            if text = expected_error.use('errorContains')
              unless e.to_s.include?(text)
                raise Error::ErrorMismatch, "Expected #{text} in the message but had #{e}"
              end
            end
            if labels = expected_error.use('errorLabelsContain')
              labels.each do |label|
                unless e.label?(label)
                  raise Error::ErrorMismatch, "Expected error to contain label #{label} but it did not"
                end
              end
            end
            if omit_labels = expected_error.use('errorLabelsOmit')
              omit_labels.each do |label|
                if e.label?(label)
                  raise Error::ErrorMismatch, "Expected error to not contain label #{label} but it did"
                end
              end
            end
            if error_response = expected_error.use("errorResponse")
              assert_result_matches(e.document, error_response)
            end
            if expected_result = expected_error.use('expectResult')
              assert_result_matches(e.result, expected_result)
            # Important: this must be the last branch.
            elsif expected_error.use('isError')
              # Nothing but we consume the key.
            end
            unless expected_error.empty?
              raise NotImplementedError, "Unhandled keys: #{expected_error}"
            end
          else
            raise Error::ErrorMismatch, "Expected exception but none was raised"
          end
        elsif op.use('ignoreResultAndError')
          unless respond_to?(method_name)
            raise Error::UnsupportedOperation, "Mongo Ruby Driver does not support #{name.to_s}"
          end

          begin
            send(method_name, op)
          # We can possibly rescue more errors here, add as needed.
          rescue Mongo::Error
          end
        else
          unless respond_to?(method_name, true)
            raise Error::UnsupportedOperation, "Mongo Ruby Driver does not support #{name.to_s}"
          end

          result = send(method_name, op)
          if expected_result = op.use('expectResult')
            if result.nil? && expected_result.keys == ["$$unsetOrMatches"]
              return
            elsif result.nil? && !expected_result.empty?
              raise Error::ResultMismatch, "expected #{expected_result} but got nil"
            elsif Array === expected_result
              assert_documents_match(result, expected_result)
            else
              assert_result_matches(result, expected_result)
            end
            #expected_result.clear
          end
          if save_entity = op.use('saveResultAsEntity')
            entities.set(:result, save_entity, result)
          end
        end
      end
    end

    def use_sub(hash, key, &block)
      v = hash.use!(key)
      use_all(hash, key, v, &block)
    end

    def use_all(hash, key, v)
      orig_v = v.dup
      (yield v).tap do
        unless v.empty?
          raise NotImplementedError, "Unconsumed items for #{key}: #{v}\nOriginal hash: #{orig_v}"
        end
      end
    end

    def use_arguments(op, &block)
      if op.key?('arguments')
        use_sub(op, 'arguments', &block)
      else
        yield UsingHash.new
      end
    end

    def disable_fail_points
      if $disable_fail_points
        $disable_fail_points.each do |(fail_point_command, address)|
          client = ClusterTools.instance.direct_client(address,
            database: 'admin')
          client.command(configureFailPoint: fail_point_command['configureFailPoint'],
            mode: 'off')
        end
        $disable_fail_points = nil
      end
    end

    def kill_sessions
      begin
        root_authorized_client.command(
          killAllSessions: [],
        )
      rescue Mongo::Error::OperationFailure::Family => e
        if e.code == 11601
          # operation was interrupted, ignore. SERVER-38335
        elsif e.code == 13
          # Unauthorized - e.g. when running in Atlas as part of
          # drivers-atlas-testing, ignore. SERVER-54216
        elsif e.code == 59
          # no such command (old server), ignore
        elsif e.code == 8000
          # CMD_NOT_ALLOWED: killAllSessions - running against a serverless instance
        else
          raise
        end
      end
    end

    def root_authorized_client
      @root_authorized_client ||= ClientRegistry.instance.global_client('root_authorized')
    end

    def create_client(**opts)
      args = case v = options[:client_args]
      when Array
        unless v.length == 2
          raise NotImplementedError, 'Client args array must have two elements'
        end
        [v.first, v.last.dup]
      when String
        [v, {}]
      else
        addresses = SpecConfig.instance.addresses
        if options[:single_address]
          addresses = [addresses.first]
        end
        [
          addresses,
          SpecConfig.instance.all_test_options,
        ]
      end
      args.last.update(
        max_read_retries: 0,
        max_write_retries: 0,
      ).update(opts)
      Mongo::Client.new(*args)
    end

    # The error to rescue BSON tests for. If we still define
    # BSON::String::IllegalKey then we should rescue that particular error,
    # otherwise, rescue an arbitrary BSON::Error
    def bson_error
      BSON::String.const_defined?(:IllegalKey) ?
        BSON::String.const_get(:IllegalKey) :
        BSON::Error
    end
  end
end
