File: mockDockerDaemon.py

package info (click to toggle)
subuser 0.6.2-3.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,216 kB
  • sloc: python: 5,204; sh: 380; makefile: 73; javascript: 43
file content (99 lines) | stat: -rwxr-xr-x 3,673 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# -*- coding: utf-8 -*-
# pylint: disable=unused-argument

"""
In order to make our test suite work, we must use a MockDockerDaemon rather than communicating with a real Docker instance.
"""

#external imports
import json
import os
from collections import OrderedDict
#internal imports
from subuserlib.classes.userOwnedObject import UserOwnedObject
import subuserlib.classes.docker.dockerDaemon
import subuserlib.print

class MockDockerDaemon(UserOwnedObject):
  """
  Stand-in for a Docker daemon which records images in a JSON file instead of talking to Docker.

  The image registry is stored in ``docker/images.json`` below the user's home directory.  A ``RealDockerDaemon`` is still instantiated, but its ``getConnection`` and ``getImageProperties`` attributes are replaced by the mock's own methods.
  """
  def __init__(self,user):
    """
    Load the image registry from disk and wire up the wrapped daemon.

    The images file must already exist; opening it is not guarded.
    """
    self.images = {}
    self.nextImageId = 1
    # Id handed out by the most recent build(); MockResponse reads it to fake the build output.
    self.newId = None
    UserOwnedObject.__init__(self,user)
    self.imagesPath = os.path.join(user.homeDir,"docker/images.json")
    self.__load()
    self.dockerDaemon = subuserlib.classes.docker.dockerDaemon.RealDockerDaemon(user)
    self.connection = MockConnection(self)
    # Point the wrapped daemon's lookups at the mock.
    self.dockerDaemon.getConnection = self.getConnection
    self.dockerDaemon.getImageProperties = self.getImageProperties

  def __load(self):
    """
    Read the image registry from ``self.imagesPath``, keeping the key order found in the file.
    """
    with open(self.imagesPath,"r") as imagesFile:
      self.images = json.load(imagesFile, object_pairs_hook=OrderedDict)

  def __save(self):
    """
    Write the image registry back to ``self.imagesPath`` as JSON.
    """
    with open(self.imagesPath,"w") as imagesFile:
      json.dump(self.images,imagesFile)

  def getConnection(self):
    """
    Return the MockConnection created for this daemon.
    """
    return self.connection

  def getImageProperties(self,imageTagOrId):
    """
    Returns a dictionary of image properties, or None if the image does not exist.
    """
    return self.images.get(imageTagOrId)

  @staticmethod
  def _parentFromDockerfile(dockerfile):
    """
    Derive the mock "Parent" value from the text of a Dockerfile.

    Any Dockerfile mentioning "debian" is recorded as having no parent ("").  Otherwise the parent is the second space separated word of the first line, with trailing whitespace removed.  A missing Dockerfile (None), or a first line with fewer than two words, also yields "" instead of raising.
    """
    if dockerfile is None or "debian" in dockerfile:
      return ""
    firstLineWords = dockerfile.split("\n")[0].split(" ")
    if len(firstLineWords) < 2:
      return ""
    return firstLineWords[1].rstrip()

  def build(self,relativeBuildContextPath=None,repositoryFileStructure=None,useCache=True,rm=True,forceRm=True,quiet=False,quietClient=False,tag=None,dockerfile=None):
    """
    Register a new mock image, then forward the same arguments to the wrapped daemon's build().

    The new image receives the first numeric Id, counting up from ``nextImageId``, which is not yet in the registry.  Its "Parent" is derived from the ``dockerfile`` string (see ``_parentFromDockerfile``) and its "Created" value is the number of images registered before it.  The registry is saved to disk before the wrapped daemon is invoked.

    Return the newly created image's Id.
    """
    # Skip over Ids which were already present in the loaded registry.
    while str(self.nextImageId) in self.images:
      self.nextImageId += 1
    self.newId = str(self.nextImageId)
    parent = self._parentFromDockerfile(dockerfile)
    self.images[self.newId] = {"Id":self.newId,"Parent":parent,"Created":str(len(self.images))}
    self.__save()
    self.dockerDaemon.build(relativeBuildContextPath=relativeBuildContextPath,repositoryFileStructure=repositoryFileStructure,useCache=useCache,rm=rm,forceRm=forceRm,quiet=quiet,tag=tag,dockerfile=dockerfile,quietClient=quietClient)
    return self.newId

  def removeImage(self,imageId):
    """
    Delete the image from the registry and save it.  Raises KeyError if the Id is not registered.
    """
    del self.images[imageId]
    self.__save()

  def getInfo(self):
    """
    Return a fixed placeholder dictionary.
    """
    return {"Foo":"bar"}

  def execute(self,args,cwd=None,background=False,backgroundSuppressOutput=True,backgroundCollectStdout=False,backgroundCollectStderr=False):
    """
    Print the arguments and working directory docker would have been run with.  Nothing is executed and the background related options are ignored.
    """
    subuserlib.print.printWithoutCrashing("Execute docker with args: "+str(args))
    subuserlib.print.printWithoutCrashing("Cwd:"+str(cwd))

class MockResponse():
  """
  Canned HTTP response imitating the streamed JSON output of a successful Docker build.

  The body is a sequence of newline separated ``{"stream": ...}`` objects.  The last one reads "Successfully built <id>", where <id> is the ``newId`` of the given mock daemon at the time the response is created.
  """
  def __init__(self,mockDockerDaemon):
    self.mockDockerDaemon = mockDockerDaemon
    self.status = 200
    # Bytes literals cannot contain non-ASCII characters, so the arrows are encoded separately.
    # NOTE(review): the arrows are presumably there to exercise UTF-8 handling in the reader - confirm.
    self.body = b"{\"stream\":\"Building"+"→→→".encode("utf-8")+b"\"}\n{\"stream\":\"Building...\"}\n{\"stream\":\"Building...\"}\n{\"stream\":\"Successfully built "+mockDockerDaemon.newId.encode("utf-8")+b"\"}"

  def read(self,bytes=None):
    """
    Return up to ``bytes`` bytes from the front of the body and consume them.

    With no argument (or a negative one) the whole remaining body is returned and consumed.  ``read(0)`` returns b"".  Once the body is exhausted every call returns b"", so a caller looping until an empty read terminates.
    """
    if bytes is None or bytes < 0:
      amount = len(self.body)
    else:
      amount = bytes
    chunk = self.body[:amount]
    self.body = self.body[amount:]
    return chunk

class MockConnection:
  """
  Fake HTTP connection used in place of a real connection to Docker.

  Requests are accepted and discarded; every call to getresponse() produces a new MockResponse for the owning mock daemon.
  """
  def __init__(self,mockDockerDaemon):
    self.newId = None
    self.mockDockerDaemon = mockDockerDaemon

  def request(self,method,url,body=None,headers=None):
    """
    Accept a request and do nothing with it.
    """
    return None

  def getresponse(self):
    """
    Return a freshly constructed MockResponse for the owning mock daemon.
    """
    response = MockResponse(self.mockDockerDaemon)
    return response