require 'thread'
require 'concurrent/delay'
require 'concurrent/errors'
require 'concurrent/atomic/atomic_reference'
require 'concurrent/concern/logging'
require 'concurrent/concern/deprecation'
require 'concurrent/executor/immediate_executor'
require 'concurrent/executor/cached_thread_pool'
require 'concurrent/utility/processor_counter'

module Concurrent
  extend Concern::Logging
  extend Concern::Deprecation

  autoload :Options, 'concurrent/options'
  autoload :TimerSet, 'concurrent/executor/timer_set'
  autoload :ThreadPoolExecutor, 'concurrent/executor/thread_pool_executor'

  # @return [Logger] Logger with provided level and output.
  def self.create_simple_logger(level = Logger::FATAL, output = $stderr)
    # TODO (pitr-ch 24-Dec-2016): figure out why it had to be replaced, stdlogger was deadlocking
    lambda do |severity, progname, message = nil, &block|
      return false if severity < level

      message           = block ? block.call : message
      formatted_message = case message
                          when String
                            message
                          when Exception
                            format "%s (%s)\n%s",
                                   message.message, message.class, (message.backtrace || []).join("\n")
                          else
                            message.inspect
                          end

      output.print format "[%s] %5s -- %s: %s\n",
                          Time.now.strftime('%Y-%m-%d %H:%M:%S.%L'),
                          Logger::SEV_LABEL[severity],
                          progname,
                          formatted_message
      true
    end
  end

  # Use logger created by #create_simple_logger to log concurrent-ruby messages.
  def self.use_simple_logger(level = Logger::FATAL, output = $stderr)
    Concurrent.global_logger = create_simple_logger level, output
  end

  # @return [Logger] Logger with provided level and output.
  # @deprecated
  def self.create_stdlib_logger(level = Logger::FATAL, output = $stderr)
    logger           = Logger.new(output)
    logger.level     = level
    logger.formatter = lambda do |severity, datetime, progname, msg|
      formatted_message = case msg
                          when String
                            msg
                          when Exception
                            format "%s (%s)\n%s",
                                   msg.message, msg.class, (msg.backtrace || []).join("\n")
                          else
                            msg.inspect
                          end
      format "[%s] %5s -- %s: %s\n",
             datetime.strftime('%Y-%m-%d %H:%M:%S.%L'),
             severity,
             progname,
             formatted_message
    end

    lambda do |loglevel, progname, message = nil, &block|
      logger.add loglevel, message, progname, &block
    end
  end

  # Use logger created by #create_stdlib_logger to log concurrent-ruby messages.
  # @deprecated
  def self.use_stdlib_logger(level = Logger::FATAL, output = $stderr)
    Concurrent.global_logger = create_stdlib_logger level, output
  end

  # TODO (pitr-ch 27-Dec-2016): remove deadlocking stdlib_logger methods

  # Suppresses all output when used for logging.
  NULL_LOGGER   = lambda { |level, progname, message = nil, &block| }

  # @!visibility private
  GLOBAL_LOGGER = AtomicReference.new(create_simple_logger(Logger::WARN))
  private_constant :GLOBAL_LOGGER

  def self.global_logger
    GLOBAL_LOGGER.value
  end

  def self.global_logger=(value)
    GLOBAL_LOGGER.value = value
  end

  # @!visibility private
  GLOBAL_FAST_EXECUTOR = Delay.new { Concurrent.new_fast_executor }
  private_constant :GLOBAL_FAST_EXECUTOR

  # @!visibility private
  GLOBAL_IO_EXECUTOR = Delay.new { Concurrent.new_io_executor }
  private_constant :GLOBAL_IO_EXECUTOR

  # @!visibility private
  GLOBAL_TIMER_SET = Delay.new { TimerSet.new }
  private_constant :GLOBAL_TIMER_SET

  # @!visibility private
  GLOBAL_IMMEDIATE_EXECUTOR = ImmediateExecutor.new
  private_constant :GLOBAL_IMMEDIATE_EXECUTOR

  # Disables AtExit handlers including pool auto-termination handlers.
  # When disabled it will be the application programmer's responsibility
  # to ensure that the handlers are shutdown properly prior to application
  # exit by calling `AtExit.run` method.
  #
  # @note this option should be needed only because of `at_exit` ordering
  #   issues which may arise when running some of the testing frameworks.
  #   E.g. Minitest's test-suite runs itself in `at_exit` callback which
  #   executes after the pools are already terminated. Then auto termination
  #   needs to be disabled and called manually after test-suite ends.
  # @note This method should *never* be called
  #   from within a gem. It should *only* be used from within the main
  #   application and even then it should be used only when necessary.
  # @deprecated Has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841.
  #
  def self.disable_at_exit_handlers!
    deprecated "Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841."
  end

  # Global thread pool optimized for short, fast *operations*.
  #
  # @return [ThreadPoolExecutor] the thread pool
  def self.global_fast_executor
    GLOBAL_FAST_EXECUTOR.value
  end

  # Global thread pool optimized for long, blocking (IO) *tasks*.
  #
  # @return [ThreadPoolExecutor] the thread pool
  def self.global_io_executor
    GLOBAL_IO_EXECUTOR.value
  end

  def self.global_immediate_executor
    GLOBAL_IMMEDIATE_EXECUTOR
  end

  # Global thread pool user for global *timers*.
  #
  # @return [Concurrent::TimerSet] the thread pool
  def self.global_timer_set
    GLOBAL_TIMER_SET.value
  end

  # General access point to global executors.
  # @param [Symbol, Executor] executor_identifier symbols:
  #   - :fast - {Concurrent.global_fast_executor}
  #   - :io - {Concurrent.global_io_executor}
  #   - :immediate - {Concurrent.global_immediate_executor}
  # @return [Executor]
  def self.executor(executor_identifier)
    Options.executor(executor_identifier)
  end

  def self.new_fast_executor(opts = {})
    FixedThreadPool.new(
        [2, Concurrent.processor_count].max,
        auto_terminate:  opts.fetch(:auto_terminate, true),
        idletime:        60, # 1 minute
        max_queue:       0, # unlimited
        fallback_policy: :abort, # shouldn't matter -- 0 max queue
        name:            "fast"
    )
  end

  def self.new_io_executor(opts = {})
    CachedThreadPool.new(
        auto_terminate:  opts.fetch(:auto_terminate, true),
        fallback_policy: :abort, # shouldn't matter -- 0 max queue
        name:            "io"
    )
  end
end
