# Fragment: body of the parallelPlus (CSR) initialization driver; the enclosing function
# definition is not shown in this snippet.
# Create an algorithm object for step 5
step5 = kmeans.init.Distributed(step5Master, nClusters, fptype=algorithmFPType, method=kmeans.init.parallelPlusCSR)
step5.input.add(kmeans.init.inputCentroids, pNewCentroids)
# Create an algorithm object for step 3
step3 = kmeans.init.Distributed(step3Master, nClusters, fptype=algorithmFPType, method=kmeans.init.parallelPlusCSR)
for iRound in range(step5.parameter.nRounds):
    # Perform steps 2 and 3
    step3res = initStep23(data, localNodeData, pNewCentroids, step3, iRound == 0, method=kmeans.init.parallelPlusCSR)
    # Perform step 4 (see the initStep4 sketch after initStep23 below)
    pNewCentroids = initStep4(data, localNodeData, step3res, method=kmeans.init.parallelPlusCSR)
    step5.input.add(kmeans.init.inputCentroids, pNewCentroids)
# One more pass of step 2
for i in range(nBlocks):
    # Create an algorithm object for step 2
    local = kmeans.init.Distributed(step2Local, nClusters, False, fptype=algorithmFPType, method=kmeans.init.parallelPlusCSR)
    local.parameter.outputForStep5Required = True
    # Set the input data to the algorithm
    local.input.setInput(kmeans.init.data, data[i])
    local.input.setLocal(kmeans.init.internalInput, localNodeData[i])
    local.input.setStepInput(kmeans.init.inputOfStep2, pNewCentroids)
    # Compute, get the result and add it to the input of step 5
    step5.input.add(kmeans.init.inputOfStep5FromStep2, local.compute().getOutput(kmeans.init.outputOfStep2ForStep5))
step5.input.setStepInput(kmeans.init.inputOfStep5FromStep3, step3res.getStepOutput(kmeans.init.outputOfStep3ForStep5))
step5.compute()
return step5.finalizeCompute().get(kmeans.init.centroids)
def initStep23(data, localNodeData, step2Input, step3, bFirstIteration, method):
    # step3 is the kmeans.init.Distributed(step3Master, ...) object created by the caller (see the driver fragment above)
    for i in range(len(data)):
        step2 = kmeans.init.Distributed(step2Local, nClusters, bFirstIteration, fptype=algorithmFPType, method=method)
        step2.input.set(kmeans.init.data, data[i])
        step2.input.setStepInput(kmeans.init.inputOfStep2, step2Input)
        if not bFirstIteration:
            step2.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        res = step2.compute()
        if bFirstIteration:
            localNodeData.append(res.getLocal(kmeans.init.internalResult))
        # Add this node's step 2 output to the step 3 input under the node index i
        step3.input.add(kmeans.init.inputOfStep3FromStep2, i, res.getOutput(kmeans.init.outputOfStep2ForStep3))
    return step3.compute()
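# The driver fragment at the top of this page also calls an initStep4() helper that is not
# shown in these snippets. The sketch below is a reconstruction, not the original helper:
# it mirrors the step-2 calls above, and the step-4 identifiers (step4Local,
# inputOfStep4FromStep3, outputOfStep4) plus the per-node getter
# step3res.getOutput(kmeans.init.outputOfStep3ForStep4, i) are assumptions based on the
# DAAL K-Means parallelPlus initialization API.
def initStep4(data, localNodeData, step3res, method):
    aNewCentroids = []
    for i in range(len(data)):
        # Get the step 3 output destined for this node, if any
        step3Output = step3res.getOutput(kmeans.init.outputOfStep3ForStep4, i)
        if not step3Output:
            continue
        # Create an algorithm object for step 4
        step4 = kmeans.init.Distributed(step4Local, nClusters, fptype=algorithmFPType, method=method)
        # Set the input data and the local state kept since step 2
        step4.input.setInput(kmeans.init.data, data[i])
        step4.input.setLocal(kmeans.init.internalInput, localNodeData[i])
        step4.input.setStepInput(kmeans.init.inputOfStep4FromStep3, step3Output)
        # Compute and collect this node's new candidate centroids
        aNewCentroids.append(step4.compute().get(kmeans.init.outputOfStep4))
    # The original example merges the per-node tables into one; for brevity this sketch
    # returns the first block's output (sufficient when a single node produced candidates)
    return aNewCentroids[0] if aNewCentroids else None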
if __name__ == "__main__":
    masterAlgorithm = kmeans.Distributed(step2Master, nClusters, method=kmeans.lloydCSR)
    centroids = None
    assignments = [0] * nBlocks
    dataTable = [None] * nBlocks
    masterInitAlgorithm = init.Distributed(step2Master, nClusters, method=init.randomDense)
    for i in range(nBlocks):
        # Read dataFileNames[i] and create a numeric table to store the input data
        dataTable[i] = createSparseTable(dataFileNames[i])
        # Create an algorithm object for the local K-Means initialization step
        localInit = init.Distributed(step1Local, nClusters, nBlocks * nVectorsInBlock, i * nVectorsInBlock, method=init.randomDense)
        localInit.input.set(init.data, dataTable[i])
        # Compute and add the partial result to the master initialization algorithm
        masterInitAlgorithm.input.add(init.partialResults, localInit.compute())
    masterInitAlgorithm.compute()
    res = masterInitAlgorithm.finalizeCompute()
    centroids = res.get(init.centroids)
    # One extra pass: when it == nIterations the flag below is True and only assignments are computed
    for it in range(nIterations + 1):
        for i in range(nBlocks):
            # Create an algorithm object for the local K-Means step
            localAlgorithm = kmeans.Distributed(step1Local, nClusters, it == nIterations, method=kmeans.lloydCSR)
            # Set the input data to the algorithm
            localAlgorithm.input.set(kmeans.data, dataTable[i])
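            # Hedged completion sketch (not part of the original snippet): feed each block's
            # partial result into the master algorithm and update the centroids once per
            # iteration. kmeans.inputCentroids, kmeans.partialResults and kmeans.centroids
            # follow the DAAL distributed K-Means API used elsewhere in these snippets.
            localAlgorithm.input.set(kmeans.inputCentroids, centroids)
            pres = localAlgorithm.compute()
            if it < nIterations:
                masterAlgorithm.input.add(kmeans.partialResults, pres)
            # (on the final, assignment-only pass the per-block assignments would be read from pres)
        if it < nIterations:
            # Merge the partial results and compute the centroids for the next iteration
            masterAlgorithm.compute()
            centroids = masterAlgorithm.finalizeCompute().get(kmeans.centroids)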
def computeInitMaster(partsRDD):
    # Create an algorithm to finalize the K-Means initialization on the master node
    kmeansMasterInit = init.Distributed(step2Master, nClusters, method=init.randomCSR)
    partsList = partsRDD.collect()
    # Add partial results computed on local nodes to the algorithm on the master node
    for _, val in partsList:
        deserialized_val = deserializePartialResult(val, init)
        kmeansMasterInit.input.add(init.partialResults, deserialized_val)
    # Compute the initialization on the master node
    kmeansMasterInit.compute()
    # Finalize computations and retrieve the results
    initResult = kmeansMasterInit.finalizeCompute()
    return initResult.get(init.centroids)
# Enclosing function restored for readability; the name computeInitLocal is an assumption,
# only the body of this wrapper appeared in the original snippet.
def computeInitLocal(dataRDD):
    def mapper(tup):
        key, val = tup
        # Create an algorithm to initialize the K-Means algorithm on local nodes
        kmeansLocalInit = init.Distributed(step1Local,
                                           nClusters,
                                           nBlocks * nVectorsInBlock,
                                           nVectorsInBlock * key,
                                           method=init.randomDense)
        # Set the input data on local nodes
        deserialized_val = deserializeNumericTable(val)
        kmeansLocalInit.input.set(init.data, deserialized_val)
        # Initialize the K-Means algorithm on local nodes
        pres = kmeansLocalInit.compute()
        serialized_pres = serializeNumericTable(pres)
        return (key, serialized_pres)
    return dataRDD.map(mapper)
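# A hedged usage sketch (not in the original snippets): chain the local and master
# initialization steps in the Spark driver. dataRDD is assumed to be an RDD of
# (block index, serialized NumericTable) pairs, and computeInitLocal is the wrapper
# name assumed above.
initialCentroids = computeInitMaster(computeInitLocal(dataRDD))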
# Get the number of threads used by the library by default (referenced by the print below)
nThreadsInit = Environment.getInstance().getNumberOfThreads()
# Set the new number of threads
Environment.getInstance().setNumberOfThreads(nThreads)
# Get the number of threads used by the library after the change
nThreadsNew = Environment.getInstance().getNumberOfThreads()
# Initialize FileDataSource to retrieve the input data from a .csv file
dataSource = FileDataSource(
    datasetFileName, DataSourceIface.doAllocateNumericTable,
    DataSourceIface.doDictionaryFromContext
)
# Retrieve the data from the input file
dataSource.loadDataBlock()
# Get initial centroids for the K-Means algorithm
initAlg = init.Batch(nClusters)
initAlg.input.set(init.data, dataSource.getNumericTable())
res = initAlg.compute()
centroids = res.get(init.centroids)
# Create an algorithm object for the K-Means algorithm
algorithm = kmeans.Batch(nClusters, nIterations)
algorithm.input.set(kmeans.data, dataSource.getNumericTable())
algorithm.input.set(kmeans.inputCentroids, centroids)
# Run the computation and keep the result
result = algorithm.compute()
print("Initial number of threads: {}".format(nThreadsInit))
print("Number of threads to set: {}".format(nThreads))
print("Number of threads used after the change: {}".format(nThreadsNew))
def initStep1(data, method):
    for i in range(nBlocks):
        # Create an algorithm object for step 1 of the K-Means initialization
        local = kmeans.init.Distributed(step1Local, nClusters, nBlocks * nVectorsInBlock, i * nVectorsInBlock,
                                        fptype=algorithmFPType, method=method)
        local.input.set(kmeans.init.data, data[i])
        pNewCenters = local.compute().get(kmeans.init.partialCentroids)
        # The first block that yields candidate centroids wins
        if pNewCenters:
            return pNewCenters
    return None
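# A hedged usage sketch (not in the original snippets): initStep1() supplies the first
# candidate centroids and the empty per-node state list that seed the parallelPlus driver
# fragment at the top of this page.
localNodeData = []
pNewCentroids = initStep1(data, kmeans.init.parallelPlusCSR)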
# Dense variant of computeInitMaster() shown above (init.randomDense instead of init.randomCSR);
# it plugs into the same Spark driver sketch.
def computeInitMaster(partsRDD):
    # Create an algorithm to finalize the K-Means initialization on the master node
    kmeansMasterInit = init.Distributed(step2Master, nClusters, method=init.randomDense)
    partsList = partsRDD.collect()
    # Add partial results computed on local nodes to the algorithm on the master node
    for _, value in partsList:
        deserialized_pres = deserializePartialResult(value, init)
        kmeansMasterInit.input.add(init.partialResults, deserialized_pres)
    # Compute the initialization on the master node
    kmeansMasterInit.compute()
    # Finalize computations and retrieve the results
    initResult = kmeansMasterInit.finalizeCompute()
    return initResult.get(init.centroids)