File: test-message.py

package info (click to toggle)
telepathy-sofiasip 0.5.8-1
  • links: PTS
  • area: main
  • in suites: lenny
  • size: 2,376 kB
  • ctags: 897
  • sloc: sh: 8,563; ansic: 8,262; python: 2,027; makefile: 302; xml: 200
file content (140 lines) | stat: -rw-r--r-- 4,455 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# -*- coding: utf-8 -*-

from servicetest import unwrap, match, tp_name_prefix
from sofiatest import go

import twisted.protocols.sip
import dbus

# Test outgoing message handling

@match('dbus-signal', signal='StatusChanged', args=[1, 1])
def expect_connecting(event, data):
    return True

@match('dbus-signal', signal='StatusChanged', args=[0, 1])
def expect_connected(event, data):
    contact = 'sip:user@somewhere.com'

    handle = data['conn_iface'].RequestHandles(1, [contact])[0]

    chan = data['conn_iface'].RequestChannel(tp_name_prefix + '.Channel.Type.Text',
        1, handle, True)
    return True

@match('dbus-signal', signal='NewChannel')
def expect_text_chan(event, data):
    if event.args[1] != tp_name_prefix + '.Channel.Type.Text':
        return False

    bus = data['conn']._bus
    data['text_chan'] = bus.get_object(
        data['conn']._named_service, event.args[0])

    dbus.Interface(data['text_chan'],
        u'org.freedesktop.Telepathy.Channel.Type.Text').Send(0, 'Hello')

    return True

@match('sip-message', uri='sip:user@somewhere.com', body='Hello')
def expect_message(event, data):
    data['sip'].deliverResponse(data['sip'].responseFromRequest(404, event.sip_message))
    return True

@match('dbus-signal', signal='SendError')
def expect_send_error(event, data):
    dbus.Interface(data['text_chan'],
        u'org.freedesktop.Telepathy.Channel.Type.Text').Send(0, 'Hello Again')
    return True

@match('sip-message', uri='sip:user@somewhere.com', body='Hello Again')
def expect_message_again(event, data):
    data['sip'].deliverResponse(data['sip'].responseFromRequest(200, event.sip_message))
    data['prevhdr'] = event.sip_message.headers
    return True

# Test incoming message handling

@match('dbus-signal', signal='Sent')
def expect_sent(event, data):
    url = twisted.protocols.sip.parseURL('sip:testacc@127.0.0.1')
    msg = twisted.protocols.sip.Request('MESSAGE', url)
    send_message(data, 'Hi')
    return True

@match('dbus-signal', signal='NewChannel')
def expect_other_text_chan(event, data):
    if event.args[1] != tp_name_prefix + '.Channel.Type.Text' or event.args[2] != 1:
        return False

    handle = event.args[3]
    name = data['conn_iface'].InspectHandles(1, [handle])[0]

    assert name == 'sip:other.user@somewhere.else.com'
    return True

@match('dbus-signal', signal='Received')
def expect_msg_recv(event, data):
    if event.args[5] != 'Hi':
        return False

    send_message(data, u'straight ASCII'.encode('us-ascii'), 'us-ascii')
    return True

# Test conversion from an 8-bit encoding.
# Due to limited set of encodings available in some environments,
# try with US ASCII and ISO 8859-1.

@match('dbus-signal', signal='Received')
def expect_msg_recv_us_ascii(event, data):
    if event.args[5] != u'straight ASCII':
        return False

    send_message(data, u'Hyv\xe4!'.encode('iso-8859-1'), 'iso-8859-1')
    return True

@match('dbus-signal', signal='Received')
def expect_msg_recv_iso8859_1(event, data):
    if event.args[5] != u'Hyv\xe4!':
        return False

    data['conn_iface'].Disconnect()
    return True

@match('dbus-signal', signal='StatusChanged', args=[2, 1])
def expect_disconnected(event, data):
    return True

def register_cb(message, host, port):
    return True

cseq_num = 1
def send_message(data, body, encoding=None):
    global cseq_num
    cseq_num += 1
    url = twisted.protocols.sip.parseURL('sip:testacc@127.0.0.1')
    msg = twisted.protocols.sip.Request('MESSAGE', url)
    msg.body = body
    msg.addHeader('from', '<sip:other.user@somewhere.else.com>;tag=XYZ')
    msg.addHeader('to', '<sip:user@127.0.0.1>')
    msg.addHeader('cseq', '%d MESSAGE' % cseq_num)
    msg.addHeader('allow', 'INVITE ACK BYE MESSAGE')
    if encoding is None:
        msg.addHeader('content-type', 'text/plain')
    else:
        msg.addHeader('content-type', 'text/plain; charset=%s' % encoding)
    msg.addHeader('content-length', '%d' % len(msg.body))
    msg.addHeader('call-id', data['prevhdr']['call-id'][0])
    msg.addHeader('via', 'SIP/2.0/UDP 127.0.0.1;branch=ABCXYZ')

    destVia = twisted.protocols.sip.parseViaHeader(data['prevhdr']['via'][0])
    host = destVia.received or destVia.host
    port = destVia.rport or destVia.port or self.PORT
    destAddr = twisted.protocols.sip.URL(host=host, port=port)
    data['sip'].sendMessage(destAddr, msg)
    return True

if __name__ == '__main__':
    go(register_cb)