Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def put(self, item, block=True, timeout=None, dup_callback=None):
    """Enqueue ``item`` unless ``self.check_dup`` reports it as a duplicate.

    Args:
        item: Object to enqueue.
        block: Forwarded unchanged to the parent class's ``put``.
        timeout: Forwarded unchanged to the parent class's ``put``.
        dup_callback: Optional callable invoked with ``item`` when the item
            is reported as a duplicate; skipped when falsy.
    """
    is_duplicate = self.check_dup(item)
    if is_duplicate:
        # Duplicates are never enqueued; only notify the caller if asked to.
        if dup_callback:
            dup_callback(item)
        return
    super(CachedQueue, self).put(item, block, timeout)
def __init__(self, thread_num, in_queue=None, out_queue=None, name=None):
    """Set up the queues, worker list, lock and logger for this instance.

    Args:
        thread_num: Stored as ``self.thread_num``; also used to size the
            default queues (``5 * thread_num`` is passed to ``CachedQueue``).
        in_queue: Queue object to use as ``self.in_queue``. When ``None``,
            a new ``CachedQueue(5 * thread_num)`` is created.
        out_queue: Queue object to use as ``self.out_queue``. When ``None``,
            a new ``CachedQueue(5 * thread_num)`` is created.
        name: Stored as ``self.name`` and used as the logger name. Falls
            back to this module's ``__name__`` when not given or empty.
    """
    self.thread_num = thread_num
    # Compare against None rather than relying on truthiness: a queue-like
    # object that is falsy (e.g. an empty container defining __len__) is a
    # valid caller-supplied queue and must not be silently replaced.
    if in_queue is None:
        in_queue = CachedQueue(5 * self.thread_num)
    if out_queue is None:
        out_queue = CachedQueue(5 * self.thread_num)
    self.in_queue = in_queue
    self.out_queue = out_queue
    self.name = name or __name__
    self.workers = []
    self.lock = Lock()
    self.logger = logging.getLogger(self.name)
def __init__(self, thread_num, in_queue=None, out_queue=None, name=None):
    """Set up the queues, worker list, lock and logger for this instance.

    Args:
        thread_num: Stored as ``self.thread_num``; also used to size the
            default queues (``5 * thread_num`` is passed to ``CachedQueue``).
        in_queue: Queue object to use as ``self.in_queue``. When ``None``,
            a new ``CachedQueue(5 * thread_num)`` is created.
        out_queue: Queue object to use as ``self.out_queue``. When ``None``,
            a new ``CachedQueue(5 * thread_num)`` is created.
        name: Stored as ``self.name`` and used as the logger name. Falls
            back to this module's ``__name__`` when not given or empty.
    """
    self.thread_num = thread_num
    # Compare against None rather than relying on truthiness: a queue-like
    # object that is falsy (e.g. an empty container defining __len__) is a
    # valid caller-supplied queue and must not be silently replaced.
    if in_queue is None:
        in_queue = CachedQueue(5 * self.thread_num)
    if out_queue is None:
        out_queue = CachedQueue(5 * self.thread_num)
    self.in_queue = in_queue
    self.out_queue = out_queue
    self.name = name or __name__
    self.workers = []
    self.lock = Lock()
    self.logger = logging.getLogger(self.name)
def __init__(self, *args, **kwargs):
    """Initialize the underlying queue and an empty cache.

    Args:
        *args: Positional arguments forwarded to the parent initializer.
        **kwargs: Keyword arguments forwarded to the parent initializer,
            except ``cache_capacity``, which is consumed here and stored
            as ``self.cache_capacity`` (0 when omitted).
    """
    # Take ``cache_capacity`` out of kwargs before delegating. Forwarding it
    # would make a parent initializer that does not know this keyword raise
    # TypeError, so the option could never actually be used.
    # NOTE(review): assumes the parent class is the standard library Queue
    # (whose initializer accepts only ``maxsize``) -- confirm against the
    # class definition.
    cache_capacity = kwargs.pop('cache_capacity', 0)
    super(CachedQueue, self).__init__(*args, **kwargs)
    self.cache_capacity = cache_capacity
    self._cache = OrderedDict()