http://www.cnblogs.com/jerrylead/archive/2012/08/13/2636115.html

摘要:Spark是继Hadoop之后的新一代大数据分布式处理框架,由UC Berkeley的Matei Zaharia主导开发。我只能说是神一样的人物造就的神器,详情请猛击http://www.spark-project.org/

Created 2012-05-09

Modified 2012-08-13

1 Scala安装

当前,Spark最新版本是0.5,由于我写这篇文档时,版本还是0.4,因此本文下面的所有描述基于0.4版本。

不过淘宝的达人已经尝试了0.5,并写了相关安装文档在此http://rdc.taobao.com/team/jm/archives/tag/spark。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268

我使用的Spark的版本是0.4,只存在于github上,该版本使用的Scala版本是0.9.1.final。所以先到http://www.scala-lang.org/node/165下载scala-2.9.1.final.tar.gz。解压后放到本地 /opt 下面,在 /etc/profile 里添加

export SCALA_HOME=/opt/scala-2.9.1.final

export PATH=$SCALA_HOME/bin:$PATH

2 git安装
  
由于下载Spark和编译Spark需要git,因此先安装git,安装方法可以到Ubuntu软件中心直接装,也可以apt-get装。装好后需要到https://github.com 去注册一个帐号,我注册的是JerryLead,注册邮箱和密码,然后根据网站上的get-start提示生成RSA密码。

注意:如果本地之前存在rsa_id.pub,authorized_keys等,将其保存或着将原来的密码生成为dsa形式,这样git和原来的密码都不冲突。

3 Spark安装
  
首先下载最新的源代码

git clone git://github.com/mesos/spark.git
  
得到目录spark后,进入spark目录,进入conf子目录,将 spark-env.sh-template 重命名为spark-env.sh,并添加以下代码行:

export SCALA_HOME=/opt/scala-2.9.1.final
  
回到spark目录,开始编译,运行

$ sbt/sbt update compile
  
这条命令会联网下载很多jar,然后会对spark进行编译,编译完成会提示success

[success] Total time: 1228 s, completed May 9, 2012 3:42:11 PM
  
可以通过运行spark-shell来和spark进行交互。

也可以先运行测试用例./run <class> <params>

./run spark.examples.SparkLR local[2]
  
在本地启动两个线程运行线性回归。

./run spark.examples.SparkPi local
  
在本地启动运行Pi估计器。

更多的例子在examples/src/main/scala里面

3 Spark导出
  
在使用Spark之前,先将编译好的classes导出为jar比较好,可以

$ sbt/sbt assembly
  
将Spark及其依赖包导出为jar,放在

core/target/spark-core-assembly-0.4-SNAPSHOT.jar
  
可以将该jar添加到CLASSPATH里,开发Spark应用了。

一般在开发Spark应用时需要导入Spark一些类和一些隐式的转换,需要再程序开头加入

import spark.SparkContext

import SparkContext._
  
4 使用Spark交互模式
  
1. 运行./spark-shell.sh

2. scala> val data = Array(1, 2, 3, 4, 5) //产生data

data: Array[Int] = Array(1, 2, 3, 4, 5)

3. scala> val distData = sc.parallelize(data) //将data处理成RDD

distData: spark.RDD[Int] = spark.ParallelCollection@7a0ec850 (显示出的类型为RDD)

4. scala> distData.reduce(_+_) //在RDD上进行运算,对data里面元素进行加和

12/05/10 09:36:20 INFO spark.SparkContext: Starting job...

5. 最后运行得到

12/05/10 09:36:20 INFO spark.SparkContext: Job finished in 0.076729174 s

res2: Int = 15
  
5 使用Spark处理Hadoop Datasets
  
Spark可以从HDFS/local FS/Amazon S3/Hypertable/HBase等创建分布式数据集。Spark支持text files,SequenceFiles和其他Hadoop InputFormat。

比如从HDFS上读取文本创建RDD

scala> val distFile = sc.textFile("hdfs://m120:9000/user/LijieXu/Demo/file01.txt")

12/05/10 09:49:01 INFO mapred.FileInputFormat: Total input paths to process : 1

distFile: spark.RDD[String] = spark.MappedRDD@59bf8a16
  
然后可以统计该文本的字符数,map负责处理文本每一行map(_size)得到每一行的字符数,多行组成一个List,reduce负责将List中的所有元素相加。

scala> distFile.map(_.size).reduce(_+_)

12/05/10 09:50:02 INFO spark.SparkContext: Job finished in 0.139610772 s

res3: Int = 79
  
textFile可以通过设置第二个参数来指定slice个数(slice与Hadoop里的split/block概念对应,一个task处理一个slice)。Spark默认将Hadoop上一个block对应为一个slice,但可以调大slice的个数,但不能比block的个数小,这就需要知道HDFS上一个文件的block数目,可以通过50070的dfs的jsp来查看。

对于SequenceFile,可以使用SparkContext的sequenceFile[K,V]方法生成RDD,其中K和V肯定要是SequenceFile存放时的类型了,也就是必须是Writable的子类。Spark也允许使用native types去读取,如sequenceFile[Int, String]。

对于复杂的SequenceFile,可以使用SparkContext.hadoopRDD方法去读取,该方法传入JobConf参数,包含InputFormat,key class,value class等,与Hadoop Java客户端读取方式一样。

6 分布式数据集操作
  
分布式数据集支持两种类型的操作:transformation和action。transformation的意思是从老数据集中生成新的数据集,action是在数据集上进行计算并将结果返回给driver program。每一个Spark应用包含一个driver program用来执行用户的main函数,比如,map就是一个transformation,将大数据集划分处理为小数据集,reduce是action,将数据集上内容进行聚合并返回给driver program。有个例外是reduceByKey应该属于transformation,返回的是分布式数据集。

需要注意的是,Spark的transformation是lazy的,transformation先将操作记录下来,直到接下来的action需要将处理结果返回给driver program的时候。

另一个特性是caching,如果用户指定cache一个数据集RDD,那么该数据集中的不同slice会按照partition被存放到相应不同节点的内存中,这样重用该数据集的时候,效率会高很多,尤其适用于迭代型和交互式的应用。如果cache的RDD丢失,那么重新使用transformation生成。

7 共享变量
  
与Hadoop的MapReduce不同的是,Spark允许共享变量,但只允许两种受限的变量:broadcast和accumulators。

Broadcast顾名思义是"广播",在每个节点上保持一份read-only的变量。比如,Hadoop的map task需要一部只读词典来处理文本时,由于不存在共享变量,每个task都需要加载一部词典。当然也可以使用DistributedCache来解决。在Spark中,通过broadcast,每个节点存放一部词典就够了,这样从task粒度上升到node粒度,节约的资源可想而知。Spark的broadcast路由算法也考虑到了通信开销。

通过使用SparkContext.broadcast(v)来实现对变量v的包装和共享。

scala> val broadcastVar = sc.broadcast(Array(1,2,3))

12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Asked to add key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0)

12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Estimated size for key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0) is 12

12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Size estimation for key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0) took 0 ms

12/05/10 10:54:21 INFO spark.BoundedMemoryCache: ensureFreeSpace((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d), 12) called with curBytes=12, maxBytes=339585269

12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Adding key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0)

12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Number of entries is now 2

broadcastVar: spark.broadcast.Broadcast[Array[Int]] = spark.Broadcast(a5c2a151-185d-4ea4-aad1-9ec642eebc5d)
  
创建broadcast变量后,可以通过.value来访问只读原始变量v。

scala> broadcastVar.value

res4: Array[Int] = Array(1, 2, 3)
  
另一种共享变量是Accumulators,顾名思义就是可以被"added"的变量,比如MapReduce中的counters就是不断累加的变量。Spark原生支持Int和Double类型的累加变量。

通过SparkContext.accumulator(v)来创建accumulator类型的变量,然后运行的task可以使用"+="操作符来进行累加。但是task不能读取到该变量,只有driver program能够读取(通过.value),这也是为了避免使用太多读写锁吧。

创建0的accumulator版本。

scala> val accum = sc.accumulator(0)

accum: spark.Accumulator[Int] = 0
  
对生成的RDD进行累加,这次不要reduce了。

scala> sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x)

12/05/10 11:05:48 INFO spark.SparkContext: Starting job...

scala> accum.value

res7: Int = 20
  
8 安装Mesos
  
Spark-0.4推荐的Mesos版本是1205738,不是最新版的Mesos,我想最新版应该也可以,这里暂且使用1205738。

首先下载Mesos

svn checkout –r 1205738 https://svn.apache.org/repos/asf/incubator/mesos/trunkmesos
  
得到mesos目录后,先安装编译所需的软件

apt-get install python2.6 python2.6-dev

很遗憾,虽然Ubuntu 11.04上有python 2.7,但webui(mesos的web界面)需要python 2.6,因此要装

apt-get install libcppunit-dev (安装cppunit)

确保g++版本大于4.1

如果缺automake,那么安装

apt-get install autoconf automake libtool
  
由于系统是Ubuntu 11.04 (GNU/Linux 2.6.38-8-generic x86_64)-natty,可以直接使用./configure.template.ubuntu-natty-64。但我使用的JDK是Sun的,因此修改./configure.template.ubuntu-natty-64里面-with-java-home为/opt/jdk1.6.0_27。

总体如下:

cp configure.template.ubuntu-natty-64 configure.template.ubuntu-my-natty-64

修改configure.template.ubuntu-my-natty-64得到如下内容

1 #!/bin/sh

2 export PYTHON=python2.7

3

4 $(dirname $0)/configure \

5 -with-python-headers=/usr/include/python2.7 \

6 -with-java-home=/opt/jdk1.6.0_27 \

7 -with-webui \

8 -with-included-zookeeper $@
  
编译mesos

root@master:/opt/mesos# ./configure.template.ubuntu-my-natty-64

完了之后

root@master:/opt/mesos# make
  
9 配置Mesos和Spark
  
先在slave1、slave2、slave3和master上安装mesos,我这里安装在/opt/mesos。

进入conf目录,修改deploy-env.sh,添加MESOS_HOME

\# This works with a newer version of hostname on Ubuntu.

#FULL_IP="hostname -all-ip-addresses"

#export LIBPROCESS_IP=\`echo $FULL_IP | sed 's/\([^ ]\*\) .\*/\1/'\`

export MESOS_HOME=/opt/mesos
  
修改mesos.conf,添加

\# mesos-slave with -help.

failover_timeout=1
  
进入/opt/spark,修改conf/spark-env.sh,添加

\# variables to set are:

\# - MESOS_HOME, to point to your Mesos installation

\# - SCALA_HOME, to point to your Scala installation

\# - SPARK_CLASSPATH, to add elements to Spark's classpath

\# - SPARK_JAVA_OPTS, to add JVM options

\# - SPARK_MEM, to change the amount of memory used per node (this should

\# be in the same format as the JVM's -Xmx option, e.g. 300m or 1g).

\# - SPARK_LIBRARY_PATH, to add extra search paths for native libraries.

export SCALA_HOME=/opt/scala-2.9.1.final

export MESOS_HOME=/opt/mesos

export PATH=$PATH:/opt/jdk1.6.0_27/bin

export SPARK_MEM=10g (根据自己机器的内存大小设置,指示Spark可以使用的最大内存量)