新闻推荐系统源代码之推荐业务逻辑控制中心( 三 )

if temp.time_stamp < last_stamp:try:row = self.hbu.get_table_cells('history_recommend','reco:his:{}'.format(temp.user_id).encode(),'channel:{}'.format(temp.channel_id).encode(),timestamp=temp.time_stamp + 1,include_timestamp=True)except Exception as e:row = []res = []if not row:temp.time_stamp = 0res = []elif len(row) == 1 and row[0][1] == temp.time_stamp:res = eval(row[0][0])temp.time_stamp = 0elif len(row) >= 2:res = eval(row[0][0])temp.time_stamp = int(row[1][1])res = list(map(int, res))# 封装推荐结果track = add_track(res, temp)# 曝光参数设置为空track['param'] = ''(注意:这里将用户请求的时间戳 +1 , 因为 Hbase 只能获取小于该时间戳的历史推荐结果)

  • 如果用户请求的时间戳大于 Hbase 历史推荐结果中最近一次请求的时间戳 last_stamp , 那么该请求为用户刷新推荐列表 , 需要读取推荐结果并返回 。 如果结果为空 , 需要调用 user_reco_list() 方法 , 再次计算推荐结果 , 再返回 。
if temp.time_stamp > last_stamp:# 获取缓存res = redis_cache.get_reco_from_cache(temp, self.hbu)# 如果结果为空 , 需要再次计算推荐结果 进行召回+排序 , 同时写入到hbase待推荐结果列表if not res:res = self.user_reco_list(temp)temp.time_stamp = int(last_stamp)track = add_track(res, temp)定义 user_reco_list() 方法 , 首先要读取多路召回结果 , 根据为用户分配的算法策略 , 读取相应路径的召回结果 , 并进行重后合并
reco_set = []# (1, [100, 101, 102, 103, 104], [])for number in RAParam.COMBINE[temp.algo][1]:if number == 103:_res = self.recall_service.read_redis_new_article(temp.channel_id)reco_set = list(set(reco_set).union(set(_res)))elif number == 104:_res = self.recall_service.read_redis_hot_article(temp.channel_id)reco_set = list(set(reco_set).union(set(_res)))else:# 100, 101, 102召回结果读取_res = self.recall_service.read_hbase_recall(RAParam.RECALL[number][0],'recall:user:{}'.format(temp.user_id).encode(),'{}:{}'.format(RAParam.RECALL[number][1],temp.channel_id).encode())reco_set = list(set(reco_set).union(set(_res)))接着 , 过滤当前该请求频道的历史推荐结果 , 如果不是 0 频道还需过滤 0 频道的历史推荐结果
history_list = []data = http://kandian.youth.cn/index/self.hbu.get_table_cells('history_recommend','reco:his:{}'.format(temp.user_id).encode(),'channel:{}'.format(temp.channel_id).encode())for _ in data:history_list = list(set(history_list).union(set(eval(_))))data = http://kandian.youth.cn/index/self.hbu.get_table_cells('history_recommend','reco:his:{}'.format(temp.user_id).encode(),'channel:{}'.format(0).encode())for _ in data:history_list = list(set(history_list).union(set(eval(_))))reco_set = list(set(reco_set).difference(set(history_list)))最后 , 根据分配的算法策略 , 调用排序服务 , 将分数最高的 N 个推荐结果返回 , 并写入历史推荐结果表 , 如果还有剩余的排序结果 , 将其余写入待推荐结果表
# 使用指定模型对召回结果进行排序# temp.user_id ,reco_set_sort_num = RAParam.COMBINE[temp.algo][2][0]# 'LR'reco_set = sort_dict[RAParam.SORT[_sort_num]](reco_set, temp, self.hbu)if not reco_set:return reco_setelse:# 如果reco_set小于用户需要推荐的文章if len(reco_set) <= temp.article_num:res = reco_setelse:# 大于要推荐的文章结果res = reco_set[:temp.article_num]# 将剩下的文章列表写入待推荐的结果self.hbu.get_table_put('wait_recommend','reco:{}'.format(temp.user_id).encode(),'channel:{}'.format(temp.channel_id).encode(),str(reco_set[temp.article_num:]).encode(),timestamp=temp.time_stamp)# 直接写入历史记录当中 , 表示这次又成功推荐一次self.hbu.get_table_put('history_recommend','reco:his:{}'.format(temp.user_id).encode(),'channel:{}'.format(temp.channel_id).encode(),str(res).encode(),timestamp=temp.time_stamp)return res到这里 , 推荐中心的基本逻辑已经结束 。 下面是读取多路召回结果的实现细节:通过指定列族 , 读取基于模型、离线内容以及在线的召回结果 , 并删除 cb_recall 的召回结果
def read_hbase_recall_data(self, table_name, key_format, column_format):"""读取cb_recall当中的推荐数据读取的时候可以选择列族进行读取als, online, content:return:"""recall_list = []data = http://kandian.youth.cn/index/self.hbu.get_table_cells(table_name, key_format, column_format)# data是多个版本的推荐结果[[],[],[],]for _ in data:recall_list = list(set(recall_list).union(set(eval(_))))self.hbu.get_table_delete(table_name, key_format, column_format)return recall_list读取 redis 中的新文章
def read_redis_new_article(self, channel_id):"""读取新文章召回结果:param channel_id: 提供频道:return:"""_key = "ch:{}:new".format(channel_id)try:res = self.client.zrevrange(_key, 0, -1)except Exception as e:res = []return list(map(int, res))