-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathsample.py
69 lines (57 loc) · 2.53 KB
/
sample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#/usr/bin/python
from application.notification import NotificationCenter
from sipsimple.account import AccountManager
from sipsimple.application import SIPApplication
from sipsimple.core import SIPURI, ToHeader
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.storage import FileStorage
from sipsimple.session import Session
from sipsimple.streams import MediaStreamRegistry
from sipsimple.threading.green import run_in_green_thread
from threading import Event
class SimpleCallApplication(SIPApplication):
def __init__(self):
SIPApplication.__init__(self)
self.ended = Event()
self.callee = None
self.session = None
notification_center = NotificationCenter()
notification_center.add_observer(self)
def call(self, callee):
self.callee = callee
self.start(FileStorage('config'))
@run_in_green_thread
def _NH_SIPApplicationDidStart(self, notification):
self.callee = ToHeader(SIPURI.parse(self.callee))
account = AccountManager().get_account("[email protected]");
try:
uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport})
routes = DNSLookup().lookup_sip_proxy(uri, ['udp']).wait()
except DNSLookupError, e:
print 'DNS lookup failed: %s' % str(e)
else:
self.session = Session(account)
print 'Routes: %s' % str(routes)
self.session.connect(self.callee, routes, streams=[MediaStreamRegistry.AudioStream()])
def _NH_SIPSessionGotRingIndication(self, notification):
print 'Ringing!'
def _NH_SIPSessionDidStart(self, notification):
audio_stream = notification.data.streams[0]
print 'Audio session established using "%s" codec at %sHz' % (audio_stream.codec, audio_stream.sample_rate)
def _NH_SIPSessionDidFail(self, notification):
print 'Failed to connect'
self.stop()
def _NH_SIPSessionDidEnd(self, notification):
print 'Session ended'
self.stop()
def _NH_SIPApplicationDidEnd(self, notification):
self.ended.set()
# place an audio call to the specified SIP URI in user@domain format
# target_uri="sip:[email protected]"
target_uri="sip:[email protected]"
application = SimpleCallApplication()
application.call(target_uri)
print "Placing call to %s, press Enter to quit the program" % target_uri
raw_input()
application.session.end()
application.ended.wait()