问题背景

先说一下整个位置服务的背景,需求比较复杂,细节比较多,在这里简明扼要说一下:

  1. 用户A(spId)在中国地图上画了100个圆圈(locationId),想让我们实时告诉他这些圈内新来的人和他的位置信息,哪些是游客哪些是土著;
  2. A的100个圆圈会随时修改,增大,变小,或者再多加100个圈圈,我们要不停程序,秒级给他这些数据;
  3. A觉得这样推送的数据量太大,因为有的人本来就住这个圈里,所以对每个不同的圈,里面的人在 7天(可调)之内不重复;
  4. 用户B也想使用这个服务,在中国地图上画了100个圈,B的圈和A的圈可能重复可能不重复,需求是一样的;
  5. 用户C也想使用这个服务,不过他觉得画个圈不符合他的要求,他要在地图上随便画100个什么图形,然后给他他想要的数据;
  6. 用户D也想使用这个服务,不过他不想要知道这个图形里的人的细节,而是只想实时知道这个图形当下有多少人。

本文只关注返回区域实时人数的需求:

需求分为两大类,第一类是返回刚进入这个区域的人(返回明细),这个实现起来比较简单,因为它没有窗口的概念,只需要有数据过来,通过redis和mysql,经过一系列计算,判断它在景区并且是新来的人的话,直接推送到下游就好了;
第二个需求是实时返回这个区域的当前人数(返回人数)。这个听起来好像挺简单,比如当前上海迪士尼有56000个人,就给订阅者 56000这个数字就好了,但仔细思考一下就会发现较难实现,主要原因如下:

  1. 实时上传过来的数据并不是全量数据,如果以5s为一批,这批上传的数据可能只占全国总量2亿用户中的100W条,这样就导致无法在一批数据中就得到结果;
  2. 用户上传数据的频率也是不一致的,有的可能一分钟会发多条,有的可能四五十分钟才会上传一条,有的干脆好久都不会上传;
  3. 多个订阅者同时订阅多个景区,每个订阅者的数据都要是对的;
  4. 数据量太大,导致难以用时间窗口把它装住,数据还是得流进来,五秒内就得流出去;

一些简单可能的方案

为了满足上述需求,之前提出过一些方案:

  1. 每次处理一个小时的数据,因为大概一个小时以内,所有的用户基本都会上传一遍数据,这样通过去重就能知道上个小时该区域的人数的,但这是相当于批处理,没有实时性,是最差的选择了;
  2. 使用时间窗口,维护所有用户当前的位置信息,每隔一段时间计算,然后输出(类型与Apache Beam的定义模式),这个由于数据量太大,难以实现,因为有其它方案,所以也没有尝试;
  3. 使用外部的一个数据库,维护所有用户的当前位置信息,然后每次计算,输出;如果用户量在千万级别以下的话,觉得用Mysql就可以了,但是2亿的用户,Mysql难以胜任,如果用Hbase或redis保存所有人状态的话,的话,每次统计计算多个区域的人数信息需要的计算量也很大,后来也没有尝试;

基于双层redis的方案

方案思路

首先引入两个状态:用户这次是否在景区 thisTime 和 用户上次是否在景区 lastTime,基于这两个状态,可以得到如下表格

用户位置判断 本次是否在该区域 上次是否在该区域 处理逻辑
情况1 0 0 不处理
情况2 0 1 原子删除(一级总表和二级表)
情况3 1 0 原子增加(一级总表和二级表)
情况4 1 1 不处理

如上表格所示,每个人都维护这次与上次两个状态,如果这个人这次不在该区域,上次也不在该区域,说明什么都不需要做,如果这个人这次不在该区域,上次在,说明这个人离开了这个区域,则就要把它从该区域中剔除;同理可得,如果这次在,上次不在,就把该用户加到该区域。这样一来,我们就可以对每个区域的人了如指掌,而且在理论上,这是目前位置最精确的判断方法。

可行性

这套方案在这个问题上是可行的,虽然用户的位置轨迹类似于用户登陆网页的轨迹,但是又有极大的不同,因为每个人虽然都可离开这个网站,但是每个人都不会跳出地球!如果是网站的话,我只知道你访问了我这个网页所做的操作,但我不知道你什么时候没有访问我的网页,比如你从baidu.com跑到了google,baidu的数据里是不知道的,但你从这个区域跑到另外一个区域,我们的数据里是知道的,这就让该方案可行。

实现方案

如下图右侧是实现方案的简化版,我们使用了双层redis的方法实现了这个思路,第一层redis的数据结构为hash,订阅者id(spId)命名(当然实际名称多了些前缀),key是电话号码msisdn,value是它对应的区域id(locationId),第二层redis的数据结构为Set,以区域id(locationId)命名,里面直接是 msisdn。
wzfw2_2.png
判断本次是否在景区,直接根据位置算法进行计算就可以知道了,判断上次是否在景区,只需要循环去第一层redis中查有没有这个msisdn,如果有就返回它的locationId,如果没有,就说明该用户不在任何景区。如果要往该景区增加这个人,则需要同时往一级redis和二级对应的某一个redis中同时增加,同理,删除的话同时删除。值得注意的是,redis本身支持的事务并不支持回滚,所以还是要自己去控制它,通过返回值来进行原子删除和增加。上述操作的时间复杂度均为O(1),道理上讲还是很快的。然后每个set都代表一个景区,想知道它的人数,so easy,O(1)的时间复杂度就能搞定了,连具体是谁都能列出来。

当然具体的代码实现就繁杂很多了:

  1. 因为是不同的订阅者订阅好多景区,一个用户可能同时在多个订阅者订阅的好多个景区,这就要循环判断很多,会存在RDD的一条输入,输出是多条的情况;
  2. 除了上述四种情况,还有几种情况需要考虑到,比如这个人从这个区域跑到同一个订阅者订阅的另一个区域的话,就需要使用 redis set的move操作,同时改变一级hash表的值。如果跑到另一个订阅者的另一个区域的话,又会有它相应的逻辑。
    但大部分情况下还是以上四种逻辑,毕竟中国很大,订阅的区域间距还是比较远的。
  3. 为了防止各种原因导致redis中脏数据的产生,在存入的时候会加一个时间戳,如果时间过长,就会将其中的数据删除。
    实现逻辑大概代码流程图如下:wzfw2_3.png

为什么要以双层redis的方法呢?单层的不行嘛?由于我们的数据量非常大,如果仅仅是单层的hash的话,每次去计算某个景区的人数,又要把所有的数据全部遍历一遍,这个性能消耗是非常大的,这里采用空间换时间的方法,用极少的空间,节省了大量的时间。说白了,如果数据量小的话,用mysql之类的关系型数据库最好了,查的时候只要类似于select count(*) from spId where locationId = locationA之类的语句就可以了。使用Hbase能不能达到这个要求呢?个人没有进行过更深入的调研,浅薄地认为因为hbase的数据本身是存在hdfs的,如果是全量数据的话,它缓存命中的概率是比较低的,这样延迟会比较大,那么大的数据,难以秒级全部响应,所以就再也没有深入调研。

问题与提高

这个方案,理论上的结果是很精确的,但是它有一个小问题,就是在判断用户上次是否在该区域的时候,“在判断该用户上次是否在该区域之前,我们是不知道他上次是否在某个区域的”,这话听起来比较拗口,换一种方式表达:对于全国每一个用户的每一条数据,我们都要判断它上次是否在某个区域,这代表20W/s的redis读!当然我这边会做一些优化,先根据城市id进行过滤,仅仅读相关城市的数据,不过这个数据量也是比较大的,也到了1W/s的级别。

实际生产压测过程中,当全量数据上来后,由于redis的每次连接导致网络IO过多,响应会变慢,导致程序会逐渐产生延迟,也影响到了稳定运行的明细数据需求,所以存在问题!

由于我们的redis是与其它需求共用的,并且当时为了稳定使用的sentinel模式,也难以在有限的几天内就申请到cluster模式的redis集群,也难以使用pipeline来优化,还有几天就要上线了。所以提出了下面的基于数据过期的模糊方案来进行过渡。

基于数据过期的模糊方案

提出模糊方案的一个前提是,用户上传的数据本来就是模糊的,如果一个用户半个小时上传一次他的位置数据,我们无法得知他在这半个小时之间的位置,也无法得知他下半个小时在哪里

方案思路

用户每次进入某个区域后,就将它存入该区域集合中,不用考虑其它,区域中的数据是有时效的,超过30min(可调)后,区域中的数据就会被直接删除。

实现方案

对于每一个位置 locationId,我们使用redis的 sorted set数据结构,里面保存的值是msisdn,里面的score值是当前时间戳精确到分钟。它插入(zadd)的时间复杂度是O(log(N)),移除(ZREMRANGEBYSCORE)的时间复杂度是 O(log(N)+M)。每次有符合条件的数据,我们就直接插入到相应的locationId,然后异步地通过外部程序,对其中过期超过30min(可调)的数据进行批量删除,实际中这个删除是在spark Streaming的driver中实现每10s删除一次。

该方案虽然在插入时时间复杂度有所增加,但是极大得简化了redis的连接次数,ZREMRANGEBYSCORE操作也只是统一每隔10s执行一次,最终程序稳定运行在了生产。

问题与提高

该方案的优点是简单易行,问题是它并不像上一个方案一样是理论上精确的,但由于原始数据并不精确,所以有它存在的道理。
关于超时删除时间,这个时间是可以调优的,调优方案很简单,就一个变量而已,通过批处理的方式对区域的数据进行精确的统计,然后对比两者的结果,最终找出一个最靠谱的超时删除时间。
当然,区域的订阅面积越大,redis的响应就越慢,所以当订阅面积大到一定程度,redis肯定也是扛不住的。

总结与思考

  1. 判断区域人数是一个比较通用的需求,比如判断当前网站在线用户数,当前频道观众,但是具体实现起来的确觉得不那么简单,我目前能想到的算法就这两个,总觉得会有更好的方法,不同的问题背景产生不同的方法;
  2. 我们设计与实现双层redis方法的时候,没有考虑到redis网络IO过多导致延迟的问题,以后一定要评估好性能瓶颈,提前申请机器;
  3. 有时候追求完美不是好事,完美得向现实妥协,最优的算法并不是最适合的,要在满足需求的基础上,以最少的代码完成这个事情,增加鲁棒性;
  4. 之前对时间窗口相关的不太深入,接下来深入理解一下structured Streaming与Apache Beam的API的逻辑;