File: test_git_merge.py

package info (click to toggle)
python-hypothesis 3.6.1-1%2Bdeb9u1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 1,788 kB
  • sloc: python: 15,048; sh: 226; makefile: 160
file content (140 lines) | stat: -rw-r--r-- 4,550 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# coding=utf-8
#
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis-python
#
# Most of this work is copyright (C) 2013-2016 David R. MacIver
# (david@drmaciver.com), but it contains contributions by others. See
# CONTRIBUTING.rst for a full list of people who may hold copyright, and
# consult the git log if you need to determine who owns an individual
# contribution.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at http://mozilla.org/MPL/2.0/.
#
# END HEADER

from __future__ import division, print_function, absolute_import

import base64
from collections import namedtuple

import hypothesis.strategies as s
from hypothesis import settings
from hypothesis.database import SQLiteExampleDatabase
from hypothesis.stateful import GenericStateMachine
from hypothesis.tools.mergedbs import merge_dbs

FORK_NOW = u'fork'
Insert = namedtuple(u'Insert', (u'key', u'value', u'target'))
Delete = namedtuple(u'Delete', (u'key', u'value', u'target'))


class BackendForTesting(SQLiteExampleDatabase):

    def __init__(self):
        super(BackendForTesting, self).__init__()
        self.create_db_if_needed()
        self.mirror = set()

    def save(self, key, value):
        super(BackendForTesting, self).save(key, value)
        self.mirror.add((key, value))

    def delete(self, key, value):
        super(BackendForTesting, self).delete(key, value)
        try:
            self.mirror.remove((key, value))
        except KeyError:
            pass

    def refresh_mirror(self):
        self.mirror = set()
        with self.cursor() as cursor:
            cursor.execute("""
                select key, value
                from hypothesis_data_mapping
            """)
            for r in cursor:
                self.mirror.add(tuple(map(base64.b64decode, r)))


class DatabaseMergingState(GenericStateMachine):

    def __init__(self):
        super(DatabaseMergingState, self).__init__()
        self.forked = False
        self.original = BackendForTesting()
        self.left = BackendForTesting()
        self.right = BackendForTesting()
        self.seen_strings = set()

    def values(self):
        base = s.binary()
        if self.seen_strings:
            return s.sampled_from(sorted(self.seen_strings)) | base
        else:
            return base

    def steps(self):
        values = self.values()
        if not self.forked:
            return (
                s.just(FORK_NOW) |
                s.builds(Insert, values, values, s.none()) |
                s.builds(Delete, values, values, s.none())
            )
        else:
            targets = s.sampled_from((self.left, self.right))
            return (
                s.builds(Insert, values, values, targets) |
                s.builds(Delete, values, values, targets)
            )

    def execute_step(self, step):
        if step == FORK_NOW:
            self.forked = True
        else:
            assert isinstance(step, (Insert, Delete))
            self.seen_strings.add(step.key)
            self.seen_strings.add(step.value)
            if self.forked:
                targets = (step.target,)
            else:
                targets = (self.original, self.left, self.right)
            for target in targets:
                if isinstance(step, Insert):
                    target.save(step.key, step.value)
                else:
                    assert isinstance(step, Delete)
                    target.delete(step.key, step.value)

    def teardown(self):
        target_mirror = (self.left.mirror | self.right.mirror) - (
            (self.original.mirror - self.left.mirror) |
            (self.original.mirror - self.right.mirror)
        )

        n_inserts = len(
            self.right.mirror - self.left.mirror - self.original.mirror)
        n_deletes = len(
            (self.original.mirror - self.right.mirror) & self.left.mirror)

        result = merge_dbs(
            self.original.connection(),
            self.left.connection(),
            self.right.connection()
        )
        assert result.inserts == n_inserts
        assert result.deletes == n_deletes
        self.left.refresh_mirror()
        self.original.close()
        self.left.close()
        self.right.close()
        assert self.left.mirror == target_mirror


TestMerging = DatabaseMergingState.TestCase
TestMerging.settings = settings(
    TestMerging.settings, timeout=60)