Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Partition the rows of X into contiguous [start, stop) ranges and create
# (but do not start) one KNN_Worker per range. Every worker is handed the
# same results_queue.
n_rows = X.shape[0]
n_dims = X.shape[1]
n_cpus = cpu_count()
# Ceiling division, floored at 1. The old floor division gave
# chunk_size == 0 whenever X had fewer rows than CPUs (or no rows at all),
# and `while (i + chunk_size) <= n_rows` then looped forever appending
# workers. Ceiling division also caps the pool at n_cpus workers and never
# creates an empty trailing (n_rows, n_rows) chunk.
chunk_size = max(1, -(-n_rows // n_cpus))
process_pool = []
results_queue = Queue()
# Split up the indices and assign processes for each chunk; the last chunk
# may be shorter than chunk_size. An empty X yields an empty pool.
for start in range(0, n_rows, chunk_size):
    stop = min(start + chunk_size, n_rows)
    process_pool.append(KNN_Worker(index_filepath, k, search_k, n_dims,
                                   (start, stop), results_queue))
# Start every worker, then poll results_queue until no worker is alive,
# advancing a tqdm progress bar by the number of items received.
# NOTE(review): the handler for this `try:` and whatever follows the polling
# loop (e.g. a final drain / ordering of results) are outside this view.
try:
for process in process_pool:
process.start()
# Read from queue constantly to prevent it from becoming full
# (presumably also to avoid the multiprocessing "joining processes that use
# queues" deadlock, where a child blocks on exit until its buffered puts are
# consumed -- TODO confirm intent).
# The bar is hidden when verbose < 1. Its total is X.shape[0], so it assumes
# each worker puts one queue item per row of X -- TODO confirm against
# KNN_Worker's run method (not visible here).
with tqdm(total=X.shape[0], disable=verbose < 1) as pbar:
neighbour_list = []
neighbour_list_length = len(neighbour_list)
# NOTE(review): this polls without blocking or sleeping, so it busy-waits
# while workers run. Queue.empty() is documented as unreliable, and items
# put just before the last worker exits may still be queued when this loop
# ends; a final drain after the loop is required (not visible here).
while any(process.is_alive() for process in process_pool):
while not results_queue.empty():
neighbour_list.append(results_queue.get())
# Advance the bar only by the number of items received in this pass.
progress = len(neighbour_list) - neighbour_list_length
pbar.update(progress)
neighbour_list_length = len(neighbour_list)
def __init__(self, index_filepath, k, search_k, n_dims,
             data_indices, results_queue):
    """Record the configuration and work range for one KNN worker.

    Args:
        index_filepath: stored unchanged as ``self.index_filepath``.
        k: stored unchanged as ``self.k``.
        search_k: stored unchanged as ``self.search_k``.
        n_dims: stored unchanged as ``self.n_dims``.
        data_indices: stored unchanged as ``self.data_indices``; the
            pool-building code passes a ``(start, stop)`` tuple of rows.
        results_queue: stored unchanged as ``self.results_queue``.
    """
    # Plain attribute storage; presumably read later by the worker's run
    # method (not visible here).
    self.index_filepath = index_filepath
    self.k, self.search_k = k, search_k
    self.n_dims = n_dims
    self.data_indices, self.results_queue = data_indices, results_queue
    # Base-class initialisation deliberately stays last, after every field
    # above has been assigned.
    super(KNN_Worker, self).__init__()