他们之前把数据导入ES是通过单机的程序导的,或者通过logstash从kafka往ES导,但当数据量很大的时候就会变得很低效,我这两天调研了一下把数据从hdfs直接通过spark导入ES的方法,当然,也适合spark Streaming程序;
这里指出版本号是有必要的,spark版本:1.6.2 ES版本:5.2.1,由于ES的API变动比较频繁,因此最好参考官网文档。

连接ES的方法列举

  1. ES官网中给出了一个与spark连接的方法:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html,是通过RDD可以直接调用 saveToEs 方法实现的;
  2. 如果数据量不大的话,可以参考ES提供的RestFulAPI来实现,https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html
  3. 本文主要说明我使用的方法,通过 TransportClient 和 bulk 批处理操作来实现,这种方法比较适合数据量很大的情况,又可以灵活处理。
    https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html
    https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk.html

使用TransportClient往ES批量导入的方法

样例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import java.net.InetAddress
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.action.bulk.{BulkRequestBuilder, BulkResponse}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient
/**
* Author: wangxiaogang
* Date: 2017/7/11
* Email: Adamyuanyuan@gmail.com
* hdfs 中的数据根据格式写到ES中
*/
object HdfsToEs {
def main(args: Array[String]) {
if (args.length < 5) {
System.err.println("Usage: HdfsToEs <file> <esIndex> <esType> <partition>")
System.exit(1)
}
val hdfsInputPath: String = args(0)
println("hdfsInputPath: " + hdfsInputPath)
val conf = new SparkConf().setAppName("HdfsToEs")
val sc = new SparkContext(conf)
//插入相关,索引 类型 id相关 以args方式提供接口。
val esIndex: String = args(1)
val esType: String = args(2)
val partition: Int = args(3).toInt
val bulkNum: Int = args(4).toInt
val hdfsRdd: RDD[String] = sc.textFile(hdfsInputPath, partition)
val startTime: Long = System.currentTimeMillis
println("hdfsRDD partition: " + hdfsRdd.getNumPartitions + " setted partition: " + partition)
hdfsRdd.foreachPartition {
eachPa => {
// 生产环境
val settings: Settings = Settings.builder.put("cluster.name", "production-es").put("client.transport.sniff", true)
.put("transport.type", "netty3").put("http.type", "netty3").build
val client: TransportClient = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
var bulkRequest: BulkRequestBuilder = null
var flag = true
var lineNum = 0
for (eachLine <- eachPa) {
// 每个bulk是10-15M为宜,数据封装为bulk后会较原来的数据略有增大,如果每行数据约为 1.5KB,则每 10000 行为一个bulk
if (flag) {
bulkRequest = client.prepareBulk
flag = false
}
val strArray: Array[String] = eachLine.split("###")
if (strArray.length != 25) {
// 表示这行数据又问题,为了不影响整体,则跳过
println("ERROR: strArray.length != 25: " + strArray.length + " lineNum: " + lineNum + " strArray(0): " + strArray(0))
} else {
// LinkedHashMap让ES中的数据变得有序
val esDataMap: java.util.Map[String, String] = new java.util.LinkedHashMap[String, String]
val id: String = strArray(0)
esDataMap.put("msisdn", id)
// 数据合并后的格式为: msisdn###w0的前三###w1的前三###如果为空的话就是null...###w23的前三,共25列
for (i <- 1 to 24) {
val locTimesListStr = strArray(i)
val esDataKey = "w" + (i - 1)
if (locTimesListStr == null || locTimesListStr.isEmpty || locTimesListStr.equals("null")) {
esDataMap.put(esDataKey, "")
} else {
esDataMap.put(esDataKey, locTimesListStr)
}
}
bulkRequest.add(client.prepareIndex(esIndex, esType, id).setSource(esDataMap))
lineNum += 1
if (lineNum % bulkNum == 0) {
val endTime: Long = System.currentTimeMillis
println("bulk push, current lineNum: " + lineNum + ", currentTime s: " + ((endTime - startTime) / 1000))
val bbq: BulkResponse = bulkRequest.execute.actionGet()
flag = true
if (bbq.hasFailures) {
println("bbq.hasFailures: " + bbq.toString)
bulkRequest.execute.actionGet
}
}
}
}
if (bulkRequest != null) {
bulkRequest.execute().actionGet()
}
client.close()
val endTime: Long = System.currentTimeMillis
println("ths time is: " + (endTime - startTime) / 1000 + "s ")
}
}
sc.stop()
}
}

踩坑说明:在编写代码中踩了如下坑:

  1. 依赖冲突的问题: ES5.2与Spark1.6有如下包会产生依赖: netty-all:io.nettycom.fasterxml.jackson.core:jackson-core, org.apache.logging.log4j:log4j-core.
    解决方案:
    通过 mvn dependency:tree -Dverbose -Dincludes=com.fasterxml.jackson.core 命令查出依赖原因,然后在pom.xml中增加所需的相关依赖的最高版本;

  2. 每个bulk的大小,根据网上的经验是10M-15M为宜,大概计算一下就好了;

  3. 后来在单机测试通过,但在集群模式中还是会出现 netty4的依赖冲突:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17/07/17 10:21:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[elasticsearch[_client_][management][T#1],5,main]
    java.lang.NoSuchMethodError: io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
    at org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
    at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:422)
    at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:93)
    at org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:1058)
    at org.elasticsearch.transport.TcpTransport.sendRequestToChannel(TcpTransport.java:1040)
    at org.elasticsearch.transport.TcpTransport.executeHandshake(TcpTransport.java:1555)
    at org.elasticsearch.transport.TcpTransport.openConnection(TcpTransport.java:502)
    at org.elasticsearch.transport.TcpTransport.connectToNode(TcpTransport.java:460)
    at org.elasticsearch.transport.TransportService.connectToNode(TransportService.java:318)
    at org.elasticsearch.client.transport.TransportClientNodesService$SniffNodesSampler$1.run(TransportClientNodesService.java:488)
    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:527)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

有一种解决方案我没有尝试成功,就是在pom中将冲突的依赖包exclusions掉,各位感兴趣可以尝试,成功了麻烦告知我一下。参考链接:https://www.elastic.co/blog/to-shade-or-not-to-shade, 使用 maven-shade-plugin 工具打包。

上个方法我尝试几次不成功后,使用了比较暴力的方法,直接将ES的netty参数由netty4改成了netty3,

1
.put("transport.type", "netty3").put("http.type", "netty3").build

好了,打包好之后,程序就可以完美运行了。

ES中创建索引

就算如果ES中是自动创建索引的,也希望你能手动创建索引和字段属性,因为默认的字段属性是Text,ES会自动对它进行分词相关的操作,如果ES中存的字符串你不想让它被分隔的话,就用keyword替代为Text类型,命令如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
PUT /weekend-20170718
{
"settings" : {
"index" : {
"number_of_shards" : 5,
"number_of_replicas" : 1,
"refresh_interval" : "60s"
},
"index.routing.allocation.include.zone": "light"
},
"mappings": {
"offline": {
"properties": {
"msisdn": {
"type": "keyword"
},"w0": {
"type": "keyword"
} ...后面省略
}
}
}
}

创建好索引后检查一下:

1
GET /weekend-20170718/_mapping

集群中运行

这个比较简单,只需要注意以下几点就好了:

  1. 使用jdk1.8版本;
  2. 注意内存的申请,可能会出现跑了一段时间后,内存不够用导致程序退出的情况;
  3. 观测好ES集群的状态,一段时间后,ES机器的GC比较高
  4. 最好别一下子跑所有数据,分几批跑,这样就算出问题,只需要重跑那一部分就好了

数据:通过观察,导入的速度随着时间的增长呈下降趋势,整体来说,ES集群隔离的小集群共有五台物理机,共2.23亿条,751G的数据导入用了约4.5小时,平均速度为 45M/s, 1.38W条/s。

参考

感谢 技术交流群里 @叫我老陈行了吧,@岑玉海 等大神网友;

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk.html