File: flow_e2e_test.py

package info (click to toggle)
errbot 6.1.7%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye
  • size: 3,712 kB
  • sloc: python: 13,831; makefile: 164; sh: 97
file content (132 lines) | stat: -rw-r--r-- 4,908 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from os import path
from queue import Empty

import pytest

# Absolute path of the "flow_plugin" directory that sits next to this test file.
# NOTE(review): presumably picked up by the testbot fixture to load the extra
# plugin that defines the flows exercised below — confirm against the fixture.
extra_plugin_dir = path.join(path.dirname(path.realpath(__file__)), "flow_plugin")


def test_list_flows(testbot):
    """Check that four flow roots exist and that ``!flows list`` mentions each."""
    flow_roots = testbot.bot.flow_executor.flow_roots
    assert len(flow_roots) == 4

    testbot.bot.push_message("!flows list")
    listing = testbot.pop_message()

    # Descriptions first, then the flow names, in the same order for w1..w4.
    expected_fragments = (
        "documentation of W1",
        "documentation of W2",
        "documentation of W3",
        "documentation of W4",
        "w1",
        "w2",
        "w3",
        "w4",
    )
    for fragment in expected_fragments:
        assert fragment in listing


def test_no_autotrigger(testbot):
    """Check that running ``!a`` on its own leaves no flow in flight."""
    reply = testbot.exec_command("!a")
    assert "a" in reply

    executor = testbot.bot.flow_executor
    assert len(executor.in_flight) == 0


def test_autotrigger(testbot):
    """Check that ``!c`` puts flow w2 in flight and hints at ``!b`` as next step."""
    reply = testbot.exec_command("!c")
    assert "c" in reply

    # The flow hint arrives as a separate message after the command's reply.
    hint = testbot.pop_message()
    assert "You are in the flow w2, you can continue with" in hint
    assert "!b" in hint

    executor = testbot.bot.flow_executor
    assert len(executor.in_flight) == 1
    assert executor.in_flight[0].name == "w2"


def test_no_duplicate_autotrigger(testbot):
    """Check that running ``!c`` twice still leaves a single w2 flow in flight."""
    first_reply = testbot.exec_command("!c")
    assert "c" in first_reply
    hint = testbot.pop_message()
    assert "You are in the flow w2, you can continue with" in hint

    # Second invocation of the same trigger command.
    second_reply = testbot.exec_command("!c")
    assert "c" in second_reply

    executor = testbot.bot.flow_executor
    assert len(executor.in_flight) == 1
    assert executor.in_flight[0].name == "w2"


def test_secondary_autotrigger(testbot):
    """Check that ``!e`` also puts flow w2 in flight, hinting at ``!d``."""
    reply = testbot.exec_command("!e")
    assert "e" in reply

    hint = testbot.pop_message()
    assert "You are in the flow w2, you can continue with" in hint
    assert "!d" in hint

    executor = testbot.bot.flow_executor
    assert len(executor.in_flight) == 1
    assert executor.in_flight[0].name == "w2"


def test_manual_flow(testbot):
    """Start flow w1 by hand and check the hints after the start and after ``!a``."""
    hint_prefix = "You are in the flow w1, you can continue with"

    start_reply = testbot.exec_command("!flows start w1")
    assert "Flow w1 started" in start_reply
    first_hint = testbot.pop_message()
    assert hint_prefix in first_hint
    assert "!a" in first_hint

    step_reply = testbot.exec_command("!a")
    assert "a" in step_reply
    second_hint = testbot.pop_message()
    assert hint_prefix in second_hint
    # After ``!a`` both ``!b`` and ``!c`` are offered.
    assert "!b" in second_hint
    assert "!c" in second_hint


def test_manual_flow_with_or_without_hinting(testbot):
    """Walk flow w4 through ``!a`` and ``!b``; only then is a hint popped.

    No message is popped between the start, ``!a`` and ``!b``; the hint
    checked at the end must point at ``!c``.
    """
    start_reply = testbot.exec_command("!flows start w4")
    assert "Flow w4 started" in start_reply

    reply_a = testbot.exec_command("!a")
    assert "a" in reply_a
    reply_b = testbot.exec_command("!b")
    assert "b" in reply_b

    hint = testbot.pop_message()
    assert "You are in the flow w4, you can continue with" in hint
    assert "!c" in hint


def test_no_flyby_trigger_flow(testbot):
    """Check that a trigger for another flow does not start it mid-flow.

    Flow w1 is started manually. While it is in flight, ``!c`` (a trigger
    for w2) is executed; it must only produce a w1 hint and leave exactly
    one flow in flight.
    """
    started_marker = "Flow w1 started"
    hint_marker = "You are in the flow w1"

    testbot.bot.push_message("!flows start w1")
    # One message or the other can arrive first, so collect both and check
    # them as a pair instead of relying on their order.
    startup_messages = [testbot.pop_message(), testbot.pop_message()]
    for message in startup_messages:
        assert started_marker in message or hint_marker in message
    # Both kinds must show up: the per-message check alone would also accept
    # the same kind of message arriving twice and the other never arriving.
    assert any(started_marker in message for message in startup_messages)
    assert any(hint_marker in message for message in startup_messages)

    assert "a" in testbot.exec_command("!a")
    flow_message = testbot.pop_message()
    assert hint_marker in flow_message

    # c is a trigger for w2 but it should not trigger now.
    assert "c" in testbot.exec_command("!c")
    flow_message = testbot.pop_message()
    assert hint_marker in flow_message
    assert len(testbot.bot.flow_executor.in_flight) == 1


def test_flow_only(testbot):
    """Check that ``!d`` gets no answer within one second while ``!a`` replies."""
    reply = testbot.exec_command("!a")
    assert "a" in reply  # non flow_only should respond.

    testbot.push_message("!d")
    # Nothing must come back: the pop is expected to time out on an empty queue.
    with pytest.raises(Empty):
        testbot.pop_message(timeout=1)


def test_flow_only_help(testbot):
    """Check which commands the default ``!help`` output lists."""
    testbot.push_message("!help")
    help_text = testbot.bot.pop_message()

    # non flow_only should be in help by default
    assert "!a" in help_text
    # flow_only should not be in help by default
    assert "!d" not in help_text


def test_flows_stop(testbot):
    """Check that ``!flows stop w2`` leaves no flow in flight after ``!c`` started w2."""
    reply = testbot.exec_command("!c")
    assert "c" in reply
    hint = testbot.bot.pop_message()
    assert "You are in the flow w2" in hint

    stop_reply = testbot.exec_command("!flows stop w2")
    assert "w2 stopped" in stop_reply

    executor = testbot.bot.flow_executor
    assert len(executor.in_flight) == 0


def test_flows_kill(testbot):
    """Check that ``!flows kill gbin@localhost w2`` reports w2 as killed."""
    reply = testbot.exec_command("!c")
    assert "c" in reply
    hint = testbot.bot.pop_message()
    assert "You are in the flow w2" in hint

    # The kill command names the user explicitly, followed by the flow name.
    kill_reply = testbot.exec_command("!flows kill gbin@localhost w2")
    assert "w2 killed" in kill_reply


def test_room_flow(testbot):
    """Start flow w3 by hand and check the hints after the start and after ``!a``.

    NOTE(review): the test name suggests w3 is a room-scoped flow; that is
    defined in the flow plugin, not here — confirm.
    """
    hint_prefix = "You are in the flow w3, you can continue with"

    start_reply = testbot.exec_command("!flows start w3")
    assert "Flow w3 started" in start_reply
    first_hint = testbot.pop_message()
    assert hint_prefix in first_hint
    assert "!a" in first_hint

    step_reply = testbot.exec_command("!a")
    assert "a" in step_reply
    second_hint = testbot.pop_message()
    assert hint_prefix in second_hint
    assert "!b" in second_hint