# encoding: utf-8
module Origin

  # Represents an aggregation pipeline.
  #
  # @since 2.0.0
  class Pipeline < Array

    # @attribute [r] aliases The field aliases.
    attr_reader :aliases

    # Deep copy the aggregation pipeline. Will clone all the values in the
    # pipeline as well as the pipeline itself.
    #
    # @example Deep copy the pipeline.
    #   pipeline.__deep_copy__
    #
    # @return [ Pipeline ] The cloned pipeline.
    #
    # @since 2.0.0
    def __deep_copy__
      self.class.new(aliases) do |copy|
        each do |entry|
          copy.push(entry.__deep_copy__)
        end
      end
    end

    # Add a group operation to the aggregation pipeline.
    #
    # @example Add a group operation.
    #   pipeline.group(:count.sum => 1, :max.max => "likes")
    #
    # @param [ Hash ] entry The group entry.
    #
    # @return [ Pipeline ] The pipeline.
    #
    # @since 2.0.0
    def group(entry)
      push("$group" => evolve(entry.__expand_complex__))
    end

    # Initialize the new pipeline.
    #
    # @example Initialize the new pipeline.
    #   Origin::Pipeline.new(aliases)
    #
    # @param [ Hash ] aliases A hash of mappings from aliases to the actual
    #   field names in the database.
    #
    # @since 2.0.0
    def initialize(aliases = {})
      @aliases = aliases
      yield(self) if block_given?
    end

    # Adds a $project entry to the aggregation pipeline.
    #
    # @example Add the projection.
    #   pipeline.project(name: 1)
    #
    # @param [ Hash ] entry The projection.
    #
    # @return [ Pipeline ] The pipeline.
    def project(entry)
      push("$project" => evolve(entry))
    end

    # Add the $unwind entry to the pipeline.
    #
    # @example Add the unwind.
    #   pipeline.unwind(:field)
    #
    # @param [ String, Symbol ] field The name of the field.
    #
    # @return [ Pipeline ] The pipeline.
    #
    # @since 2.0.0
    def unwind(field)
      normalized = field.to_s
      name = aliases[normalized] || normalized
      push("$unwind" => name.__mongo_expression__)
    end

    private

    # Evolve the entry using the aliases.
    #
    # @api private
    #
    # @example Evolve the entry.
    #   pipeline.evolve(name: 1)
    #
    # @param [ Hash ] entry The entry to evolve.
    #
    # @return [ Hash ] The evolved entry.
    #
    # @since 2.0.0
    def evolve(entry)
      aggregate = Selector.new(aliases)
      entry.each_pair do |field, value|
        aggregate.merge!(field.to_s => value)
      end
      aggregate
    end
  end
end
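
# Usage sketch (not part of the library). The stand-in class below mirrors how
# Pipeline#unwind resolves a field alias and appends a stage, so the chaining
# behaviour can be tried without loading Origin. SketchPipeline is a
# hypothetical name, and the "$" prefixing is an assumption about what
# String#__mongo_expression__ does; check the real extension before relying
# on it.

```ruby
# Hypothetical stand-in for Origin::Pipeline; not the library class.
class SketchPipeline < Array
  attr_reader :aliases

  def initialize(aliases = {})
    super()
    @aliases = aliases
    yield(self) if block_given?
  end

  # Resolve the alias, then append a $unwind stage. Array#push returns the
  # receiver, which is what makes the stages chainable.
  def unwind(field)
    normalized = field.to_s
    name = aliases[normalized] || normalized
    # Assumption: the field name is prefixed with "$" unless it already is.
    expression = name.start_with?("$") ? name : "$#{name}"
    push("$unwind" => expression)
  end
end

# "authors" is an alias for the stored field "a"; "tags" has no alias.
pipeline = SketchPipeline.new("authors" => "a")
pipeline.unwind(:authors).unwind("tags")
```

# Because each stage method returns the pipeline itself, calls can be chained
# in the order the stages should run.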