新闻推荐系统源代码之推荐业务逻辑控制中心( 四 )

读取 redis 中的热门文章 , 并选取热度最高的前 K 个文章
def read_redis_hot_article(self, channel_id):"""读取热门文章召回结果:param channel_id: 提供频道:return:"""_key = "ch:{}:hot".format(channel_id)try:res = self.client.zrevrange(_key, 0, -1)except Exception as e:# 由于每个频道的热门文章有很多 , 因为 保留文章点击次数res = list(map(int, res))if len(res) > self.hot_num:res = res[:self.hot_num]return res读取相似文章
def read_hbase_article_similar(self, table_name, key_format, article_num):"""获取文章相似结果:param article_id: 文章id:param article_num: 文章数量:return:"""try:_dic = self.hbu.get_table_row(table_name, key_format)res = []_srt = sorted(_dic.items(), key=lambda obj: obj[1], reverse=True)if len(_srt) > article_num:_srt = _srt[:article_num]for _ in _srt:res.append(int(_[0].decode().split(':')[1]))except Exception as e:res = []return res使用缓存策略

  • 如果 redis 缓存中存在数据 , 就直接从 redis 缓存中获取推荐结果
  • 如果 redis 缓存为空而 Hbase 的待推荐结果表 wait_recommend 不为空 , 则从 wait_recommend 中获取推荐结果 , 并将一定数量的待推荐结果放入 redis 缓存中
  • 若 redis 和 wait_recommend 都为空 , 则需读取召回结果并进行排序 , 将排序结果写入 Hbase 的待推荐结果表 wait_recommend 中及 redis 中
(每次读取的推荐结果都要将其写入 Hbase 的历史推荐结果表 history_recommend 中)
读取 redis 缓存
#读取redis对应的键key = 'reco:{}:{}:art'.format(temp.user_id, temp.channel_id)# 读取 , 删除 , 返回结果pl = cache_client.pipeline()# 读取redis数据res = cache_client.zrevrange(key, 0, temp.article_num - 1)if res:# 手动删除读取出来的缓存结果pl.zrem(key, *res)如果 redis 缓存为空
else:# 删除键cache_client.delete(key)try:# 从wait_recommend中读取wait_cache = eval(hbu.get_table_row('wait_recommend','reco:{}'.format(temp.user_id).encode(),'channel:{}'.format(temp.channel_id).encode()))except Exception as e:wait_cache = []# 如果为空则直接返回空if not wait_cache:return wait_cache# 如果wait_recommend中有数据if len(wait_cache) > 100:cache_redis = wait_cache[:100]# 前100个数据放入redispl.zadd(key, dict(zip(cache_redis, range(len(cache_redis)))))# 100个后面的数据 , 在放回wait_recommendhbu.get_table_put('wait_recommend','reco:{}'.format(temp.user_id).encode(),'channel:{}'.format(temp.channel_id).encode(),str(wait_cache[100:]).encode())else:# 清空wait_recommend数据hbu.get_table_put('wait_recommend','reco:{}'.format(temp.user_id).encode(),'channel:{}'.format(temp.channel_id).encode(),str([]).encode())# 所有不足100个数据 , 放入redispl.zadd(key, dict(zip(wait_cache, range(len(wait_cache)))))res = cache_client.zrange(key, 0, temp.article_num - 1)最后 , 在 Supervisor 中配置 gRPC 实时推荐程序
【新闻推荐系统源代码之推荐业务逻辑控制中心】[program:online]environment=JAVA_HOME=/root/bigdata/jdk,SPARK_HOME=/root/bigdata/spark,HADOOP_HOME=/root/bigdata/hadoop,PYSPARK_PYTHON=/miniconda2/envs/reco_sys/bin/python ,PYSPARK_DRIVER_PYTHON=/miniconda2/envs/reco_sys/bin/pythoncommand=/miniconda2/envs/reco_sys/bin/python /root/toutiao_project/reco_sys/abtest/routing.pydirectory=/root/toutiao_project/reco_sys/abtestuser=rootautorestart=trueredirect_stderr=truestdout_logfile=/root/logs/recommendsuper.logloglevel=infostopsignal=KILLstopasgroup=truekillasgroup=true