新闻推荐系统源代码之推荐业务逻辑控制中心( 二 )


from collections import namedtuple# ABTest参数信息param = namedtuple('RecommendAlgorithm', ['COMBINE','RECALL','SORT','CHANNEL','BYPASS'])RAParam = param(COMBINE={'Algo-1': (1, [100, 101, 102, 103, 104], [200]),# 首页推荐 , 所有召回结果读取+LR排序'Algo-2': (2, [100, 101, 102, 103, 104], [201])# 首页推荐 , 所有召回结果读取 排序},RECALL={100: ('cb_recall', 'als'),# 离线模型ALS召回 , recall:user:1115629498121 column=als:18101: ('cb_recall', 'content'),# 离线word2vec的画像内容召回 'recall:user:5', 'content:1'102: ('cb_recall', 'online'),# 在线word2vec的画像召回 'recall:user:1', 'online:1'103: 'new_article',# 新文章召回 redis当中ch:18:new104: 'popular_article',# 基于用户协同召回结果 ch:18:hot105: ('article_similar', 'similar')# 文章相似推荐结果 '1' 'similar:2'},SORT={200: 'LR',201: 'WDL‘},CHANNEL=25,BYPASS=[{"Bucket": ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd'],"Strategy": "Algo-1"},{"BeginBucket": ['e', 'f'],"Strategy": "Algo-2"}])sort_dict = {"LR": lr_sort_service,"WDL": wdl_sort_service}流量切分 , 将用户 ID 进行哈希 , 然后取哈希结果的第一个字符 , 将包含该字符的策略桶所对应的算法编号赋值到此用户请求参数的 algo 属性中 , 后面将调用该编号对应的算法策略为此用户计算推荐数据
import hashlibfrom setting.default import DefaultConfig, RAParam# 进行分桶实现分流 , 制定不同的实验策略bucket = hashlib.md5(user_id.encode()).hexdigest()[:1]if bucket in RAParam.BYPASS[0]['Bucket']:temp.algo = RAParam.BYPASS[0]['Strategy']else:temp.algo = RAParam.BYPASS[1]['Strategy']推荐中心逻辑推荐中心逻辑主要包括:

  • 接收应用系统发送的推荐请求 , 解析请求参数
  • 进行 ABTest 分流 , 为用户分配推荐策略
  • 根据分配的算法调用召回服务和排序服务 , 读取推荐结果
  • 根据业务进行调整 , 如过滤、补足、合并信息等
  • 封装埋点参数 , 返回推荐结果

新闻推荐系统源代码之推荐业务逻辑控制中心文章插图
首先 , 在 Hbase 中创建历史推荐结果表 history_recommend , 用于存储用户历史推荐结果
create 'history_recommend', {NAME=>'channel', TTL=>7776000, VERSIONS=>999999}86400# 每次指定一个时间戳,可以达到不同版本的效果put 'history_recommend', 'reco:his:1', 'channel:18', [17283, 140357, 14668, 15182, 17999, 13648, 12884,18135]继续在 Hbase 中创建待推荐结果表 wait_recommend , 用于存储经过多路召回并且排序之后的待推荐结果 , 当 wait_recommend 没有数据时 , 才再次调用排序服务计算出新的待推荐结果并写入到 wait_recommend , 所以不需设置多个版本 。 注意该表与 cb_recall 的区别 , cb_recall 存储的是还未经排序的召回结果 。
create 'wait_recommend', 'channel'put 'wait_recommend', 'reco:1', 'channel:18', [17283, 140357, 14668, 15182, 17999, 13648, 12884,18135]put 'wait_recommend', 'reco:1', 'channel:0', [17283, 140357, 14668, 15182, 17999, 13648, 12884, 17302, 13846]用户获取 Feed 流推荐数据时 , 如果用户向下滑动 , 发出的是刷新推荐列表的请求 , 需要传入当前时间作为请求时间戳参数 , 该请求时间戳必然大于 Hbase 历史推荐结果表中的请求时间戳 , 那么程序将获取新的推荐列表 , 并返回 Hbase 历史推荐结果表中最近一次推荐的请求时间戳 , 用于查询历史推荐结果;如果用户向上滑动 , 发出的是查看历史推荐结果的请求 , 需要传入前面刷新推荐列表时返回的最近一次推荐的请求时间戳 , 该请求时间戳必然小于等于 Hbase 历史推荐结果中最近一次推荐的时间戳 , 那么程序将获取小于等于该请求时间戳的最近一次历史推荐结果 , 并返回小于该推荐结果最近一次推荐的时间戳 , 也就是上一次推荐的时间戳 , 下面是具体实现 。
在获取推荐列表时 , 首先获取用户的历史数据库中最近一次时间戳 last_stamp , 没有则将 last_stamp 置为 0
try:last_stamp = self.hbu.get_table_row('history_recommend','reco:his:{}'.format(temp.user_id).encode(),'channel:{}'.format(temp.channel_id).encode(),include_timestamp=True)[1]except Exception as e:last_stamp = 0