# Copyright (C) 2009 Aaron Bentley
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA


from cStringIO import StringIO
import os

from bzrlib import (
    branch,
    bzrdir,
    conflicts,
    directory_service,
    errors,
    revision,
    switch,
    workingtree,
    tests,
    transform,
)
from bzrlib.plugins.pipeline.pipeline import (
    BranchInTheWay,
    ChangesAlreadyStored,
    DuplicatePipe,
    dwim_pipe,
    NoSuchPipe,
    PipeManager,
    PipeStorage,
    SwitchWithConflicts,
    tree_to_pipeline,
    UnknownRemotePipe,
)
from bzrlib.plugins.pipeline.tests import TestCaseWithPipes


class TestPipeStorage(TestCaseWithPipes):
    """Exercise PipeStorage: pipe links and the stored-transform slot."""

    def test_connect(self):
        """connect() records next on the first branch and prev on the second."""
        upstream = self.make_branch('bar')
        downstream = self.make_branch('baz')
        PipeStorage.connect(upstream, downstream)

        upstream_storage = PipeStorage(upstream)
        self.assertEqual('../baz/', upstream_storage.get_next())
        self.assertIs(None, upstream_storage.get_prev())

        downstream_storage = PipeStorage(downstream)
        self.assertEqual('../bar/', downstream_storage.get_prev())
        self.assertIs(None, downstream_storage.get_next())

    def test_connect_relative_url(self):
        """The links written by connect() are sibling-relative URLs."""
        first = self.make_branch('foo')
        second = self.make_branch('bar')
        PipeStorage.connect(first, second)
        next_of_first = PipeStorage(first).get_next()
        self.assertEqual('../bar/', next_of_first)
        prev_of_second = PipeStorage(second).get_prev()
        self.assertEqual('../foo/', prev_of_second)

    def test_disconnect_middle(self):
        """Disconnecting a middle pipe joins its two neighbours together."""
        head = self.make_branch('foo')
        head_manager = PipeManager(head)
        middle = head_manager.insert_pipe('bar')
        middle_manager = PipeManager(middle)
        tail = middle_manager.insert_pipe('baz')

        middle_manager.storage.disconnect()

        self.assertEqual('../baz/', head_manager.storage.get_next())
        self.assertEqual('../foo/', PipeStorage(tail).get_prev())
        self.assertIs(None, middle_manager.get_next_pipe())

    def test_disconnect_first(self):
        """Disconnecting the first pipe leaves both pipes unlinked."""
        head = self.make_branch('foo')
        head_manager = PipeManager(head)
        tail_storage = PipeStorage(head_manager.insert_pipe('bar'))

        head_manager.storage.disconnect()

        self.assertIs(None, head_manager.get_next_pipe())
        self.assertIs(None, tail_storage.get_prev())

    def test_disconnect_last(self):
        """Disconnecting the last pipe leaves both pipes unlinked."""
        head = self.make_branch('foo')
        head_manager = PipeManager(head)
        tail_storage = PipeStorage(head_manager.insert_pipe('bar'))

        tail_storage.disconnect()

        self.assertIs(None, head_manager.get_next_pipe())
        self.assertIs(None, tail_storage.get_prev())

    def test_get_put_transform(self):
        """A stored transform reads back with the content that was put."""
        storage = PipeStorage(self.make_branch('branch'))
        # Nothing has been stored yet.
        self.assertIs(None, storage._get_transform())
        storage._put_transform(StringIO('Hello'))
        stored = storage._get_transform()
        self.assertEqual('Hello', stored.read())

    def test_put_transform_none(self):
        """Putting None clears the stored transform."""
        storage = PipeStorage(self.make_branch('branch'))
        storage._put_transform(StringIO('Hello'))
        storage._put_transform(None)
        self.assertIs(None, storage._get_transform())
        # Clearing a second time, with nothing stored, must not raise.
        storage._put_transform(None)


class TestPipeManager(TestCaseWithPipes):
    """Tests for PipeManager.

    Covers inserting and locating pipes, storing and restoring uncommitted
    changes, switching between pipes, merging down a pipeline, syncing a
    pipeline to another location, and finding the previous revision id.
    """

    def assertBaseEqual(self, left, right):
        """Assert two sequences of branches have the same bases, in order."""
        self.assertEqual([b.base for b in left], [b.base for b in right])

    def test_insert_pipe(self):
        """insert_pipe places the new pipe right after the manager's pipe."""
        foo = self.make_branch('foo')
        baz = self.make_branch('baz')
        PipeStorage.connect(foo, baz)
        manager = PipeManager(foo)
        bar = manager.insert_pipe('bar')
        self.assertBaseEqual([foo, bar, baz], manager.list_pipes())

    def test_insert_pipe_before(self):
        """With before=True the new pipe precedes the manager's pipe."""
        foo = self.make_branch('foo')
        manager = PipeManager(foo)
        bar = manager.insert_pipe('bar', before=True)
        self.assertBaseEqual([bar, foo], manager.list_pipes())

    def test_insert_pipe_before_middle(self):
        """before=True inserts between the previous pipe and this one."""
        foo = self.make_branch('foo')
        baz = self.make_branch('baz')
        PipeStorage.connect(foo, baz)
        manager = PipeManager(baz)
        bar = manager.insert_pipe('bar', before=True)
        self.assertBaseEqual([foo, bar, baz], manager.list_pipes())

    def test_insert_pipe_revision(self):
        """A revision id passed to insert_pipe becomes the new pipe's tip."""
        foo = self.make_branch_and_tree('foo')
        rev1 = foo.commit('rev1')
        foo.commit('rev2')
        manager = PipeManager(foo.branch)
        bar = manager.insert_pipe('bar', rev1)
        self.assertEqual(rev1, bar.last_revision())

    def test_insert_pipe_checks_dupes(self):
        """Inserting a pipe whose name is already in use raises DuplicatePipe."""
        foo = self.make_branch_and_tree('foo')
        manager = PipeManager(foo.branch)
        e = self.assertRaises(DuplicatePipe, manager.insert_pipe, 'foo')
        self.assertEqual('There is already a pipe named "foo."', str(e))

    def create_long_pipeline(self):
        """Create and connect five pipes; return them in pipeline order."""
        foo = self.make_branch('foo')
        bar = self.make_branch('bar')
        baz = self.make_branch('baz')
        qux = self.make_branch('qux')
        quxx = self.make_branch('quxx')
        self.connect_many(foo, bar, baz, qux, quxx)
        return [foo, bar, baz, qux, quxx]

    def test_list_pipes(self):
        """list_pipes from a middle pipe returns the whole pipeline in order."""
        pipe_list = self.create_long_pipeline()
        self.assertBaseEqual(pipe_list,
                             PipeManager(pipe_list[2]).list_pipes())

    def test_find_pipe(self):
        """find_pipe locates pipes on either side of the manager's pipe."""
        pipe_list = self.create_long_pipeline()
        manager = PipeManager(pipe_list[2])
        self.assertEqual(pipe_list[0].base, manager.find_pipe('foo').base)
        self.assertEqual(pipe_list[4].base, manager.find_pipe('quxx').base)

    def test_find_pipe_not_present(self):
        """find_pipe raises NoSuchPipe for a nick not in the pipeline."""
        pipe_list = self.create_long_pipeline()
        manager = PipeManager(pipe_list[2])
        e = self.assertRaises(NoSuchPipe, manager.find_pipe, 'not-present')
        self.assertEqual('There is no pipe with nick "not-present".', str(e))

    def test_get_first_pipe(self):
        """get_first_pipe from a middle pipe returns the head of the pipeline."""
        pipe_list = self.create_long_pipeline()
        manager = PipeManager(pipe_list[2])
        self.assertEqual(pipe_list[0].base, manager.get_first_pipe().base)

    def test_get_first_pipe_from_first(self):
        """get_first_pipe from the first pipe returns that same pipe."""
        pipe_list = self.create_long_pipeline()
        manager = PipeManager(pipe_list[0])
        self.assertEqual(pipe_list[0].base, manager.get_first_pipe().base)

    def test_get_last_pipe(self):
        """get_last_pipe from a middle pipe returns the tail of the pipeline."""
        pipe_list = self.create_long_pipeline()
        manager = PipeManager(pipe_list[2])
        self.assertEqual(pipe_list[-1].base, manager.get_last_pipe().base)

    # This test used to be named test_get_first_pipe_from_first as well, which
    # silently replaced the real test of that name above so it never ran.
    def test_get_last_pipe_from_last(self):
        """get_last_pipe from the last pipe returns that same pipe."""
        pipe_list = self.create_long_pipeline()
        manager = PipeManager(pipe_list[-1])
        self.assertEqual(pipe_list[-1].base, manager.get_last_pipe().base)

    def store_uncommitted(self):
        """Add a file to a fresh tree and store it as uncommitted changes.

        :return: (tree, manager) for the tree's branch.
        """
        tree = self.make_branch_and_tree('tree')
        tree.commit('get root in there')
        self.build_tree_contents([('tree/file', 'content')])
        tree.add('file', 'file-id')
        manager = PipeManager(tree.branch)
        manager.store_uncommitted(tree)
        return tree, manager

    def test_store_uncommitted(self):
        """Storing uncommitted changes removes them from the working tree."""
        self.store_uncommitted()
        self.failIfExists('tree/file')

    def test_store_uncommitted_no_change(self):
        """Storing from an unchanged tree leaves nothing stored."""
        tree = self.make_branch_and_tree('tree')
        tree.commit('get root in there')
        manager = PipeManager(tree.branch)
        manager.store_uncommitted(tree)
        self.assertFalse(manager.has_stored_changes())

    def test_store_uncommitted_errors_on_overwrite(self):
        """Storing on top of already-stored changes raises an error."""
        # Named pipe_branch so the imported 'branch' module is not shadowed.
        pipe_branch = self.make_branch('branch')
        checkout = pipe_branch.create_checkout('checkout', lightweight=True)
        manager = PipeManager(pipe_branch)
        manager.storage._put_transform(StringIO('Hello'))
        self.build_tree(['checkout/file'])
        checkout.add('file')
        e = self.assertRaises(ChangesAlreadyStored, manager.store_uncommitted,
                              checkout)
        self.assertEqual('Cannot store uncommitted changes because this pipe'
                         ' already stores uncommitted changes.', str(e))

    def test_store_all(self):
        """store_all reverts modified files to their committed contents."""
        tree = self.make_branch_and_tree('tree')
        self.build_tree_contents([('tree/file', 'contents1')])
        tree.add('file')
        tree.commit('message')
        self.build_tree_contents([('tree/file', 'contents2')])
        PipeManager(tree.branch).store_all(tree)
        self.assertFileEqual('contents1', 'tree/file')

    def test_restore_uncommitted(self):
        """Restoring brings the file back and clears the stored changes."""
        tree, manager = self.store_uncommitted()
        manager.restore_uncommitted(tree)
        self.failUnlessExists('tree/file')
        # Previously written as assertFalse(None, ...), which tested None and
        # treated the real value as the failure message, so it always passed.
        self.assertFalse(manager.has_stored_changes())

    def test_restore_uncommitted_none(self):
        """Restoring when nothing is stored must not raise."""
        tree = self.make_branch_and_tree('tree')
        manager = PipeManager(tree.branch)
        manager.restore_uncommitted(tree)

    def test_restore_uncommitted_no_delete(self):
        """With delete=False the stored changes are kept after restoring."""
        tree, manager = self.store_uncommitted()
        manager.restore_uncommitted(tree, delete=False)
        self.assertTrue(manager.has_stored_changes())

    def make_checkout_first_second(self):
        """Build a two-pipe pipeline whose first pipe is one commit ahead.

        'second' is inserted while 'first' is at rev1-id; 'first' then gets
        rev2-id.  The tests open the working tree at 'checkout', which is
        presumably where make_branch_and_checkout puts it.

        :return: (tree, manager for 'first', the 'second' branch).
        """
        tree = self.make_branch_and_checkout('first')
        tree.commit('get root in there', rev_id='rev1-id')
        manager = PipeManager(tree.branch)
        second = manager.insert_pipe('second')
        tree.commit('second commit.', rev_id='rev2-id')
        return tree, manager, second

    def test_switch_to_pipe(self):
        """Switching points the checkout at the target pipe and its tip."""
        tree, manager, second = self.make_checkout_first_second()
        manager.switch_to_pipe(tree, second)
        tree = workingtree.WorkingTree.open('checkout')
        self.assertEqual(second.base, tree.branch.base)
        self.assertEqual('rev1-id', tree.branch.last_revision())

    def test_switch_to_pipe_store_restore(self):
        """Uncommitted changes are stored on leaving and restored on return."""
        tree, manager, second = self.make_checkout_first_second()
        self.build_tree(['checkout/file'])
        tree.add('file')
        manager.switch_to_pipe(tree, second)
        self.failIfExists('checkout/file')
        tree = workingtree.WorkingTree.open('checkout')
        first = branch.Branch.open('first')
        # The return value of switch_to_pipe is used below as the manager of
        # the pipe that was switched to.
        manager = PipeManager(tree.branch).switch_to_pipe(tree, first)
        self.failUnlessExists('checkout/file')
        self.assertFalse(manager.storage.has_stored_changes())
        tree = workingtree.WorkingTree.open('checkout')
        self.assertEqual(manager.storage.branch.base, tree.branch.base)
        self.assertEqual('rev2-id', tree.last_revision())

    def test_switch_to_pipe_restore_outdated(self):
        """Stored changes still restore after the pipe gained a new commit."""
        tree, manager, second = self.make_checkout_first_second()
        checkout2 = tree.branch.create_checkout('checkout2', lightweight=True)
        self.build_tree(['checkout/file'])
        tree.add('file')
        manager.switch_to_pipe(tree, second)
        # Advance 'first' through another checkout while changes are stored.
        self.build_tree(['checkout2/file2'])
        checkout2.add('file2')
        checkout2.commit('rev3', rev_id='rev3-id')
        tree = workingtree.WorkingTree.open('checkout')
        first = branch.Branch.open('first')
        manager = PipeManager(tree.branch).switch_to_pipe(tree, first)
        self.assertFalse(manager.storage.has_stored_changes())
        tree = workingtree.WorkingTree.open('checkout')
        self.assertEqual(manager.storage.branch.base, tree.branch.base)
        self.assertEqual('rev3-id', tree.last_revision())
        self.failUnlessExists('checkout/file2')

    def test_switch_to_pipe_local_conflicts(self):
        """Switching is refused while the tree has unresolved conflicts."""
        tree, manager, second = self.make_checkout_first_second()
        tree.set_conflicts(conflicts.ConflictList(
            [conflicts.ContentsConflict('lala')]))
        self.assertRaises(SwitchWithConflicts, manager.switch_to_pipe, tree,
                          second)

    def do_conflict_free_pipeline_merge(self):
        """Merge pipe 'one' into pipe 'two' with no conflicts.

        'one' gains rev2-id (adds file 'one'), 'two' gains rev3-id (adds file
        'two'), then a pipeline merge is run from 'one'.  Also checks that the
        checkout is on 'one' afterwards.

        :return: the basis tree of 'two' after the merge.
        """
        one = self.make_branch('one')
        checkout = one.create_checkout('checkout', lightweight=True)
        checkout.commit('rev1')
        pipe_one = PipeManager(one)
        two = pipe_one.insert_pipe('two')
        self.build_tree(['checkout/one'])
        checkout.add('one')
        checkout.commit('rev2', rev_id='rev2-id')
        switch.switch(checkout.bzrdir, two)
        checkout = workingtree.WorkingTree.open('checkout')
        self.build_tree(['checkout/two'])
        checkout.add('two')
        checkout.commit('rev3', rev_id='rev3-id')
        switch.switch(checkout.bzrdir, one)
        checkout = workingtree.WorkingTree.open('checkout')
        pipe_one.pipeline_merge(checkout)
        checkout = workingtree.WorkingTree.open('checkout')
        self.assertEqual('one', checkout.branch.nick)
        return two.basis_tree()

    def test_pipeline_merge(self):
        """When we do a pipeline merge, we merge into later pipes.

        When finished, we return to the initial pipe.
        """
        two_basis = self.do_conflict_free_pipeline_merge()
        self.assertIsNot(None, two_basis.path2id('two'))
        self.assertIsNot(None, two_basis.path2id('one'))

    def test_pipeline_merge_updates_parents(self):
        """A pipeline merge commits a merge revision in the later pipe.

        The later pipe's basis has its previous tip and the merged revision
        as parents, in that order.
        """
        two_basis = self.do_conflict_free_pipeline_merge()
        self.assertEqual(['rev3-id', 'rev2-id'], two_basis.get_parent_ids())

    def pipeline_merge(self, trees):
        """Connect the trees' branches as a pipeline and merge from the first.

        :param trees: working trees whose branches form the pipeline, in
            order.
        :return: the 'checkout' working tree, reopened after the merge.
        """
        self.connect_many(*[t.branch for t in trees])
        checkout = trees[0].branch.create_checkout('checkout',
                                                   lightweight=True)
        PipeManager(checkout.branch).pipeline_merge(checkout)
        return workingtree.WorkingTree.open('checkout')

    def test_pipeline_merge_stops_on_conflict(self):
        """When a conflict is encountered, pipeline merging stops."""
        one = self.make_branch_and_tree('one')
        self.build_tree_contents([('one/conflictable', 'base')])
        one.add('conflictable')
        one.commit('base commit')
        two = one.bzrdir.sprout('two').open_workingtree()
        self.build_tree_contents([('one/conflictable', 'other')])
        one.commit('other commit')
        self.build_tree_contents([('two/conflictable', 'this')])
        this_revision = two.commit('this commit')
        checkout = self.pipeline_merge([one, two])
        # The checkout is left on the conflicted pipe, with nothing committed.
        self.assertEqual('two', checkout.branch.nick)
        self.assertEqual(this_revision, checkout.branch.last_revision())
        self.assertFileEqual('this', 'checkout/conflictable.THIS')
        self.assertFileEqual('other', 'checkout/conflictable.OTHER')

    def test_pipeline_merge_skips_unchanged(self):
        """Changes are only committed if there was an actual change."""
        one = self.make_branch_and_tree('one')
        one.commit('base commit', rev_id='rev1-id')
        two = one.bzrdir.sprout('two').open_workingtree()
        three = one.bzrdir.sprout('three').open_workingtree()
        two.commit('new commit', rev_id='rev2-id')
        self.pipeline_merge([one, two, three])
        self.assertEqual('rev2-id', two.branch.last_revision())
        self.assertEqual('rev2-id', three.branch.last_revision())

    def test_pipeline_merge_stores_and_restores_uncommitted(self):
        """Uncommitted changes survive a pipeline merge without leaking.

        The uncommitted file is absent from the later pipe and is still
        versioned in the checkout after the merge.
        """
        one = self.make_branch('one')
        checkout = one.create_checkout('checkout', lightweight=True)
        self.build_tree(['checkout/file', 'checkout/uncommitted'])
        checkout.add('file')
        checkout.commit('base commit')
        manager = PipeManager(one)
        two = manager.insert_pipe('two')
        checkout.commit('pointless commit')
        checkout.add('uncommitted')
        manager.pipeline_merge(checkout)
        self.assertIs(None, two.basis_tree().path2id('uncommitted'))
        self.assertIsNot(None, checkout.path2id('uncommitted'))

    def test_pipeline_merge_from_submit(self):
        """from_submit=True merges the submit branch into the pipe."""
        submit = self.make_branch_and_tree('submit')
        submit.commit('empty commit')
        one = submit.bzrdir.sprout('one').open_branch()
        one.set_submit_branch(submit.branch.base)
        self.build_tree(['submit/submit-file'])
        submit.add('submit-file')
        submit.commit('added submit-file.')
        checkout = one.create_checkout('checkout', lightweight=True)
        PipeManager(checkout.branch).pipeline_merge(checkout, from_submit=True)
        basis = checkout.branch.basis_tree()
        self.assertIsNot(None, basis.path2id('submit-file'))
        self.failUnlessExists('checkout/submit-file')

    def test_pipeline_merge_from_submit_no_change(self):
        """from_submit=True must not raise when submit has no new commits."""
        submit = self.make_branch_and_tree('submit')
        submit.commit('empty commit')
        one = submit.bzrdir.sprout('one').open_branch()
        one.set_submit_branch(submit.branch.base)
        # The file is created in the submit tree but never added or committed.
        self.build_tree(['submit/submit-file'])
        checkout = one.create_checkout('checkout', lightweight=True)
        PipeManager(checkout.branch).pipeline_merge(checkout, from_submit=True)

    def test_pipeline_merge_pulls_if_possible(self):
        """A later pipe with no commits of its own ends at the same tip."""
        tree = self.make_branch_and_checkout('first')
        tree.commit('first commit')
        first_manager = PipeManager(tree.branch)
        second = first_manager.insert_pipe('second')
        third = PipeManager(second).insert_pipe('third')
        tree.commit('second commit')
        first_manager.switch_to_pipe(tree, second)
        tree = workingtree.WorkingTree.open('checkout')
        tree.commit('second pipe commit')
        first_manager.pipeline_merge(tree)
        self.assertEqual(third.last_revision(), second.last_revision())

    def test_sync_pipeline_creates_branches(self):
        """Syncing to a new location creates and connects every pipe there."""
        self.build_tree(['a/', 'b/'])
        a_foo = PipeManager(self.make_branch('a/foo'))
        a_foo.insert_pipe('bar')
        a_foo.sync_pipeline('b/foo')
        b_foo = PipeManager(branch.Branch.open('b/foo'))
        b_bar = PipeManager(branch.Branch.open('b/bar'))
        self.assertEqual('../bar/', b_foo.storage.get_next())
        self.assertEqual('../foo/', b_bar.storage.get_prev())

    def test_sync_add_pipe(self):
        """A pipe added locally after a sync is linked in by the next sync."""
        self.build_tree(['local/', 'remote/'])
        foo = PipeManager(self.make_branch('local/foo'))
        foo.sync_pipeline('remote/foo')
        remote_foo = PipeManager(branch.Branch.open('remote/foo'))
        self.assertIs(None, remote_foo.get_next_pipe())
        foo.insert_pipe('bar')
        foo.sync_pipeline('remote/foo')
        self.assertEqual('../bar/', remote_foo.storage.get_next())

    def test_sync_add_pipe_three(self):
        """Adding a third pipe leaves the first remote link unchanged."""
        self.build_tree(['local/', 'remote/'])
        foo = PipeManager(self.make_branch('local/foo'))
        bar = PipeManager(foo.insert_pipe('bar'))
        foo.sync_pipeline('remote/foo')
        remote_foo = PipeManager(branch.Branch.open('remote/foo'))
        self.assertEqual('../bar/', remote_foo.storage.get_next())
        bar.insert_pipe('baz')
        foo.sync_pipeline('remote/foo')
        self.assertEqual('../bar/', remote_foo.storage.get_next())

    def test_sync_pipeline_push_pull(self):
        """Sync pushes newer local revisions and pulls newer remote ones."""
        self.build_tree(['local/', 'remote/'])
        foo = PipeManager(self.make_branch('local/foo'))
        checkout = foo.storage.branch.create_checkout('local/checkout',
                                                      lightweight=True)
        checkout.commit('rev1', rev_id='rev1-id')
        bar = foo.insert_pipe('bar')
        foo.insert_pipe('baz')
        # Not part of the test.  Just the easiest way to copy a pipeline.
        foo.sync_pipeline('remote/foo')
        checkout.commit('rev2', rev_id='rev2-id')
        remote_bar = branch.Branch.open('remote/bar')
        remote_checkout = remote_bar.create_checkout('remote/checkout',
                                                     lightweight=True)
        remote_checkout.commit('rev3', rev_id='rev3-id')
        foo.sync_pipeline('remote/foo')
        remote_foo = branch.Branch.open('remote/foo')
        self.assertEqual('rev2-id', remote_foo.last_revision())
        self.assertEqual('rev3-id', bar.last_revision())

    def test_sync_pipeline_diverged(self):
        """Sync raises DivergedBranches when both sides committed."""
        self.build_tree(['local/', 'remote/'])
        foo = PipeManager(self.make_branch('local/foo'))
        foo.sync_pipeline('remote/foo')
        checkout = foo.storage.branch.create_checkout('local/checkout',
                                                      lightweight=True)
        checkout.commit('rev1', rev_id='rev1-id')
        remote_foo = branch.Branch.open('remote/foo')
        remote_checkout = remote_foo.create_checkout('remote/checkout',
                                                     lightweight=True)
        remote_checkout.commit('rev3', rev_id='rev3-id')
        self.assertRaises(errors.DivergedBranches, foo.sync_pipeline,
                          'remote/foo')

    def test_sync_pipeline_sets_push_location(self):
        """Sync sets the push location when none was set before."""
        self.build_tree(['local/', 'remote/'])
        foo = self.make_branch('local/foo')
        self.assertIs(None, foo.get_push_location())
        foo_manager = PipeManager(foo)
        foo_manager.sync_pipeline('remote/foo')
        self.assertContainsRe(foo.get_push_location(), 'remote/foo/$')

    def test_sync_pipeline_retains_previous_push_location(self):
        """Sync leaves an existing push location untouched."""
        self.build_tree(['local/', 'remote/'])
        foo = self.make_branch('local/foo')
        foo.set_push_location('bar')
        foo_manager = PipeManager(foo)
        foo_manager.sync_pipeline('remote/foo')
        self.assertEqual(foo.get_push_location(), 'bar')

    def test_sync_pipeline_preserves_revision_id_on_create(self):
        """A newly created remote pipe has the local pipe's tip revision."""
        tree = self.make_branch_and_checkout('first')
        tree.commit('rev1')
        first_manager = PipeManager(tree.branch)
        second_pipe = first_manager.insert_pipe('second')
        first_manager.switch_to_pipe(tree, second_pipe)
        tree = workingtree.WorkingTree.open('checkout')
        tree.commit('rev2', rev_id='rev2-id')
        os.mkdir('remote')
        first_manager.sync_pipeline('remote/first')
        remote_second = branch.Branch.open('remote/second')
        self.assertEqual('rev2-id', remote_second.last_revision())

    def test_sync_pipeline_errors_on_bad_target(self):
        """Syncing to a location not named after any pipe raises an error."""
        tree = self.make_branch_and_checkout('first')
        manager = PipeManager(tree.branch)
        os.mkdir('remote')
        e = self.assertRaises(UnknownRemotePipe, manager.sync_pipeline,
                              'remote/random')
        self.assertEqual('Pipeline has no pipe named "random".', str(e))

    def prepare_use_existing(self):
        """Set up a local two-pipe pipeline and an existing remote 'second'.

        :return: (local tree, manager for 'local/first', tree at
            'remote/second').
        """
        os.mkdir('local')
        tree = self.make_branch_and_checkout('local/first')
        manager = PipeManager(tree.branch)
        manager.insert_pipe('second')
        os.mkdir('remote')
        remote_second = self.make_branch_and_tree('remote/second')
        return tree, manager, remote_second

    def test_sync_pipeline_uses_existing_remote_branches(self):
        """Sync must not raise when a remote pipe branch already exists."""
        tree, manager, remote_second = self.prepare_use_existing()
        manager.sync_pipeline('remote/first')

    def test_sync_pipeline_errors_if_remote_is_not_related(self):
        """Sync raises BranchInTheWay when the remote branch is unrelated."""
        tree, manager, remote_second = self.prepare_use_existing()
        tree.commit('foo')
        tree.branch.push(manager.get_next_pipe())
        remote_second.commit('bar')
        self.assertRaises(BranchInTheWay, manager.sync_pipeline,
                          'remote/first')

    def test_sync_pipeline_errors_if_remote_in_pipeline(self):
        """Sync raises BranchInTheWay when the remote has its own pipeline."""
        tree, manager, remote_second = self.prepare_use_existing()
        PipeManager(remote_second.branch).insert_pipe('third')
        self.assertRaises(BranchInTheWay, manager.sync_pipeline,
                          'remote/first')

    def test_merge_nick_order(self):
        """All nicks must appear once in the result.

        Order is preserved within non-matching sections.
        Order is based on the first item of a non-matching section."""
        result = PipeManager._merge_nick_order(['a', 'b', 'd'],
                                               ['a', 'c', 'd'])
        self.assertEqual(['a', 'b', 'c', 'd'], result)
        result = PipeManager._merge_nick_order(['a', 'c', 'd'],
                                               ['a', 'b', 'd'])
        self.assertEqual(['a', 'b', 'c', 'd'], result)
        result = PipeManager._merge_nick_order(['a', 'b', 'z'],
                                               ['a', 'c', 'y'])
        self.assertEqual(['a', 'b', 'z', 'c', 'y'], result)

    def test_connections_to_create(self):
        """Only adjacent pairs missing from the existing order are returned."""
        connections = PipeManager._connections_to_create(['a', 'b', 'c'], [])
        self.assertEqual([('a', 'b'), ('b', 'c')], connections)
        connections = PipeManager._connections_to_create(['a', 'b', 'c'],
                                                         ['a', 'b'])
        self.assertEqual([('b', 'c')], connections)
        connections = PipeManager._connections_to_create(['a', 'c'],
                                                         ['a', 'b', 'c'])
        self.assertEqual([('a', 'c')], connections)

    def test_get_prev_revision_id_second(self):
        """For a later pipe, the result is the previous pipe's tip."""
        checkout, manager, second = self.make_checkout_first_second()
        self.assertEqual('rev2-id', PipeManager(second).get_prev_revision_id())

    def test_get_prev_revision_id_first(self):
        """A first pipe with no parent or submit branch gives None."""
        # Named first_pipe so the imported 'branch' module is not shadowed.
        first_pipe = self.make_branch('first')
        self.assertIs(None, PipeManager(first_pipe).get_prev_revision_id())

    def prepare_related(self):
        """Create parent and child trees that diverge after 'parent-1'.

        The child is write-locked until test cleanup.

        :return: (parent tree, child tree).
        """
        parent = self.make_branch_and_tree('parent')
        parent.commit('parent', rev_id='parent-1')
        child = self.make_branch_and_tree('child')
        child.pull(parent.branch)
        child.lock_write()
        self.addCleanup(child.unlock)
        parent.commit('parent', rev_id='parent-2')
        child.commit('child', rev_id='child-1')
        return parent, child

    def test_get_prev_revision_id_parent(self):
        """With a parent branch set, the result is the shared revision."""
        parent, child = self.prepare_related()
        child.branch.set_parent(parent.branch.base)
        manager = PipeManager(child.branch)
        self.assertEqual('parent-1', manager.get_prev_revision_id())

    def test_get_prev_revision_id_submit(self):
        """With a submit branch set, the result is the shared revision."""
        parent, child = self.prepare_related()
        child.branch.set_submit_branch(parent.branch.base)
        manager = PipeManager(child.branch)
        self.assertEqual('parent-1', manager.get_prev_revision_id())


class TestGetDiff(tests.TestCaseWithMemoryTransport):
    """Tests for PipeManager.get_patch and the branch it diffs against."""

    def commit_file(self, branch, path, contents):
        """Commit `contents` as the file at `path` directly onto `branch`.

        No working tree is used: the change is built as a TransformPreview on
        the branch's basis tree and committed from there.  A root directory is
        created if the basis tree has none.

        :param branch: the branch to commit to.
        :param path: path of the file to create or overwrite.
        :param contents: the new file contents.
        """
        basis = branch.basis_tree()
        tt = transform.TransformPreview(basis)
        try:
            root_id = basis.get_root_id()
            if root_id is None:
                root = tt.new_directory('', transform.ROOT_PARENT, 'root-id')
            else:
                # Previously 'root' was left unbound here, so adding a new
                # path to a tree that already had a root raised NameError.
                root = tt.trans_id_file_id(root_id)
            existing_id = basis.path2id(path)
            if existing_id is None:
                tt.new_file(path, root, contents, path + '-id')
            else:
                trans_id = tt.trans_id_file_id(existing_id)
                tt.delete_contents(trans_id)
                tt.create_file(contents, trans_id)
            tt.commit(branch, 'message')
        finally:
            # Always finalize the preview, even if building or committing
            # the transform fails.
            tt.finalize()

    def make_writeable_branch(self, path):
        """Create a branch at `path`, write-locked until test cleanup."""
        new_branch = self.make_branch(path)
        new_branch.lock_write()
        self.addCleanup(new_branch.unlock)
        return new_branch

    def make_branch_with_parent(self):
        """Return a write-locked child branch sprouted from 'parent'.

        The child is sprouted when 'file' contains A; the parent then commits
        B and the child commits C.
        """
        parent = self.make_writeable_branch('parent')
        self.commit_file(parent, 'file', 'A\n')
        child_url = parent.bzrdir.root_transport.clone('../child').base
        child = parent.bzrdir.sprout(child_url).open_branch()
        self.commit_file(parent, 'file', 'B\n')
        child.lock_write()
        self.addCleanup(child.unlock)
        self.commit_file(child, 'file', 'C\n')
        return child

    def test_write_patch_no_parent(self):
        """If no parent is defined for the first pipe, return None."""
        first_pipe = self.make_branch('first')
        patch = PipeManager(first_pipe).get_patch()
        self.assertIs(None, patch)

    def test_write_patch_first_parent(self):
        """If a parent is defined for the first pipe, use that."""
        first_pipe = self.make_branch_with_parent()
        patch = PipeManager(first_pipe).get_patch()
        self.assertContainsRe(patch, '-A\n\+C')

    def test_write_patch_first_submit(self):
        """If a submit branch is defined, it overrides the parent."""
        child = self.make_branch_with_parent()
        submit = self.make_writeable_branch('submit')
        submit.pull(child)
        child.set_submit_branch(submit.base)
        self.commit_file(child, 'file', 'D\n')
        patch = PipeManager(child).get_patch()
        self.assertContainsRe(patch, '-C\n\+D')

    def test_write_patch_second(self):
        """The previous pipe, if defined, overrides the submit branch."""
        submit = self.make_writeable_branch('submit')
        first = self.make_writeable_branch('first')
        self.commit_file(first, 'file', 'A\n')
        second = PipeManager(first).insert_pipe('second')
        second.lock_write()
        self.addCleanup(second.unlock)
        second.set_submit_branch(submit.base)
        self.commit_file(second, 'file', 'B\n')
        patch = PipeManager(second).get_patch()
        self.assertContainsRe(patch, '-A\n\+B')


class TestPipeDirectory(TestCaseWithPipes):
    """Check the pipe aliases resolved through the directory service."""

    def test_pipe_directories(self):
        """From inside 'bar', every alias dereferences to the expected pipe."""
        foo = self.make_branch('foo')
        bar = self.make_branch('bar')
        baz = self.make_branch('baz')
        self.connect_many(foo, bar, baz)
        # The aliases are dereferenced with 'bar' as the current directory.
        os.chdir('bar')
        dereference = directory_service.directories.dereference
        expectations = [
            (baz, ':next'),
            (foo, ':prev'),
            (baz, ':pipe:baz'),
            (foo, ':first'),
            (baz, ':last'),
        ]
        for expected_pipe, alias in expectations:
            self.assertEqual(expected_pipe.base, dereference(alias))


class TestDwimPipe(TestCaseWithPipes):
    """Tests for dwim_pipe."""

    def test_dwim_pipe_no_location(self):
        """Passing None as the location on a lone pipe raises NoSuchPipe."""
        tree = self.make_branch_and_checkout('test')
        lone_manager = PipeManager(tree.branch)
        self.assertRaises(NoSuchPipe, dwim_pipe, lone_manager, None)


class TestTreeToPipeline(TestCaseWithPipes):
    """Tests for converting a standalone tree with tree_to_pipeline."""

    def test_tree_to_pipeline(self):
        """The branch moves under .bzr/pipes and the tree points at it."""
        pipe_path = 'foo/.bzr/pipes/bar'
        tree = self.make_branch_and_tree('foo')
        tree.branch.nick = 'bar'
        tree.commit('rev1', rev_id='rev1-id')
        tree_to_pipeline(tree)
        pipe_branch = branch.Branch.open(pipe_path)
        self.assertEqual('rev1-id', pipe_branch.last_revision())
        # The pipe location holds a branch but no working tree.
        self.assertRaises(errors.NoWorkingTree,
                          workingtree.WorkingTree.open, pipe_path)
        self.assertTrue(pipe_branch.repository.is_shared())
        # Opening the original location now yields the pipe branch.
        reopened = branch.Branch.open('foo')
        self.assertEqual(pipe_branch.base, reopened.base)
        self.assertFalse(tree.is_ignored('pipes'))

    def test_tree_to_pipeline_lightweight_checkout(self):
        """A lightweight checkout is rejected with its own error."""
        source = self.make_branch('branch')
        light = source.create_checkout('checkout', lightweight=True)
        self.assertRaises(errors.AlreadyLightweightCheckout,
                          tree_to_pipeline, light)

    def test_tree_to_pipeline_reuse_shared_repo(self):
        """A tree inside a shared repository keeps using that repository."""
        shared_repo = self.make_repository('repo', shared=True)
        tree_branch = bzrdir.BzrDir.create_branch_convenience('repo/tree')
        tree = tree_branch.bzrdir.open_workingtree()
        converted = tree_to_pipeline(tree)
        expected_base = shared_repo.bzrdir.root_transport.base
        actual_base = converted.branch.repository.bzrdir.root_transport.base
        self.assertEqual(expected_base, actual_base)
