天作棋盘星作子,谁人能下? 地为琵琶路为弦,哪个敢弹!

位置服务项目不止一期,也断断续续写了一年,不知从哪里谈起,在此着重记录一些问题以及一些解决方案,重要的是在解决问题过程中的思考。

背景

需求

需求比较复杂,细节比较多,在这里简明扼要说一下:

  1. 用户A(spId)在中国地图上画了100个圆圈(locationId),想让我们实时告诉他这些圈内新来的人和他的位置信息,哪些是游客哪些是土著;
  2. A的100个圆圈会随时修改,增大,变小,或者再多加100个圈圈,我们要不停程序,秒级给他这些数据;
  3. A觉得这样推送的数据量太大,因为有的人本来就住这个圈里,所以对每个不同的圈,里面的人在 7天(可调)之内不重复;
  4. 用户B也想使用这个服务,在中国地图上画了100个圈,B的圈和A的圈可能重复可能不重复,需求是一样的;
  5. 用户C也想使用这个服务,不过他觉得画个圈不符合他的要求,他要在地图上随便画100个什么图形,然后给他他想要的数据;
  6. 用户D也想使用这个服务,不过他不想要知道这个图形里的人的细节,而是只想实时知道这个图形当下有多少人。

上面是位置服务二期的需求的简单总结,一期太简单不值一提,三期主要是数据的存储,以后再谈,这里只提二期。

最终,我们采用spark streaming来实现了上述需求,使用一个Spark Streaming程序支持多用户随时订阅随时修改,在yarn持续稳定运行超过一千个小时。

数据输入

用户的位置数据是实时得通过kafka传过来的,数据量大概为几十万条每秒,以一个电话号码msisdn表示一个用户,每条数据都代表一个用户当前所在的经纬度位置,以及一些省市相关的信息;
每个用户的数据,平均每隔半个多小时会上传一条,当然实际情况不是这样简单;

实现思路

整体架构如下图,我们借助kafka作为数据源的数据中转,redis作为数据缓存,mysql来实现实时订阅,flume来作为数据管道,Spring写的订阅服务以及基于SpringCloud的微服务来实现一些查询功能和监控功能,zabbix主要用户监控和报警,后期引入ES实现了数据多样化数据的存储;
wzfw2_1.png

整体记录意义不大,这里着重介绍遇到的一些问题:

1. spark Streaming程序在yarn上稳定运行的问题

这个问题的解决需要多方面的支持:

  1. 首先我们是基于Kerberos的,在hdp2.6上如果nn有主备的话,有一个大坑,导致超过两天就不行,需要hadoop打个patch或者spark打个patch,传送门:Spark踩坑之Streaming在Kerberos的hadoop中renew失败
  2. spark Streaming的稳定运行需要yarn的稳定,如果你依赖kafka的话,需要kafka稳定,我们遇到多次kafka找不到broker的错误,导致spark崩溃,所以要做好监控
  3. spark Streaming开发连接redis与mysql的时候也踩了一些坑,请看踩坑集锦;
  4. 如果你的日志是INFO模式的话,请改成WARN模式,过多的日志会让程序变得不稳定,当时忘记把日志模式改成WARN,导致程序跑了两个月的日志是几百G。。。
  5. 无论如何,由于程序依赖的组件较多,不敢说spark Streaming永远稳定,所以要做好程序的监控与失败后的拉起以及数据的恢复,我们这边写了一个通用的脚本,通过yarn的API实时监控Streaming程序的状态,然后发送zabbix记录与报警,如果发现挂了,就自动拉起,以防万一;

2. 判断一个用户是否在该区域的算法

圆形:只需要判断圆形中心点和用户所在的点的半径是否小于圆形的半径即可。
任意规则图形:根据PNPoly 算法实现。
都是现成的算法用Scala简单实现了而已。传送门

值得一提的是,如果全国的每条数据都对每个区域来判断,这个计算量太大了,我们在进行区域判断之前做了一个优化,优化方案如下:
订阅者订阅的每个区域都会有一个它所属的城市列表(如果在多个城市之间,则它属于多个城市),这样我们会通过实时扫描数据库,统一实时得去维护一个 HashMap,它的数据结构如下:
util.HashMap[Int, util.LinkedList[PositionSubData]],这里以城市id为key,以订阅景点的数据结构 PositionSubData 的列表为value,代表这个城市里的所有区域信息。
这样的话,每次有新数据过来,首先判断一下它在不在订阅到的城市,如果在该城市,再只要判断它所属的那几个景区就好了。

3. spark开发过程中的一些踩坑经验

网上有很多经验,这里总结网上没有的,或者不常发现的:

  1. 通过RDD的flatMap方法,可以实现map方法和filter方法效果的结合,也可以实现每一条输入数据,可以输出多条数据的情况;传送门
  2. 不是所有的方法都用foreachRDD就可以的,因为有监控的需求,必须要在transform方法中实现一些复杂的逻辑,我们知道spark是lazy的,所以在编写过程中(尤其与外部redis或者mysql进行交互)要注意很多细节;这里不详细展开,看我其它相关总结;
  3. 如果有多个action操作的话,一定要cache,否则前面会执行多次,不仅仅会浪费计算,更会导致结果是错误的(如果依赖redis或者mysql类似的外部数据的话);
  4. 很多时候会觉得,这个需求在流处理里是根本无法实现的感觉,但到最后发现没有什么需求是不能实现的,不过总会有比你的方案更优的方案;
  5. 前期组件的性能调研非常重要,预估需要的性能非常重要,写到后面大数据量上来了后才发现,组件扛不住,就坑了。。。

4. 返回区域实时人数的思路与总结

传送门

思考与未来

Spark Streaming基本上可以实现大部分的业务需求,但是一些时间窗口相关的需求难以搞定,需要借助外部存储机制来完善,接下来主要做以下几件事:

  1. 实时程序的部署监控报警的统一化;现在从kafka导数据到其它组件还没有做到自动化,需要人肉运维,并且flume的配置也是基于命令行的,这些都需要统一;
  2. 调研与应用structured Streaming;
  3. 根据需求,完成流式计算平台,将简单的逻辑傻瓜化,用户只需要输入简单的一些指令或者sql语句,就能实现程序的调试,部署与上线;
  4. 跟着Apache Beam的发展,逐渐将业务逻辑与处理引擎分开,这是大势所趋。