本系列是对 的经典实现ps-lite的源码解析,共分4章:
第一章,算是热身,先讲一下我们为什么需要 ,然后再分析一下的入门demo程序,最后介绍中的几个基本概念,为后续阅读打好基础。第二章,讲解中的和Van两个类,涉及中节点管理、消息分发、多机同步等方面的知识。第三章,讲解中的, 和三个类,介绍最常用的两个动作Push/Pull是如何实现的。第四章(本章),掌握了的运行原理后,让我们看看基于如何实现一个分布式的FM算法。
另外,用文字来描述代码流程,是苍白无力的。 为此,我提供了一份ps-lite源码思维导图。感兴趣的同学,关注我的微信公众号“推荐道”,回复领取。下载免费版打开后,读我的解析文章与看思维导图相结合,更容易理解。
本章要分析的分布式FM源码见xflow。这个项目有如下看点:
代码
定义在.h中,可以看到在构造函数中先建立并启动了了对一阶权重w和对的两个。注意,由于要读写两类数据,所以两个分别拥有独立,针对一阶权重w的=0,针对 "v"的=1。
Server() {
server_w_ = new ps::KVServer<float>(0);// app_id=0
server_w_->set_request_handle(FTRL::KVServerFTRLHandle_w());
server_v_ = new ps::KVServer<float>(1);// app_id=1
server_v_->set_request_handle(FTRL::KVServerFTRLHandle_v());
}
具体逻辑,即接收到发来的梯度后,用FTRL算法更新一阶权重"w"和 "v",分别实现在和中。两个类的逻辑差不多,只不过是将一阶权重w看成=1的。所以接下来重点看是如何更新的。
存储
ps 用:map
store来存储各的。其中使用到的定义如下。
typedef struct FTRLEntry_v {
FTRLEntry_v(int k = v_dim) {// v_dim在代码里定义成10
w.resize(k, 0.0);
n.resize(k, 0.0);
z.resize(k, 0.0);
}
std::vector<float> w;//embedding向量
std::vector<float> n;//FTRL要用到的中间变量
std::vector<float> z;//FTRL要用到的中间谜题
} ftrlentry_v;
从以上代码可以看到,为了使用FTRL算法来更新权重,对于每个,端不仅要存储其,还要存储n和z两个中间变量。在推荐系统这样一个动辄特征量就上亿的场景下,这会占用极大量的内存。因此,现在很多分布式优化算法在下功夫,减少中间变量存储。
struct KVServerFTRLHandle_v {
void operator()(const ps::KVMeta& req_meta,const ps::KVPairs<float>& req_data,ps::KVServer<float>* server) {
size_t keys_size = req_data.keys.size();
ps::KVPairs<float> res;
if (req_meta.push) {
size_t vals_size = req_data.vals.size();
// 做sanity check,因为每个embedding都有v_dim那么长,所以vals_size == keys_size * v_dim
CHECK_EQ(keys_size, vals_size / v_dim);
} else {
res.keys = req_data.keys;
res.vals.resize(keys_size * v_dim);
}
// ************** 遍历请求的每个key
for (size_t i = 0; i < keys_size; ++i) {
ps::Key key = req_data.keys[i];
// 如果第一次访问某个key,随机初始化
// 更先进的算法是设立准入规则,没必要为只出现一两次的特征浪费存储空间
if (store.find(key) == store.end()) {
FTRLEntry_v val(v_dim);;
for (int k = 0; k < v_dim; ++k) {
val.w[k] = Base::local_normal_real_distribution<double>(0.0, 1.0)(Base::local_random_engine()) * 1e-2;
}
store[key] = val;
}
FTRLEntry_v& val = store[key];
// ************** 遍历embedding的每一位
for (int j = 0; j < v_dim; ++j) {
if (req_meta.push) {
// ---------- 处理push请求,用FTRL算法根据梯度更新embedding
float g = req_data.vals[i * v_dim + j];
float old_n = val.n[j];
float n = old_n + g * g;
val.z[j] += g -
(std::sqrt(n) - std::sqrt(old_n)) / alpha * val.w[j];
val.n[j] = n;
if (std::abs(val.z[j]) <= lambda1) {
val.w[j] = 0.0;//更新embedding
} else {
float tmpr = 0.0;
if (val.z[j] > 0.0) tmpr = val.z[j] - lambda1;
if (val.z[j] < 0.0) tmpr = val.z[j] + lambda1;
float tmpl = -1 * ((beta + std::sqrt(val.n[j]))/alpha + lambda2);
val.w[j] = tmpr / tmpl;//更新embedding
}
} else {
// ---------- 处理pull请求,把embedding按位复制进response
// vals是将所有key对应的embedding拼接并打平成一个大数组
res.vals[i * v_dim + j] = val.w[j];
}
}
}
server->Response(req_meta, res);
}
private:
std::unordered_map<ps::Key, ftrlentry_v> store;//存储参数
};
代码
FM的端代码定义在.h和中
构造函数
FMWorker(const char *train_file, const char *test_file) :train_file_path(train_file),test_file_path(test_file) {
kv_w = new ps::KVWorker<float>(0);// app_id=0
kv_v = new ps::KVWorker<float>(1);// app_id=1
......
}
同样是由于FM要读写一阶权重w和 "v"两种类型的数据,所以在侧也定义了两个,各自拥有独立的app_d,针对一阶权重w的=0,针对 "v"的=1。拥有相同的/之间才能通信。
:训练入口
侧训练的入口在::。
void FMWorker::batch_training(ThreadPool* pool) {
......
for (int epoch = 0; epoch < epochs; ++epoch) {//把数据过若干epoch
xflow::LoadData train_data_loader(train_data_path, block_size << 20);
train_data = &(train_data_loader.m_data);
int block = 0;
while (1) {// 循环直到将本次的训练数据都读完
// 读取一个mini-batch的数据存放在train_data->fea_matrix中
train_data_loader.load_minibatch_hash_data_fread();
if (train_data->fea_matrix.size() <= 0) break;
// 把这个mini-batch的训练数据平分到各个thread上
// 每个thread分到thread_size条训练数据
int thread_size = train_data->fea_matrix.size() / core_num;
gradient_thread_finish_num = core_num;
for (int i = 0; i < core_num; ++i) {//遍历启动各训练线程
int start = i * thread_size;// 当前线程分到的数据的起始位置
int end = (i + 1)* thread_size;// 当前线程分到的数据的截止位置
// 启动线程运行FMWorker::update,训练当前mini-batch的局部数据
pool->enqueue(std::bind(&FMWorker::update, this, start, end));
}
while (gradient_thread_finish_num > 0) {//等待所有thread结束
usleep(5);
}
++block;
}
if ((epoch + 1) % 30 == 0) std::cout << "epoch : " << epoch << std::endl;
train_data = NULL;
}
}
:真正执行训练
void FMWorker::update(int start, int end) {//运行在独立线程中,真正执行一部分数据的训练
size_t idx = 0;
auto all_keys = std::vector<Base::sample_key>();
auto unique_keys = std::vector<ps::Key>();
int line_num = 0;
// ******************* 遍历分配给自己的数据,重新组织feature
for (int row = start; row < end; ++row) {
// sample_size是第row行样本内部包含的特征个数
int sample_size = train_data->fea_matrix[row].size();
Base::sample_key sk;
sk.sid = line_num;// sid,即sample_id,用于标识特征属于哪条样本
for (int j = 0; j < sample_size; ++j) {//遍历当前样本的每个特征
idx = train_data->fea_matrix[row][j].fid;
sk.fid = idx;// fid,即feature_id
all_keys.push_back(sk);
(unique_keys).push_back(idx);
}
++line_num;
}
// 把分配给自己的那部分数据中的所有特征,按feature_id重新排序
// 相同feature_id,但分属不同sample的feature排列在一起
std::sort(all_keys.begin(), all_keys.end(), base_->sort_finder);
// 把这部分数据中出现的所有feature_id去重
// 因为向ps server pull的时候,没必要重复pull相同的key
std::sort((unique_keys).begin(), (unique_keys).end());
(unique_keys).erase(unique((unique_keys).begin(), (unique_keys).end()),
(unique_keys).end());
int keys_size = (unique_keys).size();//去重后的feature_id个数
// ******************* 用去重后的feature_ids从ps server拉取最新的参数
// 拉取一阶权重"w"
auto w = std::vector<float>();
kv_w->Wait(kv_w->Pull(unique_keys, &w));
// 定义空间,等待容纳各feature的一阶权重'w'上的导数
auto push_w_gradient = std::vector<float>(keys_size);
// 拉取embedding "v"
auto v = std::vector<float>();
kv_v->Wait(kv_v->Pull(unique_keys, &v));
// 定义空间,等待容纳各feature的embedding上的导数
// 由于embedding是个向量,所以需要定义的空间是keys_size(去重后有多少feaute)*v_dim_(每个embedding的长度)
auto push_v_gradient = std::vector<float>(keys_size * v_dim_);
// ******************* 前代
auto loss = std::vector<float>(end - start);
auto v_sum = std::vector<float>(end - start);
// loss这个名字取得不好,其实里面存储的是loss针对final logit的层数
calculate_loss(w, v, all_keys, unique_keys, start, end, v_sum, loss);
// ******************* 回代
// 输出push_w_gradient: loss对各feature的一阶权重'w'上的导数
// 输出push_v_gradient: loss对各feature的embedding上的导数
calculate_gradient(all_keys, unique_keys, start, end, v, v_sum, loss,
push_w_gradient, push_v_gradient);
// ******************* 向Sever push梯度,让server端更新优化参数
// 注意!!!这里是每个worker thread各push各自的,完全没有与其他worker同步,因此这里实现的是异步模式。
kv_w->Wait(kv_w->Push(unique_keys, push_w_gradient));
kv_v->Wait(kv_v->Push(unique_keys, push_v_gradient));
--gradient_thread_finish_num;//代表有一个线程完成了
}
同步还是异步?
需要注意的是,代码最后的kv_w->Wait(kv_w->Push和kv_v->Wait(kv_v->Push,尽管调用了Wait,但并不是在实现 SGD。这个 FM的实现,本质上仍然是 SGD。 如前所述,Wait只作用于单机,这里只是等待异步的Push完成,但是这个 并没有等待其他 ,就开始训练下一个batch,即,没有多机间的协调。(这个代码写得有点复杂,其实代码中是将一个 中的多个 进行了同步,但是多个 之间仍然缺乏同步)
如果要基于实现同步SGD,还有点复杂:
而现代针对推荐系统优化的PS,往往需要针对用异步更新,面对dnn 用同步更新。想想就复杂,此间先按下不表。
前代 & 回代
前代发生在::,回代发生在::。
理解两个函数的关键在于,记住这两个函数的调用者,已经将训练数据中的所有信息(代码里的,记录某的和所属),根据排好序了,拥有相同但分属不同的信息(i.e., )排列在一起。
这么做的目的,是为了能够高效访问拉来的数据。由于具备相同的特征信息排列在一起,那个对应的一阶权重"w"和 "v",只需要查找并提取一次(由于拉的数据也是按照key排好的序的,查找都免了,顺序访问就好),避免根据在拉来数据中反复查找带来的时间浪费。
两个函数的结构非常类似,只看::,另一个留待读者自己查看源码。
void FMWorker::calculate_loss(std::vector<float>& w,
std::vector<float>& v,
std::vector<Base::sample_key>& all_keys,
std::vector<ps::Key>& unique_keys,
size_t start, size_t end,
std::vector<float>& v_sum,
std::vector<float>& loss) {
// *************** 每条样本,累加一阶权重
auto wx = std::vector<float>(end - start);
for (int j = 0, i = 0; j < all_keys.size(); ) {
// j: 遍历每个feature信息 (sample_key)的游标
// i: 遍历拉下来的每个feature一阶权重的游标
size_t allkeys_fid = all_keys[j].fid;
size_t weight_fid = (unique_keys)[i];
if (allkeys_fid == weight_fid) {//还使用当前的w
// 把权重w[i]累加到第[all_keys[j].sid]个样本的logit上
wx[all_keys[j].sid] += (w)[i];
++j;
} else if (allkeys_fid > weight_fid) {//移到下一个w
++i;
}
}
// *************** 每条样本,累加所有embedding的两两交叉
auto v_pow_sum = std::vector<float>(end - start);
for (size_t k = 0; k < v_dim_; k++) {//遍历embedding的各位
for (size_t j = 0, i = 0; j < all_keys.size(); ) {
// j: 遍历每个feature信息 (sample_key)的游标
// i: 遍历拉下来的每个feature一阶权重的游标
size_t allkeys_fid = all_keys[j].fid;
size_t weight_fid = unique_keys[i];
if (allkeys_fid == weight_fid) {//还使用当前embedding
size_t sid = all_keys[j].sid;
float v_weight = v[i * v_dim_ + k];
v_sum[sid] += v_weight;
v_pow_sum[sid] += v_weight * v_weight;
++j;
} else if (allkeys_fid > weight_fid) {//移到下一个embedding
++i;
}
}
}
auto v_y = std::vector<float>(end - start);
for (size_t i = 0; i < end - start; ++i) {
v_y[i] = v_sum[i] * v_sum[i] - v_pow_sum[i];
}
// *************** 每条样本,计算loss对logit的梯度
for (int i = 0; i < wx.size(); i++) {// 遍历每条样本
float pctr = base_->sigmoid(wx[i] + v_y[i]);
// loss这个名字起得太有误导性,loss[i]不是每i条样本上的loss
// loss[i]是第i条样本上的loss对第i条样本预测logit的梯度
loss[i] = pctr - train_data->label[start++];
}
}
- END -
评论(0)