Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def moco():
    """ A mock soco with fake services.

    Allows calls to services to be tracked. Should not cause any network access

    Every service class listed below is patched on ``soco.core`` before the
    ``SoCo`` instance is created, and the patches are undone in reverse
    order once the consumer of this generator is finished with it.
    """
    services = (
        'AVTransport', 'RenderingControl', 'DeviceProperties',
        'ContentDirectory', 'ZoneGroupTopology')
    patchers = [mock.patch('soco.core.{0}'.format(service))
                for service in services]
    # Only patchers that actually started are stopped, so a failure part-way
    # through start-up never calls stop() on a patcher that was never started.
    started = []
    try:
        for patcher in patchers:
            patcher.start()
            started.append(patcher)
        yield SoCo(IP_ADDR)
    finally:
        # Runs on normal completion, when SoCo() or a start() raises, and when
        # the generator is closed or has an exception thrown into it, so the
        # patches can never leak into whatever runs next.
        for patcher in reversed(started):
            patcher.stop()
Parameters:
arg is either an ip or the number of a speaker as shown by list
"""
# Update the list of known speakers if that has not already been done
if not KNOWN_SPEAKERS:
list(list_ips())
# pylint: disable=global-statement,fixme
# TODO: this should be refactored into a class with instance-wide state
global CUR_SPEAKER
# Set speaker by speaker number as identified by list_ips ...
if '.' not in arg and arg in KNOWN_SPEAKERS:
CUR_SPEAKER = soco.SoCo(KNOWN_SPEAKERS[arg])
# ... and if not that, then by ip
else:
CUR_SPEAKER = soco.SoCo(arg)
def _check_args(self, cmd, args):
"""Checks if func is called for a speaker and updates 'args'"""
req_ip, func = self.commands[cmd]
if not req_ip:
return func, args
if not self.current_speaker:
if not args:
err('Please specify a speaker IP for "{cmd}".'.format(cmd=cmd))
return None, None
else:
speaker_spec = args.pop(0)
sonos = soco.SoCo(speaker_spec)
args.insert(0, sonos)
else:
args.insert(0, self.current_speaker)
return func, args
def define_player(ip_address, name):
    """Return the SoCo object for the chosen player, or None.

    A player is built from ``ip_address`` when it is truthy; a truthy
    ``name`` then takes precedence and the player is looked up by name
    instead. When a player was found and ``GROUP`` is truthy, the
    coordinator of that player's group is returned in its place.
    """
    chosen = soco.SoCo(ip_address) if ip_address else None
    if name:
        chosen = get_player_by_name(name)
    if not (chosen and GROUP):
        return chosen
    # Change player to be the coordinator of the group
    return chosen.group.coordinator
def sexy_time():
# connect to the Sonos
sonos = SoCo(SONOS_IP)
# connect to Philips Hue Bridge
hue = Bridge(ip=HUE_IP,
username=HUE_USERNAME)
# get queue
queue = sonos.get_queue()
# if we:
# * already have a queue
# * music is playing
# * we are already playing a queue that begins with "Let's Get It On"
# ...then skip to the next track
if len(queue) > 0 and \
sonos.get_current_transport_info()['current_transport_state'] == "PLAYING" and \
default=None,
help="The ip address of the device to query. "
"If none is supplied, a random device will be used"
)
parser.add_argument(
'-s', '--service',
default=None,
help="Dump data relating to services matching this regexp "
"only, e.g. %(prog)s -s GroupRenderingControl"
)
args = parser.parse_args()
# get a zone player - any one will do
if args.device:
device = soco.SoCo(args.device)
else:
device = soco.discovery.any_soco()
print("Querying %s" % device.player_name)
# loop over each of the available services
# pylint: disable=no-member
services = (srv(device) for srv in soco.services.Service.__subclasses__())
for srv in services:
if args.service is None or re.search(
args.service, srv.service_type):
print_details(srv)
def set_speaker(arg):
    """ Set the current speaker for the shell session by ip or speaker number

    Parameters:
    arg is either an ip or the number of a speaker as shown by list

    The selected speaker is stored in the module-level ``CUR_SPEAKER``.
    """
    # pylint: disable=global-statement,fixme
    # TODO: this should be refactored into a class with instance-wide state
    global CUR_SPEAKER

    # Populate the list of known speakers on first use
    if not KNOWN_SPEAKERS:
        list(list_ips())

    # A value without a dot that matches a known speaker number is resolved
    # through KNOWN_SPEAKERS; anything else is used as the address as-is.
    is_speaker_number = '.' not in arg and arg in KNOWN_SPEAKERS
    address = KNOWN_SPEAKERS[arg] if is_speaker_number else arg
    CUR_SPEAKER = soco.SoCo(address)
Returns:
True if partymode is set
If an error occurs, we'll attempt to parse the error and return a UPnP
error code. If that fails, the raw response sent back from the Sonos
speaker will be returned.
"""
master_speaker_info = self.get_speaker_info()
ips = self.get_speakers_ip()
rc = True
# loop through all IP's in topology and make them join this master
for ip in ips:
if not (ip == self.speaker_ip):
slave = SoCo(ip)
ret = slave.join(master_speaker_info["uid"])
if ret is False:
rc = False
return rc
def main():
speakers = [speaker.ip_address for speaker in SoCo.discover()]
if not speakers:
print 'no speakers found, exiting.'
return
soco = SoCo(speakers[0])
# get a plugin by name (eg from a config file)
myplugin = SoCoPlugin.from_name('soco.plugins.example.ExamplePlugin',
soco, 'some user')
# do something with your plugin
print 'Testing', myplugin.name
myplugin.music_plugin_play()
time.sleep(5)
# create a plugin by normal instantiation
from soco.plugins.example import ExamplePlugin
# create a new plugin, pass the soco instance to it