1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
|
module Fog
module AWS
class DataPipeline
module Shared
class JSONObject
def initialize(object)
@json_fields = object.clone
@id = @json_fields.delete('id')
@name = @json_fields.delete('name') || @id
end
def to_api
{
'id' => @id,
'name' => @name,
'fields' => fields
}
end
private
def fields
@json_fields.map{|k,v| field_for_kv(k,v)}.flatten
end
def field_for_kv(key, value)
if value.is_a?(Hash)
{ 'key' => key, 'refValue' => value['ref'], 'stringValue' => value['stringValue'] }
elsif value.is_a?(Array)
value.map { |subvalue| field_for_kv(key, subvalue) }
else
{ 'key' => key, 'stringValue' => value }
end
end
end
# Take a list of pipeline object hashes as specified in the Data Pipeline JSON format
# and transform it into the format expected by the API
def transform_objects(objects)
objects.map { |object| JSONObject.new(object).to_api }
end
end
class Real
include Shared
# Put raw pipeline definition JSON
# http://docs.aws.amazon.com/datapipeline/latest/APIReference/API_PutPipelineDefinition.html
# ==== Parameters
# * PipelineId <~String> - The ID of the pipeline
# * PipelineObjects <~String> - Objects in the pipeline
# ==== Returns
# * response<~Excon::Response>:
# * body<~Hash>:
def put_pipeline_definition(id, pipeline_objects, options={})
params = {
'pipelineId' => id,
'pipelineObjects' => transform_objects(pipeline_objects),
}.merge(options)
response = request({
:body => Fog::JSON.encode(params),
:headers => { 'X-Amz-Target' => 'DataPipeline.PutPipelineDefinition' },
})
end
end
class Mock
include Shared
def put_pipeline_definition(id, pipeline_objects, _options={})
response = Excon::Response.new
options = _options.dup
pipeline = find_pipeline(id)
stringified_objects = if pipeline_objects.any?
transform_objects(stringify_keys(pipeline_objects))
else
options.each { |k,v| options[k] = transform_objects(stringify_keys(v)) }
end
if stringified_objects.is_a?(Array)
stringified_objects = {"pipelineObjects" => stringified_objects}
end
self.data[:pipeline_definitions][id] = stringified_objects
response.body = {"errored" => false, "validationErrors" => [], "validationWarnings" => []}
response
end
end
end
end
end
|