# frozen-string-literal: true

module Sequel
  class Dataset
    # ---------------------
    # :section: 3 - User Methods relating to SQL Creation
    # These are methods you can call to see what SQL will be generated by the dataset.
    # ---------------------
    
    # Returns an EXISTS clause for the dataset as an SQL::PlaceholderLiteralString.
    #
    #   DB.select(1).where(DB[:items].exists)
    #   # SELECT 1 WHERE (EXISTS (SELECT * FROM items))
    def exists
      # The dataset itself is the only placeholder argument, so it is
      # literalized directly after the EXISTS keyword.
      subquery_args = [self]
      SQL::PlaceholderLiteralString.new(EXISTS, subquery_args, true)
    end
    
    # Returns an INSERT SQL query string.  See +insert+.
    #
    #   DB[:items].insert_sql(:a=>1)
    #   # => "INSERT INTO items (a) VALUES (1)"
    def insert_sql(*values)
      return static_sql(@opts[:sql]) if @opts[:sql]

      check_modification_allowed!

      columns = []

      case values.size
      when 0
        return insert_sql({})
      when 1
        case vals = values.at(0)
        when Hash
          values = []
          vals.each do |k,v| 
            columns << k
            values << v
          end
        when Dataset, Array, LiteralString
          values = vals
        end
      when 2
        if (v0 = values.at(0)).is_a?(Array) && ((v1 = values.at(1)).is_a?(Array) || v1.is_a?(Dataset) || v1.is_a?(LiteralString))
          columns, values = v0, v1
          raise(Error, "Different number of values and columns given to insert_sql") if values.is_a?(Array) and columns.length != values.length
        end
      end

      if values.is_a?(Array) && values.empty? && !insert_supports_empty_values? 
        columns, values = insert_empty_columns_values
      end
      clone(:columns=>columns, :values=>values).send(:_insert_sql)
    end

    # Append a literal representation of a value to the given SQL string.
    # 
    # If an unsupported object is given, an +Error+ is raised.
    def literal_append(sql, v)
      case v
      when Symbol
        if skip_symbol_cache?
          literal_symbol_append(sql, v)
        else
          # Literalized symbols are cached on the database object, so the
          # literalization is only computed the first time a symbol is seen.
          cached = db.literal_symbol(v)
          unless cached
            cached = String.new
            literal_symbol_append(cached, v)
            db.literal_symbol_set(v, cached)
          end
          sql << cached
        end
      when String
        # LiteralString and SQL::Blob are checked before falling back to
        # plain string literalization.
        if v.is_a?(LiteralString)
          sql << v
        elsif v.is_a?(SQL::Blob)
          literal_blob_append(sql, v)
        else
          literal_string_append(sql, v)
        end
      when Integer
        sql << literal_integer(v)
      when Hash
        literal_hash_append(sql, v)
      when SQL::Expression
        literal_expression_append(sql, v)
      when Float
        sql << literal_float(v)
      when BigDecimal
        sql << literal_big_decimal(v)
      when NilClass
        sql << literal_nil
      when TrueClass
        sql << literal_true
      when FalseClass
        sql << literal_false
      when Array
        literal_array_append(sql, v)
      when Time
        if v.is_a?(SQLTime)
          literal_sqltime_append(sql, v)
        else
          literal_time_append(sql, v)
        end
      when DateTime
        # DateTime is tested before Date so it gets its own literalizer.
        literal_datetime_append(sql, v)
      when Date
        sql << literal_date(v)
      when Dataset
        literal_dataset_append(sql, v)
      else
        literal_other_append(sql, v)
      end
    end
    
    # Returns an array of insert statements for inserting multiple records.
    # This method is used by +multi_insert+ to format insert statements and
    # expects a keys array and an array of value arrays.
    #
    # This method should be overridden by descendants if they support
    # inserting multiple records in a single SQL statement.
    def multi_insert_sql(columns, values)
      case multi_insert_sql_strategy
      when :values
        # One INSERT whose source is a single multi-row VALUES list.
        rows_sql = LiteralString.new('VALUES ')
        rows = values.map{|row| Array(row)}
        expression_list_append(rows_sql, rows)
        [insert_sql(columns, rows_sql)]
      when :union
        # One INSERT whose source is a chain of SELECTs joined by UNION ALL.
        union_sql = LiteralString.new
        from_clause = empty_from_sql
        first_row = true
        values.each do |row|
          if first_row
            union_sql << SELECT << SPACE
            first_row = false
          else
            union_sql << UNION_ALL_SELECT
          end
          expression_list_append(union_sql, row)
          union_sql << from_clause if from_clause
        end
        [insert_sql(columns, union_sql)]
      else
        # No multi-row strategy: one INSERT statement per row.
        values.map{|row| insert_sql(columns, row)}
      end
    end
    
    # Same as +select_sql+, not aliased directly to make subclassing simpler.
    def sql
      # Plain delegation: whatever select_sql returns is returned here.
      select_sql
    end
    
    # Returns a TRUNCATE SQL query string.  See +truncate+
    #
    #   DB[:items].truncate_sql # => 'TRUNCATE TABLE items'
    def truncate_sql
      # A dataset built from raw SQL returns that SQL unchanged.
      raw = opts[:sql]
      return static_sql(raw) if raw

      check_truncation_allowed!
      if opts[:where] || opts[:having]
        raise(InvalidOperation, "Can't truncate filtered datasets")
      end

      # Literalize the FROM sources and hand them to the statement formatter.
      table_sql = String.new
      source_list_append(table_sql, opts[:from])
      _truncate_sql(table_sql)
    end

    # Formats an UPDATE statement using the given values.  See +update+.
    #
    #   DB[:items].update_sql(:price => 100, :category => 'software')
    #   # => "UPDATE items SET price = 100, category = 'software'"
    #
    # Raises an +Error+ if the dataset is grouped or includes more
    # than one table.
    def update_sql(values = OPTS)
      # A dataset built from raw SQL returns that SQL unchanged.
      raw = opts[:sql]
      return static_sql(raw) if raw

      check_modification_allowed!
      # The values travel on a cloned dataset, which builds the statement.
      with_values = clone(:values=>values)
      with_values.send(:_update_sql)
    end
    
    # ---------------------
    # :section: 9 - Internal Methods relating to SQL Creation
    # These methods, while public, are not designed to be used directly by the end user.
    # ---------------------
    
    # Given a type (e.g. select) and an array of clauses,
    # return an array of methods to call to build the SQL string.
    def self.clause_methods(type, clauses)
      # Each clause maps to a method named <type>_<clause>_sql.
      names = clauses.map do |clause|
        "#{type}_#{clause}_sql".to_sym
      end
      names.freeze
    end

    # Define a dataset literalization method for the given type in the given module,
    # using the given clauses.
    #
    # Arguments:
    # mod :: Module in which to define method
    # type :: Type of SQL literalization method to create, either :select, :insert, :update, or :delete
    # clauses :: array of clauses that make up the SQL query for the type.  This can either be a single
    #            array of symbols/strings, or it can be an array of pairs, with the first element in
    #            each pair being an if/elsif/else code fragment, and the second element in each pair
    #            being an array of symbol/strings for the appropriate branch.
    def self.def_sql_method(mod, type, clauses)
      # The update and insert builders are generated as private methods
      # with a leading underscore in their name.
      priv = (type == :update || type == :insert)
      method_name = priv ? "_#{type}_sql" : "#{type}_sql"

      # Turns a list of clauses into source lines calling each clause method.
      calls_for = lambda do |cs|
        clause_methods(type, cs).map{|meth| "#{meth}(sql)"}
      end

      lines = []
      lines << 'private' if priv
      lines << "def #{method_name}"
      unless priv
        lines << 'if sql = opts[:sql]; return static_sql(sql) end'
      end
      lines << 'check_modification_allowed!' if type == :delete
      lines << 'sql = @opts[:append_sql] || sql_string_origin'

      if clauses.all?{|c| c.is_a?(Array)}
        # Conditional form: each pair is a branch header and its clauses.
        clauses.each do |condition, branch_clauses|
          lines << condition
          lines.concat(calls_for.call(branch_clauses))
        end
        lines << 'end'
      else
        lines.concat(calls_for.call(clauses))
      end

      lines << 'sql' << 'end'

      mod.class_eval(lines.join("\n"), __FILE__, __LINE__)
    end

    # Generate the default SQL-building method for each statement type,
    # listing its clauses in the order they are appended.
    [
      [:delete, %w'delete from where'],
      [:insert, %w'insert into columns values'],
      [:select, %w'with select distinct columns from join where group having compounds order limit lock'],
      [:update, %w'update table set where']
    ].each do |type, clauses|
      def_sql_method(self, type, clauses)
    end

    # Map of emulated function names to native function names.
    EMULATED_FUNCTION_MAP = {}

    # SQL fragments and lookup tables used by the literalization methods in
    # this class.  Most are frozen strings that are appended to the SQL string
    # being built.
    WILDCARD = LiteralString.new('*').freeze
    ALL = ' ALL'.freeze
    AND_SEPARATOR = " AND ".freeze
    APOS = "'".freeze
    APOS_RE = /'/.freeze
    ARRAY_EMPTY = '(NULL)'.freeze
    AS = ' AS '.freeze
    ASC = ' ASC'.freeze
    BACKSLASH = "\\".freeze
    BITCOMP_CLOSE = ") - 1)".freeze
    BITCOMP_OPEN = "((0 - ".freeze
    # Function names used when emulating the bitwise operators.
    BITWISE_METHOD_MAP = {:& =>:BITAND, :| => :BITOR, :^ => :BITXOR}
    BOOL_FALSE = "'f'".freeze
    BOOL_TRUE = "'t'".freeze
    BRACKET_CLOSE =  ']'.freeze
    BRACKET_OPEN = '['.freeze
    CASE_ELSE = " ELSE ".freeze
    CASE_END = " END)".freeze
    CASE_OPEN = '(CASE'.freeze
    CASE_THEN = " THEN ".freeze
    CASE_WHEN = " WHEN ".freeze
    CAST_OPEN = 'CAST('.freeze
    COLON = ':'.freeze
    COLUMN_REF_RE1 = Sequel::COLUMN_REF_RE1
    COLUMN_REF_RE2 = Sequel::COLUMN_REF_RE2
    COLUMN_REF_RE3 = Sequel::COLUMN_REF_RE3
    COMMA = ', '.freeze
    COMMA_SEPARATOR = COMMA
    CONDITION_FALSE = '(1 = 0)'.freeze
    CONDITION_TRUE = '(1 = 1)'.freeze
    # Dataset options that make aggregate_dataset use from_self.
    COUNT_FROM_SELF_OPTS = [:distinct, :group, :sql, :limit, :offset, :compounds]
    COUNT_OF_ALL_AS_COUNT = SQL::Function.new(:count, WILDCARD).as(:count)
    DATASET_ALIAS_BASE_NAME = 't'.freeze
    DEFAULT = LiteralString.new('DEFAULT').freeze
    DEFAULT_VALUES = " DEFAULT VALUES".freeze
    DELETE = 'DELETE'.freeze
    DESC = ' DESC'.freeze
    DISTINCT = " DISTINCT".freeze
    DOT = '.'.freeze
    DOUBLE_APOS = "''".freeze
    DOUBLE_QUOTE = '""'.freeze
    EQUAL = ' = '.freeze
    EMPTY_PARENS = '()'.freeze
    ESCAPE = " ESCAPE ".freeze
    EXTRACT = 'extract('.freeze
    # Wrapped in an array because it is used as the placeholder string
    # array of an SQL::PlaceholderLiteralString (see +exists+).
    EXISTS = ['EXISTS '.freeze].freeze
    FILTER = " FILTER (WHERE ".freeze
    FOR_UPDATE = ' FOR UPDATE'.freeze
    FORMAT_DATE = "'%Y-%m-%d'".freeze
    FORMAT_DATE_STANDARD = "DATE '%Y-%m-%d'".freeze
    FORMAT_OFFSET = "%+03i%02i".freeze
    FORMAT_TIMESTAMP_RE = /%[Nz]/.freeze
    FORMAT_USEC = '%N'.freeze
    FRAME_ALL = "ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING".freeze
    FRAME_ROWS = "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".freeze
    FROM = ' FROM '.freeze
    FUNCTION_DISTINCT = "DISTINCT ".freeze
    GROUP_BY = " GROUP BY ".freeze
    HAVING = " HAVING ".freeze
    INSERT = "INSERT".freeze
    INTO = " INTO ".freeze
    # SQL spelling of the Ruby values accepted on the right-hand side of
    # the IS operators (see complex_expression_sql_append).
    IS_LITERALS = {nil=>'NULL'.freeze, true=>'TRUE'.freeze, false=>'FALSE'.freeze}.freeze
    IS_OPERATORS = ::Sequel::SQL::ComplexExpression::IS_OPERATORS
    LATERAL = 'LATERAL '.freeze
    LIKE_OPERATORS = ::Sequel::SQL::ComplexExpression::LIKE_OPERATORS
    LIMIT = " LIMIT ".freeze
    N_ARITY_OPERATORS = ::Sequel::SQL::ComplexExpression::N_ARITY_OPERATORS
    NOT_SPACE = 'NOT '.freeze
    NULL = "NULL".freeze
    NULLS_FIRST = " NULLS FIRST".freeze
    NULLS_LAST = " NULLS LAST".freeze
    OFFSET = " OFFSET ".freeze
    ON = ' ON '.freeze
    ON_PAREN = " ON (".freeze
    ORDER_BY = " ORDER BY ".freeze
    # ORDER BY without the leading space ("NS" presumably means "no space").
    ORDER_BY_NS = "ORDER BY ".freeze
    OVER = ' OVER '.freeze
    PAREN_CLOSE = ')'.freeze
    PAREN_OPEN = '('.freeze
    PAREN_SPACE_OPEN = ' ('.freeze
    PARTITION_BY = "PARTITION BY ".freeze
    QUALIFY_KEYS = [:select, :where, :having, :order, :group]
    QUESTION_MARK = '?'.freeze
    QUESTION_MARK_RE = /\?/.freeze
    QUOTE = '"'.freeze
    QUOTE_RE = /"/.freeze
    RETURNING = " RETURNING ".freeze
    SELECT = 'SELECT'.freeze
    SET = ' SET '.freeze
    SPACE = ' '.freeze
    SQL_WITH = "WITH ".freeze
    SPACE_WITH = " WITH ".freeze
    TILDE = '~'.freeze
    TIMESTAMP_FORMAT = "'%Y-%m-%d %H:%M:%S%N%z'".freeze
    STANDARD_TIMESTAMP_FORMAT = "TIMESTAMP #{TIMESTAMP_FORMAT}".freeze
    TWO_ARITY_OPERATORS = ::Sequel::SQL::ComplexExpression::TWO_ARITY_OPERATORS
    REGEXP_OPERATORS = ::Sequel::SQL::ComplexExpression::REGEXP_OPERATORS
    UNDERSCORE = '_'.freeze
    UPDATE = 'UPDATE'.freeze
    USING = ' USING ('.freeze
    UNION_ALL_SELECT = ' UNION ALL SELECT '.freeze
    VALUES = " VALUES ".freeze
    WHERE = " WHERE ".freeze
    WITH_ORDINALITY = " WITH ORDINALITY".freeze
    WITHIN_GROUP = " WITHIN GROUP (ORDER BY ".freeze

    # 1000000 when RUBY_VERSION compares >= '1.9.0', 86400000000 otherwise.
    # NOTE(review): RUBY_VERSION is compared as a string, not numerically.
    DATETIME_SECFRACTION_ARG = RUBY_VERSION >= '1.9.0' ? 1000000 : 86400000000

    # For each name, define a method that creates a new mutable string,
    # passes it together with all arguments (and any block) to the matching
    # <name>_append method, and returns the string.
    [:literal, :quote_identifier, :quote_schema_table].each do |meth|
      class_eval(<<-END, __FILE__, __LINE__ + 1)
        def #{meth}(*args, &block)
          s = ''.dup
          #{meth}_append(s, *args, &block)
          s
        end
      END
    end

    # Append literalization of aliased expression to SQL string.
    def aliased_expression_sql_append(sql, ae)
      # The expression comes first, followed by its AS clause.
      expression = ae.expression
      literal_append(sql, expression)
      alias_name = ae.alias
      column_aliases = ae.columns
      as_sql_append(sql, alias_name, column_aliases)
    end

    # Append literalization of array to SQL string.
    def array_sql_append(sql, a)
      # An empty array has its own dedicated literal.
      return sql << ARRAY_EMPTY if a.empty?

      sql << PAREN_OPEN
      expression_list_append(sql, a)
      sql << PAREN_CLOSE
    end

    # Append literalization of boolean constant to SQL string.
    def boolean_constant_sql_append(sql, constant)
      # true/false are emulated with a comparison only when the dataset
      # reports that it does not support them directly.
      needs_emulation = (constant == true || constant == false) && !supports_where_true?
      return literal_append(sql, constant) unless needs_emulation

      sql << (constant == true ? CONDITION_TRUE : CONDITION_FALSE)
    end

    # Append literalization of case expression to SQL string.
    def case_expression_sql_append(sql, ce)
      sql << CASE_OPEN

      # The optional expression is the value every WHEN is compared against.
      if ce.expression?
        sql << SPACE
        literal_append(sql, ce.expression)
      end

      ce.conditions.each do |condition, result|
        sql << CASE_WHEN
        literal_append(sql, condition)
        sql << CASE_THEN
        literal_append(sql, result)
      end

      # The ELSE branch is always emitted.
      sql << CASE_ELSE
      literal_append(sql, ce.default)
      sql << CASE_END
    end

    # Append literalization of cast expression to SQL string.
    def cast_sql_append(sql, expr, type)
      sql << CAST_OPEN
      literal_append(sql, expr)
      sql << AS
      # The database object supplies the SQL spelling of the target type.
      sql << db.cast_type_literal(type).to_s
      sql << PAREN_CLOSE
    end

    # Append literalization of column all selection to SQL string.
    def column_all_sql_append(sql, ca)
      # Literalized as the table qualifying the wildcard.
      table = ca.table
      qualified_identifier_sql_append(sql, table, WILDCARD)
    end

    # Append literalization of complex expression to SQL string.
    def complex_expression_sql_append(sql, op, args)
      # Dispatch on the operator.  Several branches rewrite the expression
      # and call this method again with a different operator.
      case op
      when *IS_OPERATORS
        r = args.at(1)
        if r.nil? || supports_is_true?
          # Only nil, true and false (the keys of IS_LITERALS) are valid here.
          raise(InvalidOperation, 'Invalid argument used for IS operator') unless val = IS_LITERALS[r]
          sql << PAREN_OPEN
          literal_append(sql, args.at(0))
          sql << SPACE << op.to_s << SPACE
          sql << val << PAREN_CLOSE
        elsif op == :IS
          # IS with a non-nil value is emulated using =.
          complex_expression_sql_append(sql, :"=", args)
        else
          # The remaining IS operator is emulated as (a != r) OR (a IS NULL).
          complex_expression_sql_append(sql, :OR, [SQL::BooleanExpression.new(:"!=", *args), SQL::BooleanExpression.new(:IS, args.at(0), nil)])
        end
      when :IN, :"NOT IN"
        cols = args.at(0)
        vals = args.at(1)
        # col_array, val_array and empty_val_array stay nil unless set below.
        col_array = true if cols.is_a?(Array)
        if vals.is_a?(Array)
          val_array = true
          empty_val_array = vals == []
        end
        if empty_val_array
          # An empty value list is delegated to empty_array_value.
          literal_append(sql, empty_array_value(op, cols))
        elsif col_array
          if !supports_multiple_column_in?
            if val_array
              # Emulate multi-column IN as an OR of per-row column/value
              # conditions, inverted for NOT IN.
              expr = SQL::BooleanExpression.new(:OR, *vals.to_a.map{|vs| SQL::BooleanExpression.from_value_pairs(cols.to_a.zip(vs).map{|c, v| [c, v]})})
              literal_append(sql, op == :IN ? expr : ~expr)
            else
              # Non-array values: convert them to an array of rows (calling
              # naked first when they are a dataset), pick each row's values
              # in the order of old_vals.columns, and retry with that array.
              old_vals = vals
              vals = vals.naked if vals.is_a?(Sequel::Dataset)
              vals = vals.to_a
              val_cols = old_vals.columns
              complex_expression_sql_append(sql, op, [cols, vals.map!{|x| x.values_at(*val_cols)}])
            end
          else
            # If the columns and values are both arrays, use array_sql instead of
            # literal so that if values is an array of two element arrays, it
            # will be treated as a value list instead of a condition specifier.
            sql << PAREN_OPEN
            literal_append(sql, cols)
            sql << SPACE << op.to_s << SPACE
            if val_array
              array_sql_append(sql, vals)
            else
              literal_append(sql, vals)
            end
            sql << PAREN_CLOSE
          end
        else
          # Single column: both sides are literalized normally.
          sql << PAREN_OPEN
          literal_append(sql, cols)
          sql << SPACE << op.to_s << SPACE
          literal_append(sql, vals)
          sql << PAREN_CLOSE
        end
      when :LIKE, :'NOT LIKE'
        # An ESCAPE clause using a literalized backslash is always appended.
        sql << PAREN_OPEN
        literal_append(sql, args.at(0))
        sql << SPACE << op.to_s << SPACE
        literal_append(sql, args.at(1))
        sql << ESCAPE
        literal_append(sql, BACKSLASH)
        sql << PAREN_CLOSE
      when :ILIKE, :'NOT ILIKE'
        # Case-insensitive matching is emulated by wrapping every argument in
        # UPPER() and using the corresponding LIKE operator.
        complex_expression_sql_append(sql, (op == :ILIKE ? :LIKE : :"NOT LIKE"), args.map{|v| Sequel.function(:UPPER, v)})
      when *TWO_ARITY_OPERATORS
        if REGEXP_OPERATORS.include?(op) && !supports_regexp?
          raise InvalidOperation, "Pattern matching via regular expressions is not supported on #{db.database_type}"
        end
        sql << PAREN_OPEN
        literal_append(sql, args.at(0))
        sql << SPACE << op.to_s << SPACE
        literal_append(sql, args.at(1))
        sql << PAREN_CLOSE
      when *N_ARITY_OPERATORS
        # Join all arguments with the operator inside one pair of parentheses.
        sql << PAREN_OPEN
        c = false
        op_str = " #{op} "
        args.each do |a|
          sql << op_str if c
          literal_append(sql, a)
          c ||= true
        end
        sql << PAREN_CLOSE
      when :NOT
        sql << NOT_SPACE
        literal_append(sql, args.at(0))
      when :NOOP
        # The argument is literalized unchanged.
        literal_append(sql, args.at(0))
      when :'B~'
        sql << TILDE
        literal_append(sql, args.at(0))
      when :extract
        # args.at(0) is appended via to_s, without literalization.
        sql << EXTRACT << args.at(0).to_s << FROM
        literal_append(sql, args.at(1))
        sql << PAREN_CLOSE
      else
        raise(InvalidOperation, "invalid operator #{op}")
      end
    end
    
    # Append literalization of constant to SQL string.
    def constant_sql_append(sql, constant)
      # The constant's string form is appended as-is, without quoting.
      sql << constant.to_s
    end

    # Append literalization of delayed evaluation to SQL string,
    # causing the delayed evaluation proc to be evaluated.
    def delayed_evaluation_sql_append(sql, delay)
      recorder = @opts[:placeholder_literalizer]

      # Without a recorder, evaluate the delayed object now and literalize
      # the result.
      return literal_append(sql, delay.call(self)) unless recorder

      # With a recorder, hand over a lambda so evaluation can happen later.
      evaluator = lambda{delay.call(self)}
      recorder.use(sql, evaluator, nil)
    end

    # Append literalization of function call to SQL string.
    def function_sql_append(sql, f)
      name = f.name
      opts = f.opts

      if opts[:emulate]
        # Full emulation takes over completely; otherwise only the function
        # name is translated to its native equivalent.
        if emulate_function?(name)
          emulate_function_sql_append(sql, f)
          return
        end

        name = native_function_name(name) 
      end

      sql << LATERAL if opts[:lateral]

      # Append the function name.  Identifier objects are quoted unless
      # :quoted is explicitly false; any other name is quoted only when
      # :quoted is truthy.  Quoting also requires dataset support.
      case name
      when SQL::Identifier
        if supports_quoted_function_names? && opts[:quoted] != false
          literal_append(sql, name)
        else
          sql << name.value.to_s
        end
      when SQL::QualifiedIdentifier
        if supports_quoted_function_names? && opts[:quoted] != false
          literal_append(sql, name)
        else
          sql << split_qualifiers(name).join(DOT)
        end
      else
        if supports_quoted_function_names? && opts[:quoted]
          quote_identifier_append(sql, name)
        else
          sql << name.to_s
        end
      end

      # Argument list: a bare wildcard, or the optional DISTINCT, the
      # arguments, and an optional ORDER BY inside the parentheses.
      sql << PAREN_OPEN
      if opts[:*]
        sql << WILDCARD
      else
        sql << FUNCTION_DISTINCT if opts[:distinct]
        expression_list_append(sql, f.args)
        if order = opts[:order]
          sql << ORDER_BY
          expression_list_append(sql, order)
        end
      end
      sql << PAREN_CLOSE

      # Optional trailing clauses, appended in this fixed order:
      # WITHIN GROUP, FILTER, OVER, WITH ORDINALITY.
      if group = opts[:within_group]
        sql << WITHIN_GROUP
        expression_list_append(sql, group)
        sql << PAREN_CLOSE
      end

      if filter = opts[:filter]
        sql << FILTER
        literal_append(sql, filter_expr(filter, &opts[:filter_block]))
        sql << PAREN_CLOSE
      end

      if window = opts[:over]
        sql << OVER
        window_sql_append(sql, window.opts)
      end

      if opts[:with_ordinality]
        sql << WITH_ORDINALITY
      end
    end

    # Append literalization of JOIN clause without ON or USING to SQL string.
    def join_clause_sql_append(sql, jc)
      table = jc.table
      table_alias = jc.table_alias
      # An alias equal to the table itself is dropped unless column aliases
      # are also given.
      if table == table_alias && !jc.column_aliases
        table_alias = nil
      end

      sql << SPACE
      sql << join_type_sql(jc.join_type)
      sql << SPACE
      identifier_append(sql, table)

      return unless table_alias
      as_sql_append(sql, table_alias, jc.column_aliases)
    end

    # Append literalization of JOIN ON clause to SQL string.
    def join_on_clause_sql_append(sql, jc)
      # Shared JOIN prefix first, then the ON condition.
      join_clause_sql_append(sql, jc)
      sql << ON
      condition = filter_expr(jc.on)
      literal_append(sql, condition)
    end

    # Append literalization of JOIN USING clause to SQL string.
    def join_using_clause_sql_append(sql, jc)
      # Shared JOIN prefix first, then the parenthesized USING column list.
      join_clause_sql_append(sql, jc)
      sql << USING
      using_columns = jc.using
      column_list_append(sql, using_columns)
      sql << PAREN_CLOSE
    end
    
    # Append literalization of negative boolean constant to SQL string.
    def negative_boolean_constant_sql_append(sql, constant)
      # Prefix with NOT, then reuse the positive boolean constant handling.
      sql << NOT_SPACE
      boolean_constant_sql_append(sql, constant)
    end

    # Append literalization of ordered expression to SQL string.
    def ordered_expression_sql_append(sql, oe)
      literal_append(sql, oe.expression)

      # The direction is always spelled out explicitly.
      if oe.descending
        sql << DESC
      else
        sql << ASC
      end

      # NULLS ordering is only added for :first or :last.
      nulls = oe.nulls
      if :first == nulls
        sql << NULLS_FIRST
      elsif :last == nulls
        sql << NULLS_LAST
      end
    end

    # Append literalization of placeholder literal string to SQL string.
    def placeholder_literal_string_sql_append(sql, pls)
      args = pls.args
      str = pls.str
      sql << PAREN_OPEN if pls.parens
      if args.is_a?(Hash)
        # Named placeholders: every :key occurrence in the string is replaced
        # by the literalized value stored under that key.
        if args.empty?
          sql << str
        else
          re = /:(#{args.keys.map{|k| Regexp.escape(k.to_s)}.join('|')})\b/
          loop do
            previous, q, str = str.partition(re)
            sql << previous
            literal_append(sql, args[($1||q[1..-1].to_s).to_sym]) unless q.empty?
            break if str.empty?
          end
        end
      elsif str.is_a?(Array)
        # Array placeholder: string fragments are interleaved with arguments.
        # There may be as many fragments as arguments, or one more.
        len = args.length
        str.each_with_index do |s, i|
          sql << s
          literal_append(sql, args[i]) unless i == len
        end
        unless str.length == args.length || str.length == args.length + 1
          raise Error, "Mismatched number of placeholders (#{str.length}) and placeholder arguments (#{args.length}) when using placeholder array"
        end
      else
        # String placeholder: each ? is replaced by the next argument, and
        # the number of ? marks must equal the number of arguments.
        i = -1
        match_len = args.length - 1
        loop do
          previous, q, str = str.partition(QUESTION_MARK)
          sql << previous
          literal_append(sql, args.at(i+=1)) unless q.empty?
          if str.empty?
            unless i == match_len
              # This branch handles a placeholder string, so the message says
              # so rather than repeating the array branch's wording.
              raise Error, "Mismatched number of placeholders (#{i+1}) and placeholder arguments (#{args.length}) when using placeholder string"
            end
            break
          end
        end
      end
      sql << PAREN_CLOSE if pls.parens
    end

    # Append literalization of qualified identifier to SQL string.
    # If 3 arguments are given, the 2nd should be the table/qualifier and the third should be
    # column/qualified.  If 2 arguments are given, the 2nd should be an SQL::QualifiedIdentifier.
    # The default expression for +column+ only runs when two arguments are
    # given: it takes the column from the object passed as +table+ and then
    # rebinds +table+ to that object's own table.
    def qualified_identifier_sql_append(sql, table, column=(c = table.column; table = table.table; c))
      identifier_append(sql, table)
      sql << DOT
      identifier_append(sql, column)
    end

    # Append literalization of unqualified identifier to SQL string.
    # Adds quoting to identifiers (columns and tables). If identifiers are not
    # being quoted, returns name as a string.  If identifiers are being quoted
    # quote the name with quoted_identifier.
    def quote_identifier_append(sql, name)
      # A LiteralString is appended verbatim, bypassing all other handling.
      return sql << name if name.is_a?(LiteralString)

      # Unwrap an SQL::Identifier, then apply the input identifier conversion.
      name = name.value if name.is_a?(SQL::Identifier)
      name = input_identifier(name)

      if quote_identifiers?
        quoted_identifier_append(sql, name)
      else
        sql << name
      end
    end

    # Append literalization of identifier or unqualified identifier to SQL string.
    def quote_schema_table_append(sql, table)
      qualifier, name = schema_and_table(table)
      # The schema part and its dot are only emitted when a schema exists.
      if qualifier
        quote_identifier_append(sql, qualifier)
        sql << DOT
      end
      quote_identifier_append(sql, name)
    end

    # Append literalization of quoted identifier to SQL string.
    # This method quotes the given name with the SQL standard double quote.
    # It should be overridden by subclasses to provide quoting not matching the
    # SQL standard, such as backtick (used by MySQL and SQLite).
    def quoted_identifier_append(sql, name)
      sql << QUOTE
      # Embedded quote characters are escaped by doubling them.
      sql << name.to_s.gsub(QUOTE_RE, DOUBLE_QUOTE)
      sql << QUOTE
    end

    # Split the schema information from the table, returning two strings,
    # one for the schema and one for the table.  The returned schema may
    # be nil, but the table will always have a string value.
    #
    # Note that this function does not handle tables with more than one
    # level of qualification (e.g. database.schema.table on Microsoft
    # SQL Server).
    def schema_and_table(table_name, sch=nil)
      sch = sch.to_s if sch

      if table_name.is_a?(Symbol)
        # A schema embedded in the symbol wins over the default schema.
        embedded_schema, table, _ = split_symbol(table_name)
        [embedded_schema || sch, table]
      elsif table_name.is_a?(SQL::QualifiedIdentifier)
        # The qualifier is used as the schema; the default is ignored.
        [table_name.table.to_s, table_name.column.to_s]
      elsif table_name.is_a?(SQL::Identifier)
        [sch, table_name.value.to_s]
      elsif table_name.is_a?(String)
        [sch, table_name]
      else
        raise Error, 'table_name should be a Symbol, SQL::QualifiedIdentifier, SQL::Identifier, or String'
      end
    end

    # Splits table_name into an array of strings.
    #
    #   ds.split_qualifiers(:s) # ['s']
    #   ds.split_qualifiers(:t__s) # ['t', 's']
    #   ds.split_qualifiers(Sequel.qualify(:d, :t__s)) # ['d', 't', 's']
    #   ds.split_qualifiers(Sequel.qualify(:h__d, :t__s)) # ['h', 'd', 't', 's']
    def split_qualifiers(table_name, *args)
      # Qualified identifiers are split recursively on both sides.
      if table_name.is_a?(SQL::QualifiedIdentifier)
        left = split_qualifiers(table_name.table, nil)
        right = split_qualifiers(table_name.column, nil)
        return left + right
      end

      schema, table = schema_and_table(table_name, *args)
      parts = [table]
      parts.unshift(schema) if schema
      parts
    end

    # Append literalization of subscripts (SQL array accesses) to SQL string.
    def subscript_sql_append(sql, s)
      literal_append(sql, s.f)
      sql << BRACKET_OPEN

      subscripts = s.sub
      if subscripts.length == 1 && subscripts.first.is_a?(Range)
        # A single range is written as begin:end.
        range = subscripts.first
        literal_append(sql, range.begin)
        sql << COLON
        upper = range.end
        # An exclusive integer end is converted to an inclusive one.
        upper -= 1 if range.exclude_end? && upper.is_a?(Integer)
        literal_append(sql, upper)
      else
        expression_list_append(sql, subscripts)
      end

      sql << BRACKET_CLOSE
    end

    # Append literalization of windows (for window functions) to SQL string.
    def window_sql_append(sql, opts)
      raise(Error, 'This dataset does not support window functions') unless supports_window_functions?
      sql << PAREN_OPEN
      window, part, order, frame = opts.values_at(:window, :partition, :order, :frame)

      # Becomes true once any part has been written, so that later parts are
      # separated from it by a single space.
      pending_space = false

      if window
        literal_append(sql, window)
        pending_space = true
      end

      if part
        sql << SPACE if pending_space
        sql << PARTITION_BY
        expression_list_append(sql, Array(part))
        pending_space = true
      end

      if order
        sql << SPACE if pending_space
        sql << ORDER_BY_NS
        expression_list_append(sql, Array(order))
        pending_space = true
      end

      # Resolve the frame option to its SQL text; nil means no frame clause.
      frame_sql =
        case frame
        when nil
          nil
        when :all
          FRAME_ALL
        when :rows
          FRAME_ROWS
        when String
          frame
        else
          raise Error, "invalid window frame clause, should be :all, :rows, a string, or nil"
        end

      if frame_sql
        sql << SPACE if pending_space
        sql << frame_sql
      end

      sql << PAREN_CLOSE
    end

    protected

    # Return a from_self dataset if custom SQL, a limit, or an order is specified,
    # so it works as expected with UNION, EXCEPT, and INTERSECT clauses.
    def compound_from_self
      # Any of these options causes the dataset to be wrapped in a subselect.
      needs_wrapping = @opts[:sql] || @opts[:limit] || @opts[:order]
      needs_wrapping ? from_self : self
    end
    
    private

    # Formats the truncate statement.  Assumes the table given has already been
    # literalized.
    def _truncate_sql(table)
      # +table+ is interpolated as-is; no literalization happens here.
      "TRUNCATE TABLE #{table}"
    end

    # Returns an appropriate symbol for the alias represented by s.
    def alias_alias_symbol(s)
      # Symbols pass through unchanged; strings and identifiers are
      # converted; anything else is rejected.
      if s.is_a?(Symbol)
        s
      elsif s.is_a?(String)
        s.to_sym
      elsif s.is_a?(SQL::Identifier)
        s.value.to_s.to_sym
      else
        raise Error, "Invalid alias for alias_alias_symbol: #{s.inspect}"
      end
    end

    # Returns an appropriate alias symbol for the given object, which can be
    # a Symbol, String, SQL::Identifier, SQL::QualifiedIdentifier, or
    # SQL::AliasedExpression.
    def alias_symbol(sym)
      case sym
      when Symbol
        qualifier, column, aliaz = split_symbol(sym)
        # A symbol with neither qualifier nor alias is already the answer;
        # otherwise prefer the explicit alias and fall back to the column.
        if aliaz || qualifier
          (aliaz || column).to_sym
        else
          sym
        end
      when String
        sym.to_sym
      when SQL::Identifier
        sym.value.to_s.to_sym
      when SQL::QualifiedIdentifier
        # Only the column part of a qualified identifier matters.
        alias_symbol(sym.column)
      when SQL::AliasedExpression
        alias_alias_symbol(sym.alias)
      else
        raise Error, "Invalid alias for alias_symbol: #{sym.inspect}"
      end
    end

    # Clone of this dataset usable in aggregate operations.  Does
    # a from_self if dataset contains any parameters that would
    # affect normal aggregation, or just removes an existing
    # order if not.
    def aggregate_dataset
      # Wrap in a subselect when any of the listed options is in use;
      # otherwise just remove the order.
      if options_overlap(COUNT_FROM_SELF_OPTS)
        from_self
      else
        unordered
      end
    end

    # Append aliasing expression to SQL string.
    def as_sql_append(sql, aliaz, column_aliases=nil)
      sql << AS
      quote_identifier_append(sql, aliaz)

      # Nothing more to do without a derived column list.
      return unless column_aliases

      unless supports_derived_column_lists?
        raise Error, "#{db.database_type} does not support derived column lists"
      end
      sql << PAREN_OPEN
      identifier_list_append(sql, column_aliases)
      sql << PAREN_CLOSE
    end
    
    # Raise an InvalidOperation exception if modification is not allowed
    # for this dataset
    def check_modification_allowed!
      if opts[:group]
        raise(InvalidOperation, "Grouped datasets cannot be modified")
      end

      # Joined datasets are only rejected when modifying joins is unsupported.
      if !supports_modifying_joins? && joined_dataset?
        raise(InvalidOperation, "Joined datasets cannot be modified")
      end
    end

    # Alias of check_modification_allowed!
    def check_truncation_allowed!
      # Truncation is subject to the same checks as other modifications.
      check_modification_allowed!
    end

    # Append column list to SQL string.
    # Converts an array of column names into a comma-separated string of
    # column names. If the array is empty, a wildcard (*) is returned.
    def column_list_append(sql, columns)
      # A missing or empty column list means "all columns".
      return sql << WILDCARD if columns.nil? || columns.empty?

      expression_list_append(sql, columns)
    end

    # Yield each pair of arguments to the block, which should
    # return an object representing the SQL expression for those
    # two arguments.  For more than two arguments, the first
    # argument to the block will be the result of the previous block call.
    def complex_expression_arg_pairs(args)
      count = args.length
      if count == 1
        # A single argument is returned as-is; the block is not called.
        args.at(0)
      elsif count == 2
        first, second = args.at(0), args.at(1)
        yield first, second
      else
        # Fold left to right, feeding each result back in as the first argument.
        args.inject{|memo, arg| yield(memo, arg)}
      end
    end

    # Append the literalization of the args using complex_expression_arg_pairs
    # to the given SQL string, used when database operator/function is 2-ary
    # where Sequel expression is N-ary.
    def complex_expression_arg_pairs_append(sql, args, &block)
      # Combine the arguments pairwise first, then literalize the result.
      combined = complex_expression_arg_pairs(args, &block)
      literal_append(sql, combined)
    end

    # Append literalization of complex expression to SQL string, for
    # operators unsupported by some databases. Used by adapters for databases
    # that don't support the operators natively.
    def complex_expression_emulate_append(sql, op, args)
      # Operators not listed here append nothing.
      case op
      when :%
        complex_expression_arg_pairs_append(sql, args) do |left, right|
          Sequel.function(:MOD, left, right)
        end
      when :>>
        # Right shift is emulated as division by a power of two.
        complex_expression_arg_pairs_append(sql, args) do |left, right|
          Sequel./(left, Sequel.function(:power, 2, right))
        end
      when :<<
        # Left shift is emulated as multiplication by a power of two.
        complex_expression_arg_pairs_append(sql, args) do |left, right|
          Sequel.*(left, Sequel.function(:power, 2, right))
        end
      when :&, :|, :^
        function_name = BITWISE_METHOD_MAP[op]
        complex_expression_arg_pairs_append(sql, args) do |left, right|
          Sequel.function(function_name, left, right)
        end
      when :'B~'
        # Bitwise complement is emulated as ((0 - x) - 1).
        sql << BITCOMP_OPEN
        literal_append(sql, args.at(0))
        sql << BITCOMP_CLOSE
      end
    end

    # Append literalization of dataset used in UNION/INTERSECT/EXCEPT clause to SQL string.
    def compound_dataset_sql_append(sql, ds)
      # Forwards both arguments unchanged to subselect_sql_append.
      subselect_sql_append(sql, ds)
    end

    # The alias to use for datasets, takes a number to make sure the name is unique.
    def dataset_alias(number)
      # Base name followed by the number, returned as a Symbol.
      "#{DATASET_ALIAS_BASE_NAME}#{number}".to_sym
    end
    
    # The strftime format to use when literalizing the time.
    def default_timestamp_format
      if requires_sql_standard_datetimes?
        STANDARD_TIMESTAMP_FORMAT
      else
        TIMESTAMP_FORMAT
      end
    end

    def delete_delete_sql(sql)
      # Appends the DELETE constant to the SQL string being built.
      sql << DELETE
    end

    def delete_from_sql(sql)
      sources = @opts[:from]
      # Nothing is appended when the dataset has no :from option.
      return unless sources

      sql << FROM
      source_list_append(sql, sources)
    end

    # An SQL FROM clause to use in SELECT statements where the dataset has
    # no from tables.
    def empty_from_sql
      # nil makes select_from_sql append nothing when the dataset has no :from option.
      nil
    end

    # Whether to emulate the function with the given name.  This should only be true
    # if the emulation goes beyond choosing a function with a different name.
    def emulate_function?(name)
      # The name is ignored here: no function is reported as needing emulation.
      false
    end

    # Append literalization of array of expressions to SQL string.
    def expression_list_append(sql, columns)
      columns.each_with_index do |expr, idx|
        # Separator goes before every element except the first.
        sql << COMMA unless idx.zero?
        literal_append(sql, expr)
      end
    end

    # Append literalization of array of grouping elements to SQL string.
    def grouping_element_list_append(sql, columns)
      columns.each_with_index do |element, idx|
        sql << COMMA unless idx.zero?
        if element.is_a?(Array) && element.empty?
          # An empty array is written as a bare pair of parentheses.
          sql << EMPTY_PARENS
        else
          # Everything else is wrapped in an array before being literalized.
          literal_append(sql, Array(element))
        end
      end
    end

    # An expression for how to handle an empty array lookup.
    def empty_array_value(op, cols)
      # Produces {1 => 0} for :IN and {1 => 1} for any other operator.
      rhs = op == :IN ? 0 : 1
      {1 => rhs}
    end
    
    # Format the timestamp based on the default_timestamp_format, with a couple
    # of modifiers.  First, allow %N to be used for fractional seconds (if the
    # database supports them), and override %z to always use a numeric offset
    # of hours and minutes.
    def format_timestamp(v)
      # v2 is the value after the database's application-timestamp conversion;
      # it is what finally gets formatted.
      v2 = db.from_application_timestamp(v)
      # Rewrite each directive matched by FORMAT_TIMESTAMP_RE before calling strftime.
      # When the block evaluates to nil (feature unsupported), gsub substitutes
      # an empty string, i.e. the directive is dropped.
      fmt = default_timestamp_format.gsub(FORMAT_TIMESTAMP_RE) do |m|
        if m == FORMAT_USEC
          # Fractional seconds are read from the original v: DateTime exposes them via
          # sec_fraction (scaled by DATETIME_SECFRACTION_ARG), other values via usec.
          format_timestamp_usec(v.is_a?(DateTime) ? v.sec_fraction*(DATETIME_SECFRACTION_ARG) : v.usec) if supports_timestamp_usecs?
        else
          if supports_timestamp_timezones?
            # Would like to just use %z format, but it doesn't appear to work on Windows
            # Instead, the offset fragment is constructed manually
            # DateTime#offset is a fraction of a day (hence * 1440); utc_offset is in seconds (hence / 60).
            minutes = (v2.is_a?(DateTime) ? v2.offset * 1440 : v2.utc_offset/60).to_i
            # divmod(60) splits the total minutes into [hours, minutes].
            format_timestamp_offset(*minutes.divmod(60))
          end
        end
      end
      v2.strftime(fmt)
    end
    
    # Return the SQL timestamp fragment to use for the timezone offset.
    def format_timestamp_offset(hour, minute)
      # FORMAT_OFFSET is applied to the hour and minute, in that order.
      FORMAT_OFFSET % [hour, minute]
    end

    # Return the SQL timestamp fragment to use for the fractional time part.
    # Should start with the decimal point.  Uses 6 decimal places by default.
    def format_timestamp_usec(usec)
      digits = timestamp_precision
      # usec carries 6 digits; scale it down when fewer digits are wanted.
      usec = usec/(10 ** (6 - digits)) if digits != 6
      sprintf(".%0#{digits}d", usec)
    end

    # Append literalization of identifier to SQL string, considering regular strings
    # as SQL identifiers instead of SQL strings.
    def identifier_append(sql, v)
      # Non-strings take the ordinary literalization path.
      return literal_append(sql, v) unless v.is_a?(String)

      case v
      when LiteralString
        # Appended verbatim, with no quoting.
        sql << v
      when SQL::Blob
        literal_append(sql, v)
      else
        # A plain String names an identifier, so it is quoted as one.
        quote_identifier_append(sql, v)
      end
    end

    # Append literalization of array of identifiers to SQL string.
    def identifier_list_append(sql, args)
      args.each_with_index do |identifier, idx|
        # Separator goes before every element except the first.
        sql << COMMA unless idx.zero?
        identifier_append(sql, identifier)
      end
    end

    # Modify the identifier sent to the database based on the
    # identifier_input_method.
    def input_identifier(v)
      meth = identifier_input_method
      name = v.to_s
      # With no input method configured, the plain string form is returned.
      meth ? name.send(meth) : name
    end

    def insert_into_sql(sql)
      sql << INTO
      sources = @opts[:from]
      single_source = sources && sources.length == 1
      # A lone source is appended through unaliased_identifier.
      return identifier_append(sql, unaliased_identifier(sources.first)) if single_source

      # Zero or several sources go through the generic source list path.
      source_list_append(sql, sources)
    end

    def insert_columns_sql(sql)
      cols = opts[:columns]
      # With no explicit columns the parenthesized list is omitted entirely.
      return unless cols && !cols.empty?

      sql << PAREN_SPACE_OPEN
      identifier_list_append(sql, cols)
      sql << PAREN_CLOSE
    end

    # The columns and values to use for an empty insert if the database doesn't support
    # INSERT with DEFAULT VALUES.
    def insert_empty_columns_values
      # Returns [columns, values]: the last entry of this dataset's columns, paired with DEFAULT.
      [[columns.last], [DEFAULT]]
    end
    
    def insert_insert_sql(sql)
      # Appends the INSERT constant to the SQL string being built.
      sql << INSERT
    end

    def insert_values_sql(sql)
      # Appends the values portion of an INSERT statement, based on opts[:values]:
      # * Array: DEFAULT_VALUES when empty, otherwise VALUES followed by the
      #   literalized array.
      # * Dataset: a space followed by the dataset's SQL as a subselect.
      # * LiteralString: a space followed by the string itself.
      # Any other type raises Error.
      case values = opts[:values]
      when Array
        if values.empty?
          sql << DEFAULT_VALUES
        else
          sql << VALUES
          literal_append(sql, values)
        end
      when Dataset
        sql << SPACE
        subselect_sql_append(sql, values)
      when LiteralString
        sql << SPACE << values
      else
        # The message lists every type handled above, including LiteralString.
        raise Error, "Unsupported INSERT values type, should be an Array, Dataset, or LiteralString: #{values.inspect}"
      end
    end

    def insert_returning_sql(sql)
      # Presence of the key is what matters, even if its value is nil.
      return unless opts.key?(:returning)

      sql << RETURNING
      column_list_append(sql, Array(opts[:returning]))
    end
    alias delete_returning_sql insert_returning_sql
    alias update_returning_sql insert_returning_sql

    # SQL fragment specifying a JOIN type, converts underscores to
    # spaces and upcases.
    def join_type_sql(join_type)
      keyword = join_type.to_s.gsub(UNDERSCORE, SPACE).upcase
      "#{keyword} JOIN"
    end

    # Append a literalization of the array to SQL string.
    # Treats the array as a boolean expression if Sequel.condition_specifier?
    # accepts it, or as an SQL array otherwise.
    def literal_array_append(sql, v)
      # Arrays that are not condition specifiers are literalized as SQL arrays.
      return array_sql_append(sql, v) unless Sequel.condition_specifier?(v)

      conditions = SQL::BooleanExpression.from_value_pairs(v)
      literal_expression_append(sql, conditions)
    end

    # SQL fragment for BigDecimal
    def literal_big_decimal(v)
      text = v.to_s("F")
      if v.nan? || v.infinite?
        # NaN and infinite values are wrapped in single quotes.
        "'#{text}'"
      else
        text
      end
    end

    # Append literalization of SQL::Blob to SQL string.
    def literal_blob_append(sql, v)
      # Blobs are literalized exactly like regular strings here.
      literal_string_append(sql, v)
    end

    # Append literalization of dataset to SQL string.  Does a subselect inside parentheses.
    def literal_dataset_append(sql, v)
      # The LATERAL prefix is driven by the dataset's own :lateral option.
      if v.opts[:lateral]
        sql << LATERAL
      end

      # The dataset's SQL is appended between parentheses.
      sql << PAREN_OPEN
      subselect_sql_append(sql, v)
      sql << PAREN_CLOSE
    end

    # SQL fragment for Date, using the ISO8601 format.
    def literal_date(v)
      # Only the format string differs between the two modes.
      pattern = requires_sql_standard_datetimes? ? FORMAT_DATE_STANDARD : FORMAT_DATE
      v.strftime(pattern)
    end

    # SQL fragment for DateTime
    def literal_datetime(v)
      # DateTime values share the common timestamp formatting in format_timestamp.
      format_timestamp(v)
    end

    # Append literalization of DateTime to SQL string.
    def literal_datetime_append(sql, v)
      # Builds the fragment with literal_datetime and appends it to sql.
      sql << literal_datetime(v)
    end

    # Append literalization of SQL::Expression to SQL string.
    def literal_expression_append(sql, v)
      # The expression appends itself; it is handed this dataset and the SQL string.
      v.to_s_append(self, sql)
    end

    # SQL fragment for false
    def literal_false
      # Returns the BOOL_FALSE constant as-is.
      BOOL_FALSE
    end

    # SQL fragment for Float
    def literal_float(v)
      # The value's to_s output is used directly, with no quoting.
      v.to_s
    end

    # Append literalization of Hash to SQL string, treating hash as a boolean expression.
    def literal_hash_append(sql, v)
      # The hash's key/value pairs become a boolean expression, which is then literalized.
      condition = SQL::BooleanExpression.from_value_pairs(v)
      literal_expression_append(sql, condition)
    end

    # SQL fragment for Integer
    def literal_integer(v)
      # The value's to_s output is used directly, with no quoting.
      v.to_s
    end

    # SQL fragment for nil
    def literal_nil
      # Returns the NULL constant as-is.
      NULL
    end

    # Append a literalization of the object to the given SQL string.
    # Calls +sql_literal_append+ if object responds to it, otherwise
    # calls +sql_literal+ if object responds to it, otherwise raises an error.
    # If a database specific type is allowed, this should be overridden in a subclass.
    def literal_other_append(sql, v)
      # Preferred: the object appends itself to the SQL string.
      return v.sql_literal_append(self, sql) if v.respond_to?(:sql_literal_append)

      # Fallback: the object returns a fragment, which is appended here.
      return sql << v.sql_literal(self) if v.respond_to?(:sql_literal)

      raise Error, "can't express #{v.inspect} as a SQL literal"
    end

    # SQL fragment for Sequel::SQLTime, containing just the time part
    def literal_sqltime(v)
      # The fractional part is only built when microseconds are supported; otherwise it is nil.
      fraction = format_timestamp_usec(v.usec) if supports_timestamp_usecs?
      v.strftime("'%H:%M:%S#{fraction}'")
    end

    # Append literalization of Sequel::SQLTime to SQL string.
    def literal_sqltime_append(sql, v)
      # Builds the fragment with literal_sqltime and appends it to sql.
      sql << literal_sqltime(v)
    end

    # Append literalization of string to SQL string.
    def literal_string_append(sql, v)
      sql << APOS
      # Each match of APOS_RE in the value is replaced with DOUBLE_APOS.
      sql << v.gsub(APOS_RE, DOUBLE_APOS)
      sql << APOS
    end

    # Append literalization of symbol to SQL string.
    def literal_symbol_append(sql, v)
      qualifier, name, alias_name = split_symbol(v)

      # Optional "qualifier." prefix.
      if qualifier
        quote_identifier_append(sql, qualifier)
        sql << DOT
      end

      quote_identifier_append(sql, name)

      # Optional alias suffix.
      as_sql_append(sql, alias_name) if alias_name
    end

    # SQL fragment for Time
    def literal_time(v)
      # Time values share the common timestamp formatting in format_timestamp.
      format_timestamp(v)
    end

    # Append literalization of Time to SQL string.
    def literal_time_append(sql, v)
      # Builds the fragment with literal_time and appends it to sql.
      sql << literal_time(v)
    end

    # SQL fragment for true
    def literal_true
      # Returns the BOOL_TRUE constant as-is.
      BOOL_TRUE
    end

    # What strategy to use for import/multi_insert.  While SQL-92 defaults
    # to allowing multiple rows in a VALUES clause, enough databases do not
    # allow it that it can't be the default.  Use separate queries
    # by default, which works everywhere.
    def multi_insert_sql_strategy
      # The default strategy is identified by the symbol :separate.
      :separate
    end

    # Get the native function name given the emulated function name.
    def native_function_name(emulated_function)
      # The map is looked up on the dataset's own class.
      mapping = self.class.const_get(:EMULATED_FUNCTION_MAP)
      # Names without an entry map to themselves.
      mapping.fetch(emulated_function) { emulated_function }
    end

    # Returns a qualified column name (including a table name) if the column
    # name isn't already qualified.
    def qualified_column_name(column, table)
      # Only symbols are qualified; anything else is returned untouched.
      return column unless column.is_a?(Symbol)

      qualifier, col_name, _ = split_symbol(column)
      unless qualifier
        # The column symbol carried no table part, so derive one from +table+.
        case table
        when Symbol
          schema, table_name, table_alias = split_symbol(table)
          # Without an alias, a schema-qualified table is referenced by its full name.
          table_alias ||= Sequel::SQL::QualifiedIdentifier.new(schema, table_name) if schema
          qualifier = table_alias || table_name
        when Sequel::SQL::AliasedExpression
          qualifier = table.alias || table
        else
          qualifier = table
        end
      end
      ::Sequel::SQL::QualifiedIdentifier.new(qualifier, col_name)
    end
    
    # Qualify the given expression e to the given table.
    def qualified_expression(e, table)
      # Builds a Qualifier from this dataset and the table, then has it transform e.
      Qualifier.new(self, table).transform(e)
    end

    def select_columns_sql(sql)
      # Appends a space, then the :select option via column_list_append
      # (which appends the wildcard when the list is nil or empty).
      sql << SPACE
      column_list_append(sql, @opts[:select])
    end

    def select_distinct_sql(sql)
      distinct_exprs = @opts[:distinct]
      return unless distinct_exprs

      sql << DISTINCT
      # An empty list means plain DISTINCT, without an ON (...) part.
      return if distinct_exprs.empty?

      sql << ON_PAREN
      expression_list_append(sql, distinct_exprs)
      sql << PAREN_CLOSE
    end

    # Modify the sql to add a dataset via an EXCEPT, INTERSECT, or UNION clause.
    # This uses a subselect for the compound datasets used, because using parentheses doesn't
    # work on all databases.  I consider this an ugly hack, but I can't think of a better default.
    def select_compounds_sql(sql)
      compounds = @opts[:compounds]
      return unless compounds

      # Each entry is [type, dataset, all-flag].
      compounds.each do |kind, ds, all|
        sql << SPACE
        sql << kind.to_s.upcase
        sql << ALL if all
        sql << SPACE
        compound_dataset_sql_append(sql, ds)
      end
    end

    def select_from_sql(sql)
      sources = @opts[:from]
      if sources
        sql << FROM
        return source_list_append(sql, sources)
      end

      # Without sources, fall back to whatever empty_from_sql provides (possibly nothing).
      fallback = empty_from_sql
      sql << fallback if fallback
    end

    def select_group_sql(sql)
      grouping = @opts[:group]
      return unless grouping

      sql << GROUP_BY
      modifier = @opts[:group_options]
      # No modifier: a plain expression list.
      return expression_list_append(sql, grouping) unless modifier

      if modifier == :"grouping sets"
        # GROUPING SETS(...) uses the grouping-element list form.
        sql << modifier.to_s.upcase << PAREN_OPEN
        grouping_element_list_append(sql, grouping)
        sql << PAREN_CLOSE
      elsif uses_with_rollup?
        # Suffix form: expr list WITH <MODIFIER>
        expression_list_append(sql, grouping)
        sql << SPACE_WITH << modifier.to_s.upcase
      else
        # Function form: <MODIFIER>(expr list)
        sql << modifier.to_s.upcase << PAREN_OPEN
        expression_list_append(sql, grouping)
        sql << PAREN_CLOSE
      end
    end

    def select_having_sql(sql)
      condition = @opts[:having]
      return unless condition

      sql << HAVING
      literal_append(sql, condition)
    end

    def select_join_sql(sql)
      joins = @opts[:join]
      return unless joins

      # Each join entry is literalized in order, with no separator added here.
      joins.each do |join|
        literal_append(sql, join)
      end
    end

    def select_limit_sql(sql)
      limit = @opts[:limit]
      offset = @opts[:offset]

      unless limit
        # An offset without a limit has its own hook.
        return offset ? select_only_offset_sql(sql) : nil
      end

      sql << LIMIT
      literal_append(sql, limit)
      return unless offset

      sql << OFFSET
      literal_append(sql, offset)
    end

    def select_lock_sql(sql)
      lock = @opts[:lock]
      if lock == :update
        sql << FOR_UPDATE
      elsif lock.is_a?(String)
        # A string lock clause is appended verbatim after a space.
        sql << SPACE << lock
      end
    end

    # Used only if there is an offset and no limit, making it easier to override
    # in the adapter, as many databases do not support just a plain offset with
    # no limit.
    def select_only_offset_sql(sql)
      # Appends the OFFSET constant followed by the literalized :offset option.
      sql << OFFSET
      literal_append(sql, @opts[:offset])
    end
  
    def select_order_sql(sql)
      ordering = @opts[:order]
      return unless ordering

      sql << ORDER_BY
      expression_list_append(sql, ordering)
    end
    alias delete_order_sql select_order_sql
    alias update_order_sql select_order_sql

    def select_select_sql(sql)
      # Appends the SELECT constant to the SQL string being built.
      sql << SELECT
    end

    def select_where_sql(sql)
      condition = @opts[:where]
      return unless condition

      sql << WHERE
      literal_append(sql, condition)
    end
    alias delete_where_sql select_where_sql
    alias update_where_sql select_where_sql
    
    def select_with_sql(sql)
      return unless supports_cte?

      ctes = opts[:with]
      return unless ctes && !ctes.empty?

      sql << select_with_sql_base
      ctes.each_with_index do |cte, idx|
        sql << COMMA unless idx.zero?
        quote_identifier_append(sql, cte[:name])

        # Optional parenthesized argument list after the CTE name.
        if (cte_args = cte[:args])
          sql << PAREN_OPEN
          identifier_list_append(sql, cte_args)
          sql << PAREN_CLOSE
        end

        sql << AS
        literal_dataset_append(sql, cte[:dataset])
      end
      # Trailing space separates the WITH clause from what follows.
      sql << SPACE
    end
    alias delete_with_sql select_with_sql
    alias insert_with_sql select_with_sql
    alias update_with_sql select_with_sql
    
    # The base keyword to use for the SQL WITH clause
    def select_with_sql_base
      # Returns the SQL_WITH constant; select_with_sql appends it before the first CTE.
      SQL_WITH
    end

    # Whether the symbol cache should be skipped when literalizing the dataset
    def skip_symbol_cache?
      # Returns the @skip_symbol_cache instance variable as-is (not coerced to true/false).
      @skip_symbol_cache
    end

    # Set the dataset to skip the symbol cache
    def skip_symbol_cache!
      # Sets the flag that skip_symbol_cache? reads; the assigned value (true) is also returned.
      @skip_symbol_cache = true
    end

    # Append literalization of array of sources/tables to SQL string, raising an Error if there
    # are no sources.
    def source_list_append(sql, sources)
      # Both a missing list and an empty array count as "no source".
      if sources.nil? || sources == []
        raise Error, 'No source specified for query'
      end

      identifier_list_append(sql, sources)
    end
    
    # Delegate to Sequel.split_symbol.
    def split_symbol(sym)
      # Instance-level wrapper around the module-level Sequel.split_symbol.
      Sequel.split_symbol(sym)
    end

    # The string that is appended to in order to create the SQL query,
    # the empty string by default
    def sql_string_origin
      # String.new yields a fresh, unfrozen string on every call; a "" literal would be
      # frozen in this file because of its frozen-string-literal magic comment.
      String.new
    end
    
    # SQL to use if this dataset uses static SQL.  Since static SQL
    # can be a PlaceholderLiteralString in addition to a String,
    # we literalize nonstrings.  If there is an append_sql for this
    # dataset, append to that SQL instead of returning the value.
    def static_sql(sql)
      target = @opts[:append_sql]
      plain = sql.is_a?(String)

      if target
        # Appending mode: write into the existing SQL string.
        plain ? (target << sql) : literal_append(target, sql)
      else
        # Returning mode: hand back a string.
        plain ? sql : literal(sql)
      end
    end

    # Append literalization of the subselect to SQL String.
    def subselect_sql_append(sql, ds)
      # Clones ds with :append_sql set to the target string, then calls #sql on the clone.
      ds.clone(:append_sql=>sql).sql
    end

    # The number of decimal digits of precision to use in timestamps.
    def timestamp_precision
      if supports_timestamp_usecs?
        6
      else
        0
      end
    end

    def update_table_sql(sql)
      sql << SPACE
      source_list_append(sql, @opts[:from])
      # JOIN clauses are only written when joins can be modified.
      return unless supports_modifying_joins?

      select_join_sql(sql)
    end

    def update_set_sql(sql)
      sql << SET
      assignments = @opts[:values]
      # Hashes are expanded into assignments; anything else is appended as-is.
      return update_sql_values_hash(sql, assignments) if assignments.is_a?(Hash)

      sql << assignments
    end

    def update_sql_values_hash(sql, values)
      values.each_with_index do |(key, value), idx|
        sql << COMMA unless idx.zero?

        # Plain String keys are quoted as identifiers; every other key
        # (LiteralString included) is literalized normally.
        if key.is_a?(String) && !key.is_a?(LiteralString)
          quote_identifier_append(sql, key)
        else
          literal_append(sql, key)
        end

        sql << EQUAL
        literal_append(sql, value)
      end
    end

    def update_update_sql(sql)
      # Appends the UPDATE constant to the SQL string being built.
      sql << UPDATE
    end
  end
end
