File: save_page_blob.rb

package info (click to toggle)
ruby-gitlab-fog-azure-rm 1.9.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 756 kB
  • sloc: ruby: 5,926; sh: 9; makefile: 4
file content (111 lines) | stat: -rw-r--r-- 3,888 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
module Fog
  module AzureRM
    class Storage
      # This class provides the actual implementation for service calls.
      class Real
        # This class is used to store chunk data for page blob before uploading.
        class BlobChunk
          attr_reader :id # For debug
          attr_reader :start_range
          attr_reader :data

          def initialize(id, start_range, data)
            @id = id
            @start_range = start_range
            @data = data
          end

          def end_range
            @start_range + @data.size - 1
          end
        end

        # This class is a stream to read chunk data.
        class BlobFileStream
          def initialize(body)
            if body.respond_to?(:read)
              if body.respond_to?(:rewind)
                begin
                  body.rewind
                rescue => ex
                  Fog::Logger.debug "save_page_blob - body responds to :rewind but throws an exception when calling :rewind: #{ex.inspect}"
                end
              end
              @stream = body
            else
              @stream = StringIO.new(body)
            end
            @mutex = Mutex.new
            @count = 0
          end

          def read(size)
            data = nil
            id = 0
            start_range = 0
            @mutex.synchronize do
              start_range = @stream.pos
              data = @stream.read(size)
              return nil if data.nil?
              @count += 1
              id = @count
            end
            BlobChunk.new(id, start_range, data)
          end
        end

        def save_page_blob(container_name, blob_name, body, options)
          threads_num = options.delete(:worker_thread_num)
          threads_num = Fog::AzureRM::UPLOAD_BLOB_WORKER_THREAD_COUNT if threads_num.nil? || !threads_num.is_a?(Integer) || threads_num < 1

          begin
            blob_size = Fog::Storage.get_body_size(body)
            raise "The page blob size must be aligned to a 512-byte boundary. But the file size is #{blob_size}." if (blob_size % 512).nonzero?

            # Initiate the upload
            Fog::Logger.debug "Creating the page blob #{container_name}/#{blob_name}. options: #{options}"
            create_page_blob(container_name, blob_name, blob_size, options)
            options.delete(:content_md5)

            # Uploading content
            iostream = BlobFileStream.new(body)

            threads = []
            threads_num.times do |id|
              thread = Thread.new do
                Fog::Logger.debug "Created upload thread #{id}."
                while (chunk = iostream.read(Fog::AzureRM::MAXIMUM_CHUNK_SIZE))
                  Fog::Logger.debug "Upload thread #{id} is uploading #{chunk.id}, start_range: #{chunk.start_range}, size: #{chunk.data.size}."
                  put_blob_pages(container_name, blob_name, chunk.start_range, chunk.end_range, chunk.data, options) if Digest::MD5.hexdigest(chunk.data) != Fog::AzureRM::HASH_OF_4MB_EMPTY_CONTENT
                end
                Fog::Logger.debug "Upload thread #{id} finished."
              end
              thread.abort_on_exception = true
              threads << thread
            end

            threads.each(&:join)
          rescue
            # Abort the upload & reraise
            begin
              delete_blob(container_name, blob_name)
            rescue => ex
              Fog::Logger.debug "Cannot delete the blob: #{container_name}/#{blob_name} after save_page_blob failed. #{ex.inspect}"
            end
            raise
          end

          Fog::Logger.debug "Successfully save the page blob: #{container_name}/#{blob_name}."
          true
        end
      end

      # This class provides the mock implementation for unit tests.
      class Mock
        def save_page_blob(*)
          true
        end
      end
    end
  end
end