-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
130 lines (97 loc) · 3.69 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""
Very simple play with libpulseaudio
pip install libpulseaudio
Help with the API:
http://www.ypass.net/blog/2009/10/pulseaudio-an-async-example-to-get-device-lists/
http://freedesktop.org/software/pulseaudio/doxygen/index.html
"""
import logging
from ctypes import *
from pulseaudio.lib_pulseaudio import *
# Configure root logging once for the whole script: every record carries a
# timestamp, the logger name, the level and the message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)

ctypes_log = logging.getLogger('ctypes')
ctypes_log.info("Setting up mappings")

# ---- ctypes callback prototypes: start ----
# Prototype for a context state callback: no return value, called with a
# pointer to the pa_context and an opaque user-data pointer.
PA_SET_STATE_CALLBACK = CFUNCTYPE(None, POINTER(pa_context), c_void_p)
# ---- ctypes callback prototypes: done ----

ctypes_log.debug("Setup done")
class PulseAudio(object):
    """Small convenience wrapper around a PulseAudio main loop and context.

    The main loop, its API object and the context are created lazily on
    first access unless they were handed to the constructor. A state
    callback is registered on contexts created by this class and reports
    state transitions through ``logger``.

    NOTE(review): a context passed in through ``__init__`` does not get the
    state callback registered; only lazily created contexts do.
    """

    _main_loop = None
    _api = None
    _context = None
    _app_name = None

    logger = logging.getLogger("PulseAudio")

    # Human-readable names for the context *state* constants only.
    # The context *flag* constants (PA_CONTEXT_NOFLAGS, PA_CONTEXT_NOAUTOSPAWN,
    # PA_CONTEXT_NOFAIL) must not be listed here: they belong to a different
    # enum whose numeric values overlap the state values, so as dict keys
    # they would silently overwrite the labels of real states.
    STATE_MAP = {
        PA_CONTEXT_UNCONNECTED: "unconnected",
        PA_CONTEXT_CONNECTING: "connecting",
        PA_CONTEXT_AUTHORIZING: "authorizing",
        PA_CONTEXT_SETTING_NAME: "setting name",
        PA_CONTEXT_READY: "ready",
        PA_CONTEXT_FAILED: "failed",
        PA_CONTEXT_TERMINATED: "terminated",
    }

    # States that mean the connection is gone.
    DISCONNECT_STATES = (PA_CONTEXT_FAILED, PA_CONTEXT_TERMINATED)
    # States that state_changed() acts upon; all others are only logged.
    DONT_IGNORE_STATES = DISCONNECT_STATES + (PA_CONTEXT_READY,)

    def __init__(self, main_loop=None, api=None, context=None, app_name=None):
        """Store the optional pre-built PulseAudio objects.

        :param main_loop: existing main loop, or None to create one lazily.
        :param api: existing main loop API object, or None to fetch lazily.
        :param context: existing context, or None to create one lazily.
        :param app_name: application name used when a context has to be
            created; required if ``context`` is not given.
        """
        self._main_loop = main_loop
        self._api = api
        self._context = context
        self._app_name = app_name
        self._server = None
        # Keep the ctypes callback object referenced from the instance so it
        # stays alive for as long as the C library may invoke it.
        self._state_changed = PA_SET_STATE_CALLBACK(self.state_changed)

    @staticmethod
    def _to_bytes(value):
        """Return ``value`` as a byte string suitable for ``c_char_p``.

        ``None`` and byte strings pass through unchanged; text is encoded
        as UTF-8.
        """
        if value is None or isinstance(value, bytes):
            return value
        return value.encode("utf-8")

    @property
    def main_loop(self):
        """The main loop, created with ``pa_mainloop_new()`` on first use."""
        if not self._main_loop:
            self._main_loop = pa_mainloop_new()
        return self._main_loop

    @property
    def api(self):
        """The main loop API object, fetched from ``main_loop`` on first use."""
        if not self._api:
            self._api = pa_mainloop_get_api(self.main_loop)
        return self._api

    @property
    def context(self):
        """The context, created from ``api`` and the app name on first use.

        :raises NameError: if neither a context nor an app name was given.
        """
        if not self._context:
            if not self._app_name:
                raise NameError("No pa_context or app name to create it with "
                                "has been given")
            self._context = pa_context_new(self.api,
                                           self._to_bytes(self._app_name))
            # self._context is set by now, so the property access inside
            # _setup_context_events() does not recurse.
            self._setup_context_events()
        return self._context

    def _setup_context_events(self):
        """Register the state-change callback on the context."""
        pa_context_set_state_callback(self.context, self._state_changed, None)

    def connect(self, server=None, flags=0):
        """Start connecting the context to a PulseAudio server.

        :param server: server name, or None to pass a NULL server pointer.
        :param flags: flags value handed to ``pa_context_connect``.
        :returns: the integer result of ``pa_context_connect``; a negative
            value is logged as an error.
        """
        server_name = self._to_bytes(server)
        server_arg = None
        if server_name is not None:
            server_arg = c_char_p(server_name)

        result = pa_context_connect(self.context, server_arg, flags, None)
        self._server = server_name

        if result < 0:
            self.logger.error("pa_context_connect failed (result %s)", result)
        else:
            self.logger.debug("Connecting to server: %r", server_name)
        return result

    def state_changed(self, context, userdata):
        """
        PulseAudio state has changed (ready, disconnected, etc)

        Invoked through the registered ctypes callback with the context
        pointer and the (unused) user-data pointer.
        """
        state = pa_context_get_state(context)
        # Fall back to a generic label rather than raising KeyError inside a
        # callback that is being invoked from C.
        state_name = self.STATE_MAP.get(state, "unknown (%s)" % (state,))

        if state not in self.DONT_IGNORE_STATES:
            self.logger.debug("PA state changed (ignoring): %s", state_name)
            return

        self.logger.debug("PA state changed: %s", state_name)
        if state == PA_CONTEXT_READY:
            self.logger.info("READY FOR WORK")
        elif state in self.DISCONNECT_STATES:
            self.logger.error("DISCONNECTED")
pa = PulseAudio(app_name='test')
pa.connect()

# Drive the PulseAudio main loop so that callbacks (such as the context state
# callback) get dispatched.
while True:
    # Iterate the main loop, block until something happens.
    # (Pass c_int(0) instead of c_int(1) for a non-blocking iteration.)
    if pa_mainloop_iterate(pa.main_loop, c_int(1), None) < 0:
        # A negative result signals an error or a quit request; iterating
        # again would only spin on the same condition, so stop here.
        pa.logger.error("Main loop stopped iterating")
        break