Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_blocking_stress(self):
sem = SlidingWindowSemaphore(5)
num_threads = 10
num_iterations = 50
def acquire():
for _ in range(num_iterations):
num = sem.acquire('a', blocking=True)
time.sleep(0.001)
sem.release('a', num)
for i in range(num_threads):
t = threading.Thread(target=acquire)
self.threads.append(t)
self.start_threads()
self.join_threads()
# Should have all the available resources freed.
self.assertEqual(sem.current_count(), 5)
def setUp(self):
super(BaseSubmissionTaskTest, self).setUp()
self.config = TransferConfig()
self.osutil = OSUtils()
self.executor = BoundedExecutor(
1000,
1,
{
IN_MEMORY_UPLOAD_TAG: TaskSemaphore(10),
IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(10)
}
def test_can_acquire_release_multiple_times(self):
sem = SlidingWindowSemaphore(1)
num = sem.acquire('a', blocking=False)
self.assertEqual(num, 0)
sem.release('a', num)
num = sem.acquire('a', blocking=False)
self.assertEqual(num, 1)
sem.release('a', num)
def test_can_acquire_a_range(self):
sem = SlidingWindowSemaphore(3)
self.assertEqual(sem.acquire('a', blocking=False), 0)
self.assertEqual(sem.acquire('a', blocking=False), 1)
self.assertEqual(sem.acquire('a', blocking=False), 2)
sem.release('a', 0)
sem.release('a', 1)
sem.release('a', 2)
# Now we're reset so we should be able to acquire the same
# sequence again.
self.assertEqual(sem.acquire('a', blocking=False), 3)
self.assertEqual(sem.acquire('a', blocking=False), 4)
self.assertEqual(sem.acquire('a', blocking=False), 5)
self.assertEqual(sem.current_count(), 0)
def test_is_error_to_double_release(self):
# This is different than other error tests because
# we're verifying we can reset the state after an
# acquire/release cycle.
sem = SlidingWindowSemaphore(2)
sem.acquire('a', blocking=False)
sem.acquire('a', blocking=False)
sem.release('a', 0)
sem.release('a', 1)
self.assertEqual(sem.current_count(), 2)
with self.assertRaises(ValueError):
sem.release('a', 0)
def test_raises_error_when_count_is_zero(self):
sem = SlidingWindowSemaphore(3)
sem.acquire('a', blocking=False)
sem.acquire('a', blocking=False)
sem.acquire('a', blocking=False)
# Count is now 0 so trying to acquire should fail.
with self.assertRaises(NoResourcesAvailable):
sem.acquire('a', blocking=False)
def test_is_error_to_release_unknown_sequence_number(self):
sem = SlidingWindowSemaphore(3)
sem.acquire('a', blocking=False)
with self.assertRaises(ValueError):
sem.release('a', 1)
def test_error_to_release_unknown_tag(self):
sem = SlidingWindowSemaphore(3)
with self.assertRaises(ValueError):
sem.release('a', 0)
def test_can_handle_multiple_tags_released(self):
sem = SlidingWindowSemaphore(4)
sem.acquire('a', blocking=False)
sem.acquire('a', blocking=False)
sem.acquire('b', blocking=False)
sem.acquire('b', blocking=False)
sem.release('b', 1)
sem.release('a', 1)
self.assertEqual(sem.current_count(), 0)
sem.release('b', 0)
self.assertEqual(sem.acquire('a', blocking=False), 2)
sem.release('a', 0)
self.assertEqual(sem.acquire('b', blocking=False), 2)
self._config = TransferConfig()
self._osutil = osutil
if osutil is None:
self._osutil = OSUtils()
self._coordinator_controller = TransferCoordinatorController()
# A counter to create unique id's for each transfer submitted.
self._id_counter = 0
# The executor responsible for making S3 API transfer requests
self._request_executor = BoundedExecutor(
max_size=self._config.max_request_queue_size,
max_num_threads=self._config.max_request_concurrency,
tag_semaphores={
IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
self._config.max_in_memory_upload_chunks),
IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
self._config.max_in_memory_download_chunks)
},
executor_cls=executor_cls
)
# The executor responsible for submitting the necessary tasks to
# perform the desired transfer
self._submission_executor = BoundedExecutor(
max_size=self._config.max_submission_queue_size,
max_num_threads=self._config.max_submission_concurrency,
executor_cls=executor_cls
)
# There is one thread available for writing to disk. It will handle
# downloads for all files.