本文最后更新于 394 天前,其中的信息可能已经过时,如有错误请发送邮件到 wuxianglongblog@163.com
| import theano, time |
| import theano.tensor as T |
| import numpy as np |
| |
| def floatX(X): |
| return np.asarray(X, dtype=theano.config.floatX) |
| Using gpu device 1: Tesla C2075 (CNMeM is disabled) |
theano
中可以使用 scan
进行循环,常用的 map
和 reduce
操作都可以看成是 scan
的特例。
scan
通常作用在一个序列上,每次处理一个输入,并输出一个结果。
sum(x)
函数可以看成是 z + x(i)
函数在给定 z = 0
的情况下,对 x
的一个 scan
。
通常我们可以将一个 for
循环表示成一个 scan
操作,其好处如下:
- 迭代次数成为符号图结构的一部分
- 最小化 GPU 数据传递
- 序列化梯度计算
- 速度比
for
稍微快一些
- 降低内存使用
函数的用法如下:
| theano.scan(fn, |
| sequences=None, |
| outputs_info=None, |
| non_sequences=None, |
| n_steps=None, |
| truncate_gradient=-1, |
| go_backwards=False, |
| mode=None, |
| name=None, |
| profile=False, |
| allow_gc=None, |
| strict=False) |
主要参数的含义:
fn
sequences
outputs_info
non_sequences
n_steps
go_backwards
输出为一个元组 (outputs, updates)
:
outputs
updates
- 一个字典,用来记录
scan
过程中用到的共享变量更新规则,构造函数的时候,如果需要更新共享变量,将这个变量当作 updates
的参数传入。
这里实现一个简单的 map
操作,将向量 $\mathbf x$ 中的所有元素变成原来的两倍:
| x = T.vector() |
| |
| results, _ = theano.scan(fn = lambda t: t * 2, |
| sequences = x) |
| x_double_scan = theano.function([x], results) |
| |
| print x_double_scan(range(10)) |
| [ 0. 2. 4. 6. 8. 10. 12. 14. 16. 18.] |
之前我们说到,theano
中的 map
是 scan
的一个特例,因此 theano.map
的用法其实跟 theano.scan
十分类似。
由于不需要考虑前一步的输出结果,所以 theano.map
的参数中没有 outputs_info
这一部分。
我们用 theano.map
实现相同的效果:
| result, _ = theano.map(fn = lambda t: t * 2, |
| sequences = x) |
| x_double_map = theano.function([x], result) |
| |
| print x_double_map(range(10)) |
| [ 0. 2. 4. 6. 8. 10. 12. 14. 16. 18.] |
这里一个简单的 reduce
操作,求和:
| reduce(lambda a, b: a + b, x) |
| result, _ = theano.scan(fn = lambda t, v: t + v, |
| sequences = x, |
| outputs_info = floatX(0.)) |
| |
| |
| x_sum_scan = theano.function([x], result[-1]) |
| |
| |
| print x_sum_scan(range(10)) |
theano.reduce
也是 scan
的一个特例,使用 theano.reduce
实现相同的效果:
| result, _ = theano.reduce(fn = lambda t, v: t + v, |
| sequences = x, |
| outputs_info = 0.) |
| |
| x_sum_reduce = theano.function([x], result) |
| |
| |
| print x_sum_reduce(range(10)) |
reduce
与 scan
不同的地方在于,result
包含的内容并不是每次输出的结果,而是最后一次输出的结果。
fn
是一个函数句柄,对于这个函数句柄,它每一步接受的参数是由 sequences, outputs_info, non_sequence
这三个参数所决定的,并且按照以下的顺序排列:
sequences
中第一个序列的值
- ...
sequences
中最后一个序列的值
outputs_info
中第一个输出之前的值
- ...
outputs_info
中最后一个输出之前的值
non_squences
中的参数
这些序列的顺序与在参数 sequences, outputs_info
中指定的顺序相同。
默认情况下,在第 k
次迭代时,如果 sequences
和 outputs_info
中给定的值不是字典(dictionary
)或者一个字典列表(list of dictionaries
),那么
sequences
中的序列 seq
传入 fn
的是 seq[k]
的值
outputs_info
中的序列 output
传入 fn
的是 output[k-1]
的值
fn
的返回值有两部分 (outputs_list, update_dictionary)
,第一部分将作为序列,传入 outputs
中,与 outputs_info
中的初始输入值的维度一致(如果没有给定 outputs_info
,输出值可以任意。)
第二部分则是更新规则的字典,告诉我们如何对 scan
中使用到的一些共享的变量进行更新:
| return [y1_t, y2_t], {x:x+1} |
这两部分可以任意,即顺序既可以是 (outputs_list, update_dictionary)
, 也可以是 (update_dictionary, outputs_list)
,theano
会根据类型自动识别。
两部分只需要有一个存在即可,另一个可以为空。
例如,在我们的第一个例子中
| theano.scan(fn = lambda t: t * 2, |
| sequences = x) |
在第 k
次迭代的时候,传入参数 t
的值为 x[k]
。
再如,在我们的第二个例子中:
| theano.scan(fn = lambda t, v: t + v, |
| sequences = x, |
| outputs_info = floatX(0.)) |
fn
接受了两个参数,初始迭代时,按照规则,t
接受的参数为 x[0]
,v
接受的参数为我们传入 outputs_info
的第一个初始值即 0
(认为是 outputs[-1]
),他们的结果 t+v
将作为 outputs[0]
的值传入下一次迭代以及最终 scan
输出的 outputs
值中。
我们可以一次输入多个序列,这些序列会按照顺序传入 fn 的参数中,例如计算多项式
时,我们可以将多项式的系数和幂数两个序列放到一个 list
中作为输入参数:
| |
| x = T.scalar("x") |
| |
| |
| A = T.vectors("A") |
| |
| |
| N = T.ivectors("N") |
| |
| |
| components, _ = theano.scan(fn = lambda a, n, v: a * (v ** n), |
| sequences = [A, N], |
| non_sequences = x) |
| |
| result = components.sum() |
| |
| polynomial = theano.function([x, A, N], result) |
| |
| |
| print polynomial(floatX(10), |
| floatX([1, 3, 2]), |
| [0, 2, 3]) |
默认情况下,我们只能使用输入序列的当前时刻的值,以及前一个输出的输出值。
事实上,theano
会将参数中的序列变成一个有 input
和 taps
两个键值的 dict
:
input
:输入的序列
taps
:要传入 fn
的值的列表
- 对于
sequences
参数中的序列来说,默认值为 [0],表示时间 t
传入 t+0
时刻的序列值,可以为正,可以为负。
- 对于
outputs_info
参数中的序列来说,默认值为 [-1],表示时间 t
传入 t-1
时刻的序列值,只能为负值,如果值为 None
,表示这个输出结果不会作为参数传入 fn
中。
传入 fn
的参数也会按照 taps
中的顺序来排列,我们考虑下面这个例子:
| scan(fn, sequences = [ dict(input= Sequence1, taps = [-3,2,-1]) |
| , Sequence2 |
| , dict(input = Sequence3, taps = 3) ] |
| , outputs_info = [ dict(initial = Output1, taps = [-3,-5]) |
| , dict(initial = Output2, taps = None) |
| , Output3 ] |
| , non_sequences = [ Argument1, Argument2]) |
首先是 Sequence1
的 [-3, 2, -1]
被传入,然后 Sequence2
不是 dict
, 所以传入默认值 [0]
,Sequence3
传入的参数是 3
,所以 fn
在第 t
步接受的前几个参数是:
| Sequence1[t-3] |
| Sequence1[t+2] |
| Sequence1[t-1] |
| Sequence2[t] |
| Sequence3[t+3] |
然后 Output1
传入的是 [-3, -5]
(传入的初始值的形状应为 shape (5,)+
),Output2
不作为参数传入,Output3
传入的是 [-1]
,所以接下的参数是:
| Output1[t-3] |
| Output1[t-5] |
| Output3[t-1] |
| Argument1 |
| Argument2 |
总的说来上面的例子中,fn
函数按照以下顺序最多接受这样 10 个参数:
| Sequence1[t-3] |
| Sequence1[t+2] |
| Sequence1[t-1] |
| Sequence2[t] |
| Sequence3[t+3] |
| Output1[t-3] |
| Output1[t-5] |
| Output3[t-1] |
| Argument1 |
| Argument2 |
例子,假设 $x$ 是我们的输入,$y$ 是我们的输出,我们需要计算 $y (t) = tanh\left [W{1} y(t-1) + W{2} x (t) + W_{3} x (t-1)\right]$ 的值:
| X = T.matrix("X") |
| Y = T.vector("y") |
| |
| W_1 = T.matrix("W_1") |
| W_2 = T.matrix("W_2") |
| W_3 = T.matrix("W_3") |
| |
| |
| results, _ = theano.scan(fn = lambda x, x_pre, y: T.tanh(T.dot(W_1, y) + T.dot(W_2, x) + T.dot(W_3, x_pre)), |
| |
| sequences = dict(input=X, taps=[0, -1]), |
| outputs_info = Y) |
| |
| Y_seq = theano.function(inputs = [X, Y, W_1, W_2, W_3], |
| outputs = results) |
测试小矩阵计算:
| |
| t = 1001 |
| x_dim = 10 |
| y_dim = 20 |
| |
| x = 2 * floatX(np.random.random([t, x_dim])) - 1 |
| y = 2 * floatX(np.zeros(y_dim)) - 1 |
| w_1 = 2 * floatX(np.random.random([y_dim, y_dim])) - 1 |
| w_2 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1 |
| w_3 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1 |
| |
| tic = time.time() |
| |
| y_res_theano = Y_seq(x, y, w_1, w_2, w_3) |
| |
| print "theano running time {:.4f} s".format(time.time() - tic) |
| |
| tic = time.time() |
| |
| y_res_numpy = np.zeros([t, y_dim]) |
| y_res_numpy[0] = y |
| |
| for i in range(1, t): |
| y_res_numpy[i] = np.tanh(w_1.dot(y_res_numpy[i-1]) + w_2.dot(x[i]) + w_3.dot(x[i-1])) |
| |
| print "numpy running time {:.4f} s".format(time.time() - tic) |
| |
| |
| print "the max difference of the first 10 results is", np.max(np.abs(y_res_theano[0:10] - y_res_numpy[1:11])) |
| theano running time 0.0537 s |
| numpy running time 0.0197 s |
| the max difference of the first 10 results is 1.25780650354e-06 |
测试大矩阵运算:
| |
| t = 1001 |
| x_dim = 100 |
| y_dim = 200 |
| |
| x = 2 * floatX(np.random.random([t, x_dim])) - 1 |
| y = 2 * floatX(np.zeros(y_dim)) - 1 |
| w_1 = 2 * floatX(np.random.random([y_dim, y_dim])) - 1 |
| w_2 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1 |
| w_3 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1 |
| |
| tic = time.time() |
| |
| y_res_theano = Y_seq(x, y, w_1, w_2, w_3) |
| |
| print "theano running time {:.4f} s".format(time.time() - tic) |
| |
| tic = time.time() |
| |
| y_res_numpy = np.zeros([t, y_dim]) |
| y_res_numpy[0] = y |
| |
| for i in range(1, t): |
| y_res_numpy[i] = np.tanh(w_1.dot(y_res_numpy[i-1]) + w_2.dot(x[i]) + w_3.dot(x[i-1])) |
| |
| print "numpy running time {:.4f} s".format(time.time() - tic) |
| |
| |
| print "the max difference of the first 10 results is", np.max(np.abs(y_res_theano[:10] - y_res_numpy[1:11])) |
| theano running time 0.0754 s |
| numpy running time 0.1334 s |
| the max difference of the first 10 results is 0.000656997077348 |
值得注意的是,由于 theano
和 numpy
在某些计算的实现上存在一定的差异,随着序列长度的增加,这些差异将被放大:
| for i in xrange(20): |
| print "iter {:03d}, max diff:{:.6f}".format(i + 1, |
| np.max(np.abs(y_res_numpy[i + 1,:] - y_res_theano[i,:]))) |
| iter 001, max diff:0.000002 |
| iter 002, max diff:0.000005 |
| iter 003, max diff:0.000007 |
| iter 004, max diff:0.000010 |
| iter 005, max diff:0.000024 |
| iter 006, max diff:0.000049 |
| iter 007, max diff:0.000113 |
| iter 008, max diff:0.000145 |
| iter 009, max diff:0.000334 |
| iter 010, max diff:0.000657 |
| iter 011, max diff:0.001195 |
| iter 012, max diff:0.002778 |
| iter 013, max diff:0.004561 |
| iter 014, max diff:0.004748 |
| iter 015, max diff:0.014849 |
| iter 016, max diff:0.012696 |
| iter 017, max diff:0.043639 |
| iter 018, max diff:0.046540 |
| iter 019, max diff:0.083032 |
| iter 020, max diff:0.123678 |
假设我们要计算方阵 $A$ 的 $A^k$,$k$ 是一个未知变量,我们可以这样通过 n_steps
参数来控制循环计算的次数:
| A = T.matrix("A") |
| k = T.iscalar("k") |
| |
| results, _ = theano.scan(fn = lambda P, A: P.dot(A), |
| |
| outputs_info = T.eye(A.shape[0]), |
| |
| non_sequences = A, |
| n_steps = k) |
| |
| A_k = theano.function(inputs = [A, k], outputs = results[-1]) |
| |
| test_a = floatX([[2, -2], [-1, 2]]) |
| |
| print A_k(test_a, 10) |
| |
| |
| a_k = np.eye(2) |
| for i in range(10): |
| a_k = a_k.dot(test_a) |
| |
| print a_k |
| [[ 107616. -152192.] |
| [ -76096. 107616.]] |
| [[ 107616. -152192.] |
| [ -76096. 107616.]] |
可以在 scan
中使用并更新共享变量,例如,利用共享变量 n
,我们可以实现这样一个迭代 k
步的简单计数器:
| n = theano.shared(floatX(0)) |
| k = T.iscalar("k") |
| |
| |
| _, updates = theano.scan(fn = lambda n: {n:n+1}, |
| non_sequences = n, |
| n_steps = k) |
| |
| counter = theano.function(inputs = [k], |
| outputs = [], |
| updates = updates) |
| |
| print n.get_value() |
| counter(10) |
| print n.get_value() |
| counter(10) |
| print n.get_value() |
之前说到,fn
函数的返回值应该是 (outputs_list, update_dictionary)
或者 (update_dictionary, outputs_list)
或者两者之一。
这里 fn
函数返回的是一个字典,因此自动被放入了 update_dictionary
中,然后传入 function
的 updates
参数中进行迭代。
我们可以将 scan
设计为 loop-until
的模式,具体方法是在 scan
中,将 fn
的返回值增加一个参数,使用 theano.scan_module
来设置停止条件。
假设我们要计算所有不小于某个值的 2 的幂,我们可以这样定义:
| max_value = T.scalar() |
| |
| results, _ = theano.scan(fn = lambda v_pre, max_v: (v_pre * 2, theano.scan_module.until(v_pre * 2 > max_v)), |
| outputs_info = T.constant(1.), |
| non_sequences = max_value, |
| n_steps = 1000) |
| |
| |
| |
| |
| power_of_2 = theano.function(inputs = [max_value], outputs = results[:-1]) |
| |
| print power_of_2(40) |