require 'fugit'
require 'sidekiq'
require 'sidekiq/cron/support'
require 'sidekiq/options'

module Sidekiq
  module Cron
    class Job
      # How long we would like to store informations about previous enqueues.
      REMEMBER_THRESHOLD = 24 * 60 * 60

      # Time format for enqueued jobs.
      LAST_ENQUEUE_TIME_FORMAT = '%Y-%m-%d %H:%M:%S %z'

      # Use the exists? method if we're on a newer version of Redis.
      REDIS_EXISTS_METHOD = Gem.loaded_specs['redis'].version < Gem::Version.new('4.2') ? :exists : :exists?

      # Crucial part of whole enqueuing job.
      def should_enque? time
        enqueue = false
        enqueue = Sidekiq.redis do |conn|
          status == "enabled" &&
            not_past_scheduled_time?(time) &&
            not_enqueued_after?(time) &&
            conn.zadd(job_enqueued_key, formated_enqueue_time(time), formated_last_time(time))
        end
        enqueue
      end

      # Remove previous information about run times,
      # this will clear Redis and make sure that Redis will not overflow with memory.
      def remove_previous_enques time
        Sidekiq.redis do |conn|
          conn.zremrangebyscore(job_enqueued_key, 0, "(#{(time.to_f - REMEMBER_THRESHOLD).to_s}")
        end
      end

      # Test if job should be enqueued.
      def test_and_enque_for_time! time
        if should_enque?(time)
          enque!

          remove_previous_enques(time)
        end
      end

      # Enqueue cron job to queue.
      def enque! time = Time.now.utc
        @last_enqueue_time = time.strftime(LAST_ENQUEUE_TIME_FORMAT)

        klass_const =
            begin
              Sidekiq::Cron::Support.constantize(@klass.to_s)
            rescue NameError
              nil
            end

        jid =
          if klass_const
            if defined?(ActiveJob::Base) && klass_const < ActiveJob::Base
              enqueue_active_job(klass_const).try :provider_job_id
            else
              enqueue_sidekiq_worker(klass_const)
            end
          else
            if @active_job
              Sidekiq::Client.push(active_job_message)
            else
              Sidekiq::Client.push(sidekiq_worker_message)
            end
          end

        save_last_enqueue_time
        add_jid_history jid
        Sidekiq.logger.debug { "enqueued #{@name}: #{@message}" }
      end

      def is_active_job?
        @active_job || defined?(ActiveJob::Base) && Sidekiq::Cron::Support.constantize(@klass.to_s) < ActiveJob::Base
      rescue NameError
        false
      end

      def date_as_argument?
        !!@date_as_argument
      end

      def enqueue_args
        date_as_argument? ? @args + [Time.now.to_f] : @args
      end

      def enqueue_active_job(klass_const)
        klass_const.set(queue: @queue).perform_later(*enqueue_args)
      end

      def enqueue_sidekiq_worker(klass_const)
        klass_const.set(queue: queue_name_with_prefix).perform_async(*enqueue_args)
      end

      # Sidekiq worker message.
      def sidekiq_worker_message
        message = @message.is_a?(String) ? Sidekiq.load_json(@message) : @message
        message["args"] = enqueue_args
        message
      end

      def queue_name_with_prefix
        return @queue unless is_active_job?

        if !"#{@active_job_queue_name_delimiter}".empty?
          queue_name_delimiter = @active_job_queue_name_delimiter
        elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_delimiter) && !ActiveJob::Base.queue_name_delimiter.empty?
          queue_name_delimiter = ActiveJob::Base.queue_name_delimiter
        else
          queue_name_delimiter = '_'
        end

        if !"#{@active_job_queue_name_prefix}".empty?
          queue_name = "#{@active_job_queue_name_prefix}#{queue_name_delimiter}#{@queue}"
        elsif defined?(ActiveJob::Base) && defined?(ActiveJob::Base.queue_name_prefix) && !"#{ActiveJob::Base.queue_name_prefix}".empty?
          queue_name = "#{ActiveJob::Base.queue_name_prefix}#{queue_name_delimiter}#{@queue}"
        else
          queue_name = @queue
        end

        queue_name
      end

      # Active Job has different structure how it is loading data from Sidekiq
      # queue, it creates a wrapper around job.
      def active_job_message
        {
          'class'        => 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper',
          'wrapped'      => @klass,
          'queue'        => @queue_name_with_prefix,
          'description'  => @description,
          'args'         => [{
            'job_class'  => @klass,
            'job_id'     => SecureRandom.uuid,
            'queue_name' => @queue_name_with_prefix,
            'arguments'  => enqueue_args
          }]
        }
      end

      # Load cron jobs from Hash.
      # Input structure should look like:
      # {
      #   'name_of_job' => {
      #     'class'       => 'MyClass',
      #     'cron'        => '1 * * * *',
      #     'args'        => '(OPTIONAL) [Array or Hash]',
      #     'description' => '(OPTIONAL) Description of job'
      #   },
      #   'My super iber cool job' => {
      #     'class' => 'SecondClass',
      #     'cron'  => '*/5 * * * *'
      #   }
      # }
      #
      def self.load_from_hash hash
        array = hash.inject([]) do |out,(key, job)|
          job['name'] = key
          out << job
        end
        load_from_array array
      end

      # Like #load_from_hash.
      # If exists old jobs in Redis but removed from args, destroy old jobs.
      def self.load_from_hash! hash
        destroy_removed_jobs(hash.keys)
        load_from_hash(hash)
      end

      # Load cron jobs from Array.
      # Input structure should look like:
      # [
      #   {
      #     'name'        => 'name_of_job',
      #     'class'       => 'MyClass',
      #     'cron'        => '1 * * * *',
      #     'args'        => '(OPTIONAL) [Array or Hash]',
      #     'description' => '(OPTIONAL) Description of job'
      #   },
      #   {
      #     'name'  => 'Cool Job for Second Class',
      #     'class' => 'SecondClass',
      #     'cron'  => '*/5 * * * *'
      #   }
      # ]
      #
      def self.load_from_array array
        errors = {}
        array.each do |job_data|
          job = new(job_data)
          errors[job.name] = job.errors unless job.save
        end
        errors
      end

      # Like #load_from_array.
      # If exists old jobs in Redis but removed from args, destroy old jobs.
      def self.load_from_array! array
        job_names = array.map { |job| job["name"] }
        destroy_removed_jobs(job_names)
        load_from_array(array)
      end

      # Get all cron jobs.
      def self.all
        job_hashes = nil
        Sidekiq.redis do |conn|
          set_members = conn.smembers(jobs_key)
          job_hashes = conn.pipelined do |pipeline|
            set_members.each do |key|
              pipeline.hgetall(key)
            end
          end
        end
        job_hashes.compact.reject(&:empty?).collect do |h|
          # No need to fetch missing args from Redis since we just got this hash from there
          Sidekiq::Cron::Job.new(h.merge(fetch_missing_args: false))
        end
      end

      def self.count
        out = 0
        Sidekiq.redis do |conn|
          out = conn.scard(jobs_key)
        end
        out
      end

      def self.find name
        # If name is hash try to get name from it.
        name = name[:name] || name['name'] if name.is_a?(Hash)

        output = nil
        Sidekiq.redis do |conn|
          if exists? name
            output = Job.new conn.hgetall( redis_key(name) )
          end
        end
        output if output && output.valid?
      end

      # Create new instance of cron job.
      def self.create hash
        new(hash).save
      end

      # Destroy job by name.
      def self.destroy name
        # If name is hash try to get name from it.
        name = name[:name] || name['name'] if name.is_a?(Hash)

        if job = find(name)
          job.destroy
        else
          false
        end
      end

      attr_accessor :name, :cron, :description, :klass, :args, :message
      attr_reader   :last_enqueue_time, :fetch_missing_args

      def initialize input_args = {}
        args = Hash[input_args.map{ |k, v| [k.to_s, v] }]
        @fetch_missing_args = args.delete('fetch_missing_args')
        @fetch_missing_args = true if @fetch_missing_args.nil?

        @name = args["name"]
        @cron = args["cron"]
        @description = args["description"] if args["description"]

        # Get class from klass or class.
        @klass = args["klass"] || args["class"]

        # Set status of job.
        @status = args['status'] || status_from_redis

        # Set last enqueue time - from args or from existing job.
        if args['last_enqueue_time'] && !args['last_enqueue_time'].empty?
          @last_enqueue_time = parse_enqueue_time(args['last_enqueue_time'])
        else
          @last_enqueue_time = last_enqueue_time_from_redis
        end

        # Get right arguments for job.
        @symbolize_args = args["symbolize_args"] == true || ("#{args["symbolize_args"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
        @args = args["args"].nil? ? [] : parse_args( args["args"] )

        @date_as_argument = args["date_as_argument"] == true || ("#{args["date_as_argument"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false

        @active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
        @active_job_queue_name_prefix = args["queue_name_prefix"]
        @active_job_queue_name_delimiter = args["queue_name_delimiter"]

        if args["message"]
          @message = args["message"]
          message_data = Sidekiq.load_json(@message) || {}
          @queue = message_data['queue'] || "default"
        elsif @klass
          message_data = {
            "class" => @klass.to_s,
            "args"  => @args,
          }

          # Get right data for message,
          # only if message wasn't specified before.
          klass_data = case @klass
            when Class
              @klass.get_sidekiq_options
            when String
              begin
                Sidekiq::Cron::Support.constantize(@klass).get_sidekiq_options
              rescue Exception => e
                # Unknown class
                {"queue"=>"default"}
              end
          end

          message_data = klass_data.merge(message_data)

          # Override queue if setted in config,
          # only if message is hash - can be string (dumped JSON).
          if args['queue']
            @queue = message_data['queue'] = args['queue']
          else
            @queue = message_data['queue'] || "default"
          end

          @message = message_data
        end

        @queue_name_with_prefix = queue_name_with_prefix
      end

      def status
        @status
      end

      def disable!
        @status = "disabled"
        save
      end

      def enable!
        @status = "enabled"
        save
      end

      def enabled?
        @status == "enabled"
      end

      def disabled?
        !enabled?
      end

      def pretty_message
        JSON.pretty_generate Sidekiq.load_json(message)
      rescue JSON::ParserError
        message
      end

      def status_from_redis
        out = "enabled"
        if fetch_missing_args
          Sidekiq.redis do |conn|
            status = conn.hget redis_key, "status"
            out = status if status
          end
        end
        out
      end

      def last_enqueue_time_from_redis
        out = nil
        if fetch_missing_args
          Sidekiq.redis do |conn|
            out = parse_enqueue_time(conn.hget(redis_key, "last_enqueue_time")) rescue nil
          end
        end
        out
      end

      def jid_history_from_redis
        out =
          Sidekiq.redis do |conn|
            conn.lrange(jid_history_key, 0, -1) rescue nil
          end

        out && out.map do |jid_history_raw|
          Sidekiq.load_json jid_history_raw
        end
      end

      # Export job data to hash.
      def to_hash
        {
          name: @name,
          klass: @klass,
          cron: @cron,
          description: @description,
          args: @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || []),
          date_as_argument: @date_as_argument,
          message: @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {}),
          status: @status,
          active_job: @active_job,
          queue_name_prefix: @active_job_queue_name_prefix,
          queue_name_delimiter: @active_job_queue_name_delimiter,
          last_enqueue_time: @last_enqueue_time,
          symbolize_args: @symbolize_args,
        }
      end

      def errors
        @errors ||= []
      end

      def valid?
        # Clear previous errors.
        @errors = []

        errors << "'name' must be set" if @name.nil? || @name.size == 0
        if @cron.nil? || @cron.size == 0
          errors << "'cron' must be set"
        else
          begin
            c = Fugit.do_parse(@cron)

            # Since `Fugit.do_parse` might yield a Fugit::Duration or an EtOrbi::EoTime
            # https://github.com/floraison/fugit#fugitparses
            if c.is_a?(Fugit::Cron)
              @parsed_cron = c
            else
              errors << "'cron' -> #{@cron.inspect} -> not a cron but a #{c.class}"
            end
          rescue => e
            errors << "'cron' -> #{@cron.inspect} -> #{e.class}: #{e.message}"
          end
        end

        errors << "'klass' (or class) must be set" unless klass_valid

        errors.empty?
      end

      def klass_valid
        case @klass
          when Class
            true
          when String
            @klass.size > 0
          else
        end
      end

      def save
        # If job is invalid, return false.
        return false unless valid?

        Sidekiq.redis do |conn|

          # Add to set of all jobs
          conn.sadd self.class.jobs_key, [redis_key]

          # Add informations for this job!
          conn.hmset redis_key, *hash_to_redis(to_hash)

          # Add information about last time! - don't enque right after scheduler poller starts!
          time = Time.now.utc
          conn.zadd(job_enqueued_key, time.to_f.to_s, formated_last_time(time).to_s) unless conn.public_send(REDIS_EXISTS_METHOD, job_enqueued_key)
        end
        Sidekiq.logger.info { "Cron Jobs - added job with name: #{@name}" }
      end

      def save_last_enqueue_time
        Sidekiq.redis do |conn|
          # Update last enqueue time.
          conn.hset redis_key, 'last_enqueue_time', @last_enqueue_time
        end
      end

      def add_jid_history(jid)
        jid_history = {
          jid: jid,
          enqueued: @last_enqueue_time
        }

        @history_size ||= (Sidekiq::Options[:cron_history_size] || 10).to_i - 1
        Sidekiq.redis do |conn|
          conn.lpush jid_history_key,
                     Sidekiq.dump_json(jid_history)
          # Keep only last 10 entries in a fifo manner.
          conn.ltrim jid_history_key, 0, @history_size
        end
      end

      def destroy
        Sidekiq.redis do |conn|
          # Delete from set.
          conn.srem self.class.jobs_key, [redis_key]

          # Delete runned timestamps.
          conn.del job_enqueued_key

          # Delete jid_history.
          conn.del jid_history_key

          # Delete main job.
          conn.del redis_key
        end

        Sidekiq.logger.info { "Cron Jobs - deleted job with name: #{@name}" }
      end

      # Remove all job from cron.
      def self.destroy_all!
        all.each do |job|
          job.destroy
        end
        Sidekiq.logger.info { "Cron Jobs - deleted all jobs" }
      end

      # Remove "removed jobs" between current jobs and new jobs
      def self.destroy_removed_jobs new_job_names
        current_job_names = Sidekiq::Cron::Job.all.map(&:name)
        removed_job_names = current_job_names - new_job_names
        removed_job_names.each { |j| Sidekiq::Cron::Job.destroy(j) }
        removed_job_names
      end

      # Parse cron specification '* * * * *' and returns
      # time when last run should be performed
      def last_time now = Time.now.utc
        parsed_cron.previous_time(now.utc).utc
      end

      def formated_enqueue_time now = Time.now.utc
        last_time(now).getutc.to_f.to_s
      end

      def formated_last_time now = Time.now.utc
        last_time(now).getutc.iso8601
      end

      def self.exists? name
        out = false
        Sidekiq.redis do |conn|
          out = conn.public_send(REDIS_EXISTS_METHOD, redis_key(name))
        end
        out
      end

      def exists?
        self.class.exists? @name
      end

      def sort_name
        "#{status == "enabled" ? 0 : 1}_#{name}".downcase
      end

      private

      def parsed_cron
        @parsed_cron ||= begin
                           c = Fugit.parse(@cron)

                           # Since `Fugit.parse` might yield a Fugit::Duration or an EtOrbi::EoTime
                           # https://github.com/floraison/fugit#fugitparses
                           if c.is_a?(Fugit::Cron)
                             c
                           else
                             errors << "'cron' -> #{@cron.inspect} -> not a cron but a #{c.class}"
                           end
                         rescue => e
                           errors << "'cron' -> #{@cron.inspect} -> #{e.class}: #{e.message}"
                         end
      end

      def not_enqueued_after?(time)
        @last_enqueue_time.nil? || @last_enqueue_time.to_i < last_time(time).to_i
      end

      # Try parsing inbound args into an array.
      # Args from Redis will be encoded JSON,
      # try to load JSON, then failover to string array.
      def parse_args(args)
        case args
        when String
          begin
            parsed_args = Sidekiq.load_json(args)
            symbolize_args? ? symbolize_args(parsed_args) : parsed_args
          rescue JSON::ParserError
            [*args]
          end
        when Hash
          symbolize_args? ? [symbolize_args(args)] : [args]
        when Array
          symbolize_args? ? symbolize_args(args) : args
        else
          [*args]
        end
      end

      def symbolize_args?
        @symbolize_args
      end

      def symbolize_args(input)
        if input.is_a?(Array)
          input.map do |arg|
            if arg.respond_to?(:symbolize_keys)
              arg.symbolize_keys
            else
              arg
            end
          end
        elsif input.is_a?(Hash) && input.respond_to?(:symbolize_keys)
          input.symbolize_keys
        else
          input
        end
      end

      def parse_enqueue_time(timestamp)
        DateTime.strptime(timestamp, LAST_ENQUEUE_TIME_FORMAT).to_time.utc
      rescue ArgumentError
        DateTime.parse(timestamp).to_time.utc
      end

      def not_past_scheduled_time?(current_time)
        last_cron_time = parsed_cron.previous_time(current_time).utc
        return false if (current_time.to_i - last_cron_time.to_i) > 60
        true
      end

      # Redis key for set of all cron jobs.
      def self.jobs_key
        "cron_jobs"
      end

      # Redis key for storing one cron job.
      def self.redis_key name
        "cron_job:#{name}"
      end

      # Redis key for storing one cron job.
      def redis_key
        self.class.redis_key @name
      end

      # Redis key for storing one cron job run times (when poller added job to queue)
      def self.job_enqueued_key name
        "cron_job:#{name}:enqueued"
      end

      def self.jid_history_key name
        "cron_job:#{name}:jid_history"
      end

      def job_enqueued_key
        self.class.job_enqueued_key @name
      end

      def jid_history_key
        self.class.jid_history_key @name
      end

      # Give Hash returns array for using it for redis.hmset
      def hash_to_redis hash
        hash.inject([]){ |arr,kv| arr + [kv[0], kv[1]] }
      end
    end
  end
end
