导航菜单
首页 » 问答 » 正文

重温经典之ps-lite源码解析(4):实现分布式FM

本系列是对 的经典实现ps-lite的源码解析,共分4章:

第一章,算是热身,先讲一下我们为什么需要 ,然后再分析一下的入门demo程序,最后介绍中的几个基本概念,为后续阅读打好基础。第二章,讲解中的和Van两个类,涉及中节点管理、消息分发、多机同步等方面的知识。第三章,讲解中的, 和三个类,介绍最常用的两个动作Push/Pull是如何实现的。第四章(本章),掌握了的运行原理后,让我们看看基于如何实现一个分布式的FM算法。

另外,用文字来描述代码流程,是苍白无力的。 为此,我提供了一份ps-lite源码思维导图。感兴趣的同学,关注我的微信公众号“推荐道”,回复领取。下载免费版打开后,读我的解析文章与看思维导图相结合,更容易理解。

本章要分析的分布式FM源码见xflow。这个项目有如下看点:

代码

定义在.h中,可以看到在构造函数中先建立并启动了了对一阶权重w和对的两个。注意,由于要读写两类数据,所以两个分别拥有独立,针对一阶权重w的=0,针对 "v"的=1。

  Server() {
    server_w_ = new ps::KVServer<float>(0);// app_id=0
    server_w_->set_request_handle(FTRL::KVServerFTRLHandle_w());
    server_v_ = new ps::KVServer<float>(1);// app_id=1
    server_v_->set_request_handle(FTRL::KVServerFTRLHandle_v());
  }

具体逻辑,即接收到发来的梯度后,用FTRL算法更新一阶权重"w"和 "v",分别实现在和中。两个类的逻辑差不多,只不过是将一阶权重w看成=1的。所以接下来重点看是如何更新的。

存储

ps 用:map

store来存储各的。其中使用到的定义如下。

  typedef struct FTRLEntry_v {
    FTRLEntry_v(int k = v_dim) {// v_dim在代码里定义成10
      w.resize(k, 0.0);
      n.resize(k, 0.0);
      z.resize(k, 0.0);
    }
    std::vector<float> w;//embedding向量
    std::vector<float> n;//FTRL要用到的中间变量
    std::vector<float> z;//FTRL要用到的中间谜题
  } ftrlentry_v;

从以上代码可以看到,为了使用FTRL算法来更新权重,对于每个,端不仅要存储其,还要存储n和z两个中间变量。在推荐系统这样一个动辄特征量就上亿的场景下,这会占用极大量的内存。因此,现在很多分布式优化算法在下功夫,减少中间变量存储。

struct KVServerFTRLHandle_v {
    void operator()(const ps::KVMeta& req_meta,const ps::KVPairs<float>& req_data,ps::KVServer<float>* server) {
      size_t keys_size = req_data.keys.size();
      ps::KVPairs<float> res;
      if (req_meta.push) {
        size_t vals_size = req_data.vals.size();
        // 做sanity check,因为每个embedding都有v_dim那么长,所以vals_size == keys_size * v_dim
        CHECK_EQ(keys_size, vals_size / v_dim);
      } else {
        res.keys = req_data.keys;
        res.vals.resize(keys_size * v_dim);
      }
      // ************** 遍历请求的每个key
      for (size_t i = 0; i < keys_size; ++i) {
        ps::Key key = req_data.keys[i];
        // 如果第一次访问某个key,随机初始化
        // 更先进的算法是设立准入规则,没必要为只出现一两次的特征浪费存储空间
        if (store.find(key) == store.end()) {
          FTRLEntry_v val(v_dim);;
          for (int k = 0; k < v_dim; ++k) {
            val.w[k] = Base::local_normal_real_distribution<double>(0.0, 1.0)(Base::local_random_engine()) * 1e-2;
          }
          store[key] = val;
        }
        FTRLEntry_v& val = store[key];
      // ************** 遍历embedding的每一位
        for (int j = 0; j < v_dim; ++j) {
          if (req_meta.push) {
            // ---------- 处理push请求,用FTRL算法根据梯度更新embedding
            float g = req_data.vals[i * v_dim + j];
            float old_n = val.n[j];
            float n = old_n + g * g;
            val.z[j] += g -
                        (std::sqrt(n) - std::sqrt(old_n)) / alpha * val.w[j];
            val.n[j] = n;
            if (std::abs(val.z[j]) <= lambda1) {
              val.w[j] = 0.0;//更新embedding
            } else {
              float tmpr = 0.0;
              if (val.z[j] > 0.0) tmpr = val.z[j] - lambda1;
              if (val.z[j] < 0.0) tmpr = val.z[j] + lambda1;
              float tmpl = -1 * ((beta + std::sqrt(val.n[j]))/alpha  + lambda2);
              val.w[j] = tmpr / tmpl;//更新embedding
            }
          } else {
            // ---------- 处理pull请求,把embedding按位复制进response
            // vals是将所有key对应的embedding拼接并打平成一个大数组
            res.vals[i * v_dim + j] = val.w[j];
          }
        }
      }
      server->Response(req_meta, res);
    }
   private:
    std::unordered_map<ps::Key, ftrlentry_v> store;//存储参数
  };

代码

FM的端代码定义在.h和中

构造函数

  FMWorker(const char *train_file, const char *test_file) :train_file_path(train_file),test_file_path(test_file) {
    kv_w = new ps::KVWorker<float>(0);// app_id=0
    kv_v = new ps::KVWorker<float>(1);// app_id=1
    ......
  }

同样是由于FM要读写一阶权重w和 "v"两种类型的数据,所以在侧也定义了两个,各自拥有独立的app_d,针对一阶权重w的=0,针对 "v"的=1。拥有相同的/之间才能通信。

:训练入口

侧训练的入口在::。

void FMWorker::batch_training(ThreadPool* pool) {
  ......
  for (int epoch = 0; epoch < epochs; ++epoch) {//把数据过若干epoch

    xflow::LoadData train_data_loader(train_data_path, block_size << 20);
    train_data = &(train_data_loader.m_data);
    int block = 0;
    while (1) {// 循环直到将本次的训练数据都读完

      // 读取一个mini-batch的数据存放在train_data->fea_matrix中
      train_data_loader.load_minibatch_hash_data_fread();
      if (train_data->fea_matrix.size() <= 0) break;
      // 把这个mini-batch的训练数据平分到各个thread上
      // 每个thread分到thread_size条训练数据
      int thread_size = train_data->fea_matrix.size() / core_num;
      gradient_thread_finish_num = core_num;
      for (int i = 0; i < core_num; ++i) {//遍历启动各训练线程
        int start = i * thread_size;// 当前线程分到的数据的起始位置
        int end = (i + 1)* thread_size;// 当前线程分到的数据的截止位置
        // 启动线程运行FMWorker::update,训练当前mini-batch的局部数据
        pool->enqueue(std::bind(&FMWorker::update, this, start, end));
      }
      while (gradient_thread_finish_num > 0) {//等待所有thread结束
        usleep(5);
      }
      ++block;
    }
    if ((epoch + 1) % 30 == 0) std::cout << "epoch : " << epoch << std::endl;
    train_data = NULL;
  }
}

:真正执行训练

void FMWorker::update(int start, int end) {//运行在独立线程中,真正执行一部分数据的训练
  size_t idx = 0;
  auto all_keys = std::vector<Base::sample_key>();
  auto unique_keys = std::vector<ps::Key>();
  int line_num = 0;
  // ******************* 遍历分配给自己的数据,重新组织feature
  for (int row = start; row < end; ++row) {
    // sample_size是第row行样本内部包含的特征个数
    int sample_size = train_data->fea_matrix[row].size();
    Base::sample_key sk;
    sk.sid = line_num;// sid,即sample_id,用于标识特征属于哪条样本
    for (int j = 0; j < sample_size; ++j) {//遍历当前样本的每个特征
      idx = train_data->fea_matrix[row][j].fid;
      sk.fid = idx;// fid,即feature_id
      all_keys.push_back(sk);
      (unique_keys).push_back(idx);
    }
    ++line_num;
  }
  // 把分配给自己的那部分数据中的所有特征,按feature_id重新排序
  // 相同feature_id,但分属不同sample的feature排列在一起
  std::sort(all_keys.begin(), all_keys.end(), base_->sort_finder);
  // 把这部分数据中出现的所有feature_id去重
  // 因为向ps server pull的时候,没必要重复pull相同的key
  std::sort((unique_keys).begin(), (unique_keys).end());
  (unique_keys).erase(unique((unique_keys).begin(), (unique_keys).end()),
                      (unique_keys).end());
  int keys_size = (unique_keys).size();//去重后的feature_id个数

  // ******************* 用去重后的feature_ids从ps server拉取最新的参数
  // 拉取一阶权重"w"
  auto w = std::vector<float>();
  kv_w->Wait(kv_w->Pull(unique_keys, &w));
  // 定义空间,等待容纳各feature的一阶权重'w'上的导数
  auto push_w_gradient = std::vector<float>(keys_size);
  // 拉取embedding "v"
  auto v = std::vector<float>();
  kv_v->Wait(kv_v->Pull(unique_keys, &v));
  // 定义空间,等待容纳各feature的embedding上的导数
  // 由于embedding是个向量,所以需要定义的空间是keys_size(去重后有多少feaute)*v_dim_(每个embedding的长度)
  auto push_v_gradient = std::vector<float>(keys_size * v_dim_);
  // ******************* 前代
  auto loss = std::vector<float>(end - start);
  auto v_sum = std::vector<float>(end - start);
  // loss这个名字取得不好,其实里面存储的是loss针对final logit的层数
  calculate_loss(w, v, all_keys, unique_keys, start, end, v_sum, loss);
  // ******************* 回代
  // 输出push_w_gradient: loss对各feature的一阶权重'w'上的导数
  // 输出push_v_gradient: loss对各feature的embedding上的导数
  calculate_gradient(all_keys, unique_keys, start, end, v, v_sum, loss,
                     push_w_gradient, push_v_gradient);
  // ******************* 向Sever push梯度,让server端更新优化参数
 // 注意!!!这里是每个worker thread各push各自的,完全没有与其他worker同步,因此这里实现的是异步模式。
  kv_w->Wait(kv_w->Push(unique_keys, push_w_gradient));
  kv_v->Wait(kv_v->Push(unique_keys, push_v_gradient));
  --gradient_thread_finish_num;//代表有一个线程完成了
}

同步还是异步?

需要注意的是,代码最后的kv_w->Wait(kv_w->Push和kv_v->Wait(kv_v->Push,尽管调用了Wait,但并不是在实现 SGD。这个 FM的实现,本质上仍然是 SGD。 如前所述,Wait只作用于单机,这里只是等待异步的Push完成,但是这个 并没有等待其他 ,就开始训练下一个batch,即,没有多机间的协调。(这个代码写得有点复杂,其实代码中是将一个 中的多个 进行了同步,但是多个 之间仍然缺乏同步)

如果要基于实现同步SGD,还有点复杂:

而现代针对推荐系统优化的PS,往往需要针对用异步更新,面对dnn 用同步更新。想想就复杂,此间先按下不表。

前代 & 回代

前代发生在::,回代发生在::。

理解两个函数的关键在于,记住这两个函数的调用者,已经将训练数据中的所有信息(代码里的,记录某的和所属),根据排好序了,拥有相同但分属不同的信息(i.e., )排列在一起。

这么做的目的,是为了能够高效访问拉来的数据。由于具备相同的特征信息排列在一起,那个对应的一阶权重"w"和 "v",只需要查找并提取一次(由于拉的数据也是按照key排好的序的,查找都免了,顺序访问就好),避免根据在拉来数据中反复查找带来的时间浪费。

两个函数的结构非常类似,只看::,另一个留待读者自己查看源码。

void FMWorker::calculate_loss(std::vector<float>& w,
    std::vector<float>& v,
    std::vector<Base::sample_key>& all_keys,
    std::vector<ps::Key>& unique_keys,
    size_t start, size_t end,
    std::vector<float>& v_sum,
    std::vector<float>& loss) {
  // *************** 每条样本,累加一阶权重
  auto wx = std::vector<float>(end - start);
  for (int j = 0, i = 0; j < all_keys.size(); ) {
    // j: 遍历每个feature信息 (sample_key)的游标
    // i: 遍历拉下来的每个feature一阶权重的游标
    size_t allkeys_fid = all_keys[j].fid;
    size_t weight_fid = (unique_keys)[i];
    if (allkeys_fid == weight_fid) {//还使用当前的w
      // 把权重w[i]累加到第[all_keys[j].sid]个样本的logit上
      wx[all_keys[j].sid] += (w)[i];
      ++j;
    } else if (allkeys_fid > weight_fid) {//移到下一个w
      ++i;
    }
  }
  // *************** 每条样本,累加所有embedding的两两交叉
  auto v_pow_sum = std::vector<float>(end - start);
  for (size_t k = 0; k < v_dim_; k++) {//遍历embedding的各位
    for (size_t j = 0, i = 0; j < all_keys.size(); ) {
    // j: 遍历每个feature信息 (sample_key)的游标
    // i: 遍历拉下来的每个feature一阶权重的游标
      size_t allkeys_fid = all_keys[j].fid;
      size_t weight_fid = unique_keys[i];
      if (allkeys_fid == weight_fid) {//还使用当前embedding
        size_t sid = all_keys[j].sid;
        float v_weight = v[i * v_dim_ + k];
        v_sum[sid] += v_weight;
        v_pow_sum[sid] += v_weight * v_weight;
        ++j;
      } else if (allkeys_fid > weight_fid) {//移到下一个embedding
        ++i;
      }
    }
  }
  auto v_y = std::vector<float>(end - start);
  for (size_t i = 0; i < end - start; ++i) {
    v_y[i] = v_sum[i] * v_sum[i] - v_pow_sum[i];
  }
  // *************** 每条样本,计算loss对logit的梯度
  for (int i = 0; i < wx.size(); i++) {// 遍历每条样本
    float pctr = base_->sigmoid(wx[i] + v_y[i]);
    // loss这个名字起得太有误导性,loss[i]不是每i条样本上的loss
    // loss[i]是第i条样本上的loss对第i条样本预测logit的梯度
    loss[i] = pctr - train_data->label[start++];
  }
}

- END -

评论(0)

二维码