from __future__ import print_function
from __future__ import division

import random

import numpy as np
import cPickle as pkl

__author__ = 'wangzx'


class Network(object):
    def __init__(self, sizes):
        """``sizes`` is a list whose i-th entry gives the number of neurons
        in the i-th layer. For example, sizes=[2, 3, 1] describes a
        three-layer network with 2 neurons in the first layer, 3 in the
        second, and 1 in the last. The biases b and weights w are
        initialized from a standard Gaussian distribution. Note that the
        first layer is the input layer and the last layer is the output
        layer."""
        self.num_layers = len(sizes)
        self.sizes = sizes
        self.biases = [np.random.randn(y, 1) for y in sizes[1:]]
        self.weights = [np.random.randn(y, x)
                        for x, y in zip(sizes[:-1], sizes[1:])]

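    # A quick shape check for the sizes=[2, 3, 1] example in the docstring
    # (a minimal interpreter sketch, not part of the original code):
    #
    #   >>> net = Network([2, 3, 1])
    #   >>> [b.shape for b in net.biases]
    #   [(3, 1), (1, 1)]
    #   >>> [w.shape for w in net.weights]
    #   [(3, 2), (1, 3)]
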
    def feedforward(self, a):
        """Return the network's output for the input ``a``."""
        for b, w in zip(self.biases, self.weights):
            a = sigmoid(np.dot(w, a)+b)
        # normalize the output to sum to 1; since sigmoid outputs are all
        # positive, this does not change the argmax used by evaluate()
        return a / np.sum(a)

    def SGD(self, training_data, epochs, mini_batch_size, eta,
            test_data=None):
        """Train the network using mini-batch stochastic gradient descent.
        ``training_data`` is a list of tuples ``(x, y)`` giving the
        training inputs and target outputs. If ``test_data`` is provided,
        the network is evaluated on it after every epoch; this makes the
        training progress easier to follow, but slows training down."""
        if test_data:
            n_test = len(test_data)
        n = len(training_data)
        for j in xrange(epochs):
            random.shuffle(training_data)
            mini_batches = [
                training_data[k:k+mini_batch_size]
                for k in xrange(0, n, mini_batch_size)]
            for mini_batch in mini_batches:
                self.update_mini_batch(mini_batch, eta)
            if test_data:
                print("Epoch {0}: {1} / {2}".format(
                    j, self.evaluate(test_data), n_test))
            else:
                print("Epoch {0} complete".format(j))

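    # How the slicing above partitions the shuffled data into mini-batches
    # (a tiny interpreter sketch; the last batch may be smaller):
    #
    #   >>> data = list(range(7))
    #   >>> [data[k:k+3] for k in range(0, len(data), 3)]
    #   [[0, 1, 2], [3, 4, 5], [6]]
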
    def update_mini_batch(self, mini_batch, eta):
        """Update the network's weights and biases by gradient descent on
        a single mini-batch. ``mini_batch`` is a list of tuples ``(x, y)``,
        each one a training input and its target output; ``eta`` is the
        learning rate. The update rule is w <- w - (eta/m)*nabla_w and
        b <- b - (eta/m)*nabla_b, where m is the mini-batch size."""
        nabla_b = [np.zeros(b.shape) for b in self.biases]
        nabla_w = [np.zeros(w.shape) for w in self.weights]
        for x, y in mini_batch:
            delta_nabla_b, delta_nabla_w = self.backprop(x, y)
            nabla_b = [nb+dnb for nb, dnb in zip(nabla_b, delta_nabla_b)]
            nabla_w = [nw+dnw for nw, dnw in zip(nabla_w, delta_nabla_w)]
        self.weights = [w-(eta/len(mini_batch))*nw
                        for w, nw in zip(self.weights, nabla_w)]
        self.biases = [b-(eta/len(mini_batch))*nb
                       for b, nb in zip(self.biases, nabla_b)]

    def backprop(self, x, y):
        """Return the gradient ``(nabla_b, nabla_w)`` of the cost function
        C_x. ``nabla_b`` holds the gradients of the biases b and
        ``nabla_w`` the gradients of the weights w. Note that the gradients
        are returned per layer: each return value is a list whose items
        are the gradients for one layer of the network."""
        nabla_b = [np.zeros(b.shape) for b in self.biases]
        nabla_w = [np.zeros(w.shape) for w in self.weights]
        # forward pass: record the weighted inputs z and activations
        # layer by layer
        activation = x
        activations = [x]
        zs = []
        for b, w in zip(self.biases, self.weights):
            z = np.dot(w, activation)+b
            zs.append(z)
            activation = sigmoid(z)
            activations.append(activation)
        # backward pass: error at the output layer
        delta = self.cost_derivative(activations[-1], y) * \
            sigmoid_prime(zs[-1])
        nabla_b[-1] = delta
        nabla_w[-1] = np.dot(delta, activations[-2].transpose())
        # propagate the error backwards through the remaining layers;
        # l counts from the back, so l=2 is the second-to-last layer
        for l in xrange(2, self.num_layers):
            z = zs[-l]
            sp = sigmoid_prime(z)
            delta = np.dot(self.weights[-l+1].transpose(), delta) * sp
            nabla_b[-l] = delta
            nabla_w[-l] = np.dot(delta, activations[-l-1].transpose())
        return (nabla_b, nabla_w)

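    # A finite-difference check of backprop (a hedged sketch added for
    # illustration; ``check_gradient`` is not part of the original code).
    # It recomputes the quadratic cost C_x = 0.5*||a - t||^2 implied by
    # cost_derivative, using the same forward pass as backprop (i.e.
    # without the extra output normalization done in feedforward).
    def check_gradient(self, x, y, eps=1e-5):
        """Compare backprop's gradient for one weight against a numerical
        central-difference estimate; the two should agree closely."""
        def cost():
            a = x
            for b, w in zip(self.biases, self.weights):
                a = sigmoid(np.dot(w, a)+b)
            t = np.zeros((self.sizes[-1], 1))
            t[y, 0] = 1
            return 0.5 * np.sum((a - t) ** 2)
        nabla_b, nabla_w = self.backprop(x, y)
        orig = self.weights[0][0, 0]
        self.weights[0][0, 0] = orig + eps
        c_plus = cost()
        self.weights[0][0, 0] = orig - eps
        c_minus = cost()
        self.weights[0][0, 0] = orig  # restore the perturbed weight
        numeric = (c_plus - c_minus) / (2 * eps)
        print("backprop: %g  numeric: %g" % (nabla_w[0][0, 0], numeric))
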
    def evaluate(self, test_data):
        """Return the number of correctly predicted samples; the predicted
        label is the index of the output neuron with the highest
        activation."""
        test_results = [(np.argmax(self.feedforward(x)), y)
                        for (x, y) in test_data]
        return sum(int(x == y) for (x, y) in test_results)

    def cost_derivative(self, output_activations, y):
        """Return the vector of partial derivatives dC_x/da of the
        quadratic cost, i.e. (a - t), where t is the one-hot encoding of
        the label y."""
        t = np.zeros((self.sizes[-1], 1))
        t[y, 0] = 1
        return (output_activations-t)

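    # A small worked example of the one-hot target built above, assuming
    # sizes[-1] == 3 and label y == 1 (an illustrative sketch only):
    #
    #   >>> net = Network([2, 3])
    #   >>> net.cost_derivative(np.array([[0.2], [0.9], [0.1]]), 1)
    #   array([[ 0.2],
    #          [-0.1],
    #          [ 0.1]])
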
def sigmoid(z):
    """The sigmoid function."""
    return 1.0/(1.0+np.exp(-z))

def sigmoid_prime(z):
    """Derivative of the sigmoid function: sigmoid(z)*(1 - sigmoid(z))."""
    return sigmoid(z)*(1-sigmoid(z))

def test():
    # min-max normalize the features to roughly [0, 1]; the 1e-3 term
    # guards against division by zero for constant columns
    feat = pkl.load(open("data/train_X.pkl", 'rb'))
    feat = (feat - np.min(feat, axis=0)) / \
        (np.max(feat, axis=0) - np.min(feat, axis=0) + 1e-3)
    y = pkl.load(open("data/train_Y.pkl", 'rb'))
    train_data = [(np.asarray(f).reshape((-1, 1)), t)
                  for f, t in zip(feat, y)]
    test_feat = pkl.load(open("data/test_X.pkl", 'rb'))
    test_feat = (test_feat - np.min(test_feat, axis=0)) / \
        (np.max(test_feat, axis=0) - np.min(test_feat, axis=0) + 1e-3)
    test_y = pkl.load(open("data/test_Y.pkl", 'rb'))
    test_data = [(np.asarray(f).reshape((-1, 1)), t)
                 for f, t in zip(test_feat, test_y)]
    n_sample, ndim = feat.shape
    bp = Network([ndim, 50, 2])

print("Begin fit training data") bp.SGD(train_data, 50, 50, 0.02, test_data)
    score = bp.evaluate(test_data) / len(test_data)
    print("Test accuracy is %f" % score)

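# A minimal end-to-end run on synthetic data (a hedged sketch; ``demo``
# and its toy problem are illustrative additions, unlike test() and
# test_mnist(), which use the project's pickled datasets):
def demo():
    rng = np.random.RandomState(0)
    n, ndim = 200, 5
    X = rng.randn(n, ndim)
    # label is 1 when the feature sum is positive, else 0
    y = (X.sum(axis=1) > 0).astype(int)
    data = [(f.reshape((-1, 1)), t) for f, t in zip(X, y)]
    bp = Network([ndim, 8, 2])
    bp.SGD(data, 30, 10, 0.5, test_data=data)
    print("Train accuracy is %f" % (bp.evaluate(data) / len(data)))
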
def test_mnist():
    # the pickles hold binary data, so open them in 'rb' mode
    path = '/home/wangzx/usr/share/data/mnist2/'
    train_X = pkl.load(open(path + 'mnist_train_X.pkl', 'rb'))
    train_y = pkl.load(open(path + 'mnist_train_y.pkl', 'rb')).ravel()
    test_X = pkl.load(open(path + 'mnist_test_X.pkl', 'rb'))
    test_y = pkl.load(open(path + 'mnist_test_y.pkl', 'rb')).ravel()
    train_data = [(np.asarray(f).reshape((-1, 1)), t)
                  for f, t in zip(train_X, train_y)]
    test_data = [(np.asarray(f).reshape((-1, 1)), t)
                 for f, t in zip(test_X, test_y)]
    n_sample, ndim = train_X.shape
    bp = Network([ndim, 10])

print("Begin fit training data") bp.SGD(train_data, 50, 50, 0.1, test_data)
    score = bp.evaluate(test_data) / len(test_data)
    print("Test accuracy is %f" % score)


if __name__ == "__main__":
    # nothing runs by default; call test() or test_mnist() here
    pass