
K-Means ++在Python和Spark中的实现

在本教程中,我们将使用PySpark,它是Apache Spark的Python包装器。虽然PySpark有一个很好的K-Means ++实现,但我们将从头开始编写自己的实现



def parse_data(row):


Parse each pandas row into a tuple of

(station_name, feature_vec),`l

where feature_vec is the concatenation of the projection vectors



return (row[0],

np.concatenate([row[1], row[2], row[3]]))

## Read data

data = pickle.load(open("stations_projections.pickle", "rb"))

rdd = sc.parallelize([parse_data(row[1])

for row in data.iterrows()])





import numpy as np

import pickle

import sys

import time

from numpy.linalg import norm

from matplotlib import pyplot as plt

Defining Global Parameters


# Number of centroids

K = 5

# Number of K-means runs that are executed in parallel. Equivalently, number of sets of initial points

RUNS = 25

# For reproducability of results

RANDOM_SEED = 60295531

# The K-means algorithm is terminated when the change in the

# location of the centroids is smaller than 0.1

converge_dist = 0.1



def print_log(s):


Print progress logs


sys.stdout.write(s + "\n")


def compute_entropy(d):


Compute the entropy given the frequency vector `d`


d = np.array(d)

d = 1.0 * d / d.sum()

return -np.sum(d * np.log2(d))

def choice(p):


Generates a random sample from [0, len(p)),

where p[i] is the probability associated with i.


random = np.random.random()

r = 0.0

for idx in range(len(p)):

r = r + p[idx]

if r > random:

return idx



对于K-Means ++,我们希望在初始化时使质心尽可能远。这个想法是让质心在初始化时更接近不同的聚类中心,从而更快地达到收敛。Python代码如下:

def kmeans_init(rdd, K, RUNS, seed):


Select `RUNS` sets of initial points for `K`-means++


# the `centers` variable is what we want to return

n_data = rdd.count()

shape = rdd.take(1)[0][1].shape[0]

centers = np.zeros((RUNS, K, shape))

def update_dist(vec, dist, k):

new_dist = norm(vec - centers[:, k], axis=1)**2

return np.min([dist, new_dist], axis=0)

K-Means ++实现

现在我们有了初始化函数,现在我们可以使用它来实现K-Means ++算法。Python实现如下:

def get_closest(p, centers):


Return the indices the nearest centroids of `p`.

`centers` contains sets of centroids, where `centers[i]` is

the i-th set of centroids.


best = [0] * len(centers)

closest = [np.inf] * len(centers)

for idx in range(len(centers)):

for j in range(len(centers[0])):

temp_dist = norm(p - centers[idx][j])

if temp_dist < closest[idx]:

closest[idx] = temp_dist

best[idx] = j

return best

def kmeans(rdd, K, RUNS, converge_dist, seed):


Run K-means++ algorithm on `rdd`, where `RUNS` is the number of

initial sets to use.


k_points = kmeans_init(rdd, K, RUNS, seed)


temp_dist = 1.0

iters = 0

st = time.time()

while temp_dist > converge_dist:

# Update all `RUNS` sets of centroids using standard k-means

# algorithm

# Outline:

# - For each point x, select its nearest centroid in i-th

# centroids set

# - Average all points that are assigned to the same

# centroid

# - Update the centroid with the average of all points

# that are assigned to it

temp_dist = np.max([

np.sum([norm(k_points[idx][j] - new_points[(idx,

j)]) for idx,j in new_points.keys()])


iters = iters + 1

if iters % 5 == 0:

print_log("Iteration %d max shift: %.2f (time: %.2f)" %

(iters, temp_dist, time.time() - st))

st = time.time()

# update old centroids

# You modify this for-loop to meet your need

for ((idx, j), p) in new_points.items():

k_points[idx][j] = p

return k_points


由于其初始化算法,K-Means ++优于K-Means的优点在于其收敛速度。此外,Spark正被用于尽可能地并行化该算法。因此,让我们使用Benchmark这个实现。

st = time.time()


centroids = kmeans(rdd, K, RUNS, converge_dist,


group = rdd.mapValues(lambda p: get_closest(p, centroids)) \


print "Time takes to converge:", time.time() - st




def get_cost(rdd, centers):


Compute the square of l2 norm from each data point in `rdd`

to the centroids in `centers`


def _get_cost(p, centers):

best = [0] * len(centers)

closest = [np.inf] * len(centers)

for idx in range(len(centers)):

for j in range(len(centers[0])):

temp_dist = norm(p - centers[idx][j])

if temp_dist < closest[idx]:

closest[idx] = temp_dist

best[idx] = j

return np.array(closest)**2

cost = rdd.map(lambda (name, v): _get_cost(v,


return np.array(cost).sum(axis=0)

cost = get_cost(rdd, centroids)

log2 = np.log2

print "Min Cost:\t"+str(log2(np.max(cost)))

print "Max Cost:\t"+str(log2(np.min(cost)))

print "Mean Cost:\t"+str(log2(np.mean(cost)))

Min Cost: 33.7575332525

Max Cost: 33.8254902123

Mean Cost: 33.7790236109



print 'entropy=',entropy

best = np.argmin(cost)

print 'best_centers=',list(centroids[best])

