Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_sending_ping(self):
tm = PingControlMessage("hello").single(mask=False)
m = MagicMock()
ws = WebSocket(sock=m)
ws.ping("hello")
m.sendall.assert_called_once_with(tm)
def test_send_generator_without_masking(self):
tm0 = b'hello'
tm1 = b'world'
def datasource():
yield tm0
yield tm1
gen = datasource()
m = MagicMock()
ws = WebSocket(sock=m)
ws.send(gen)
self.assertEqual(m.sendall.call_count, 2)
self.assertRaises(StopIteration, next, gen)
def test_terminate_with_closing(self):
m = MagicMock()
s = MagicMock()
c = MagicMock()
cc = MagicMock()
ws = WebSocket(sock=m)
with patch.multiple(ws, closed=c, close_connection=cc):
ws.stream = s
ws.stream.closing = CloseControlMessage(code=1000, reason='test closing')
ws.terminate()
self.assertTrue(ws.client_terminated)
self.assertTrue(ws.server_terminated)
self.assertTrue(ws.terminated)
c.assert_called_once_with(1000, b'test closing')
cc.assert_called_once_with()
self.assertIsNone(ws.stream)
self.assertIsNone(ws.environ)
def test_send_bytes_with_masking(self):
tm = TextMessage(b'hello world').single(mask=True)
m = MagicMock()
ws = WebSocket(sock=m)
ws.stream = MagicMock()
ws.stream.always_mask = True
ws.stream.text_message.return_value.single.return_value = tm
ws.send(b'hello world')
m.sendall.assert_called_once_with(tm)
stderr_lines = iter(popen.stderr.readline, "")
for stderr_line in stderr_lines:
client.send(json.dumps({'error': stderr_line}), False)
popen.stderr.close()
def execute(popen):
stdout_lines = iter(popen.stdout.readline, "")
for stdout_line in stdout_lines:
yield stdout_line
popen.stdout.close()
return_code = popen.wait()
clients = []
board_ip = ''
class PythonDaemon(WebSocket):
def opened(self):
print "New client"
clients.append(self)
def received_message(self, message):
global board_ip
try:
data = json.loads(message.data)
print "New JSON"
if 'script' in data:
# Send a message to every connected client so it knows
for c in clients:
if c != self:
c.send(json.dumps(data))
# Send the board IP
if board_ip != '':
#!/usr/bin/env python
import logging
import cherrypy
from cherrypy.process import plugins
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
from ws4py.websocket import WebSocket
class WSHandler(WebSocket):
def received_message(self, m):
pass
class WSPlugin(plugins.SimplePlugin):
def __init__(self, bus):
plugins.SimplePlugin.__init__(self, bus)
def start(self):
cherrypy.engine.log("Starting the WebSocket Plugin")
cherrypy.tree.mount(WS(), '/ws', {
'/': {
'tools.websocket.on': True,
'tools.websocket.handler_cls': WSHandler
}
}
)
def broadcast(sockets, message):
for socket in sockets:
socket.send(message)
def send_refresh(sockets, filepath):
broadcast(sockets, stringify(("refresh", filepath)))
def send_time(sockets):
scene = bpy.context.scene
time = scene.frame_current / scene.render.fps * scene.render.fps_base
broadcast(sockets, stringify(("time", time)))
sockets = []
class WebSocketApp(_WebSocket):
def opened(self):
send_time([self])
sockets.append(self)
def closed(self, code, reason=None):
sockets.remove(self)
@persistent
def frame_change_post(context):
send_time(sockets)
@persistent
def load_post(context):
send_time(sockets)
@persistent
show_system_msg = self.settings['keys'].get('show_system_msg', get_system_message_types())
if self.ws.stream:
for item in self.history:
if isinstance(item, SystemMessage) and item.category not in show_system_msg:
continue
timedelta = datetime.datetime.now() - item.timestamp
timer = self.settings['keys'].get('clear_timer', LCSpin(-1)).simple()
if timer > 0:
if timedelta > datetime.timedelta(seconds=timer):
continue
message = prepare_message(item, self.settings)
self.ws.send(json.dumps(message))
class WebChatSocketServer(WebSocket):
def __init__(self, sock, protocols=None, extensions=None, environ=None, heartbeat_freq=None):
WebSocket.__init__(self, sock)
self.daemon = True
self.clients = []
self.settings = cherrypy.engine.publish('get-settings', 'server_chat')[0]
self.type = 'server_chat'
def opened(self):
cherrypy.engine.publish('add-client', self.peer_address, self)
if self.settings['keys'].get('show_history'):
timer = threading.Timer(0.3, self.fire_history)
timer.start()
def closed(self, code, reason=None):
cherrypy.engine.publish('del-client', self.peer_address, self)
# encoding: utf-8
from __future__ import absolute_import, print_function, unicode_literals
try:
import cherrypy
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
from ws4py.websocket import WebSocket
except ImportError:
raise
cherrypy.tools.websocket = WebSocketTool()
WebSocketPlugin(cherrypy.engine).subscribe()
class SynchronizingWebsocket(WebSocket):
'''
Class to handle requests sent to this websocket connection.
Each instance of this class represents a Salt websocket connection.
Waits to receive a ``ready`` message from the client.
Calls send on it's end of the pipe to signal to the sender on receipt
of ``ready``.
This class also kicks off initial information probing jobs when clients
initially connect. These jobs help gather information about minions, jobs,
and documentation.
'''
def __init__(self, *args, **kwargs): # pylint: disable=E1002
super(SynchronizingWebsocket, self).__init__(*args, **kwargs)
# This pipe needs to represent the parent end of a pipe.
# Clients need to ensure that the pipe assigned to ``self.pipe`` is