Hadoop是對(duì)大數(shù)據(jù)集進(jìn)行分布式計(jì)算的標(biāo)準(zhǔn)工具,這也是為什么當(dāng)你穿過(guò)機(jī)場(chǎng)時(shí)能看到”大數(shù)據(jù)(Big Data)”廣告的原因。它已經(jīng)成為大數(shù)據(jù)的操作系統(tǒng),提供了包括工具和技巧在內(nèi)的豐富生態(tài)系統(tǒng),允許使用相對(duì)便宜的商業(yè)硬件集群進(jìn)行超級(jí)計(jì)算機(jī)級(jí)別的計(jì)算。2003和2004年,兩個(gè)來(lái)自Google的觀點(diǎn)使Hadoop成為可能:一個(gè)分布式存儲(chǔ)框架(Google文件系統(tǒng)),在Hadoop中被實(shí)現(xiàn)為HDFS;一個(gè)分布式計(jì)算框架(MapReduce)。
這兩個(gè)觀點(diǎn)成為過(guò)去十年規(guī)模分析(scaling analytics)、大規(guī)模機(jī)器學(xué)習(xí)(machine learning),以及其他大數(shù)據(jù)應(yīng)用出現(xiàn)的主要推動(dòng)力!但是,從技術(shù)角度上講,十年是一段非常長(zhǎng)的時(shí)間,而且Hadoop還存在很多已知限制,尤其是MapReduce。對(duì)MapReduce編程明顯是困難的。對(duì)大多數(shù)分析,你都必須用很多步驟將Map和Reduce任務(wù)串接起來(lái)。這造成類SQL的計(jì)算或機(jī)器學(xué)習(xí)需要專門的系統(tǒng)來(lái)進(jìn)行。更糟的是,MapReduce要求每個(gè)步驟間的數(shù)據(jù)要序列化到磁盤,這意味著MapReduce作業(yè)的I/O成本很高,導(dǎo)致交互分析和迭代算法(iterative algorithms)開銷很大;而事實(shí)是,幾乎所有的最優(yōu)化和機(jī)器學(xué)習(xí)都是迭代的。
為了解決這些問(wèn)題,Hadoop一直在向一種更為通用的資源管理框架轉(zhuǎn)變,即YARN(Yet Another Resource Negotiator, 又一個(gè)資源協(xié)調(diào)者)。YARN實(shí)現(xiàn)了下一代的MapReduce,但同時(shí)也允許應(yīng)用利用分布式資源而不必采用MapReduce進(jìn)行計(jì)算。通過(guò)將集群管理一般化,研究轉(zhuǎn)到分布式計(jì)算的一般化上,來(lái)擴(kuò)展了MapReduce的初衷。
Spark是第一個(gè)脫胎于該轉(zhuǎn)變的快速、通用分布式計(jì)算范式,并且很快流行起來(lái)。Spark使用函數(shù)式編程范式擴(kuò)展了MapReduce模型以支持更多計(jì)算類型,可以涵蓋廣泛的工作流,這些工作流之前被實(shí)現(xiàn)為Hadoop之上的特殊系統(tǒng)。Spark使用內(nèi)存緩存來(lái)提升性能,因此進(jìn)行交互式分析也足夠快速(就如同使用Python解釋器,與集群進(jìn)行交互一樣)。緩存同時(shí)提升了迭代算法的性能,這使得Spark非常適合數(shù)據(jù)理論任務(wù),特別是機(jī)器學(xué)習(xí)。
本文中,我們將首先討論如何在本地機(jī)器上或者EC2的集群上設(shè)置Spark進(jìn)行簡(jiǎn)單分析。然后,我們?cè)谌腴T級(jí)水平探索Spark,了解Spark是什么以及它如何工作(希望可以激發(fā)更多探索)。最后兩節(jié)我們開始通過(guò)命令行與Spark進(jìn)行交互,然后演示如何用Python寫Spark應(yīng)用,并作為Spark作業(yè)提交到集群上。
設(shè)置Spark
在本機(jī)設(shè)置和運(yùn)行Spark非常簡(jiǎn)單。你只需要下載一個(gè)預(yù)構(gòu)建的包,只要你安裝了Java 6+和Python 2.6+,就可以在Windows、Mac OS X和Linux上運(yùn)行Spark。確保java程序在PATH環(huán)境變量中,或者設(shè)置了JAVA_HOME環(huán)境變量。類似的,python也要在PATH中。
假設(shè)你已經(jīng)安裝了Java和Python:
訪問(wèn)Spark下載頁(yè)
選擇Spark最新發(fā)布版(本文寫作時(shí)是1.2.0),一個(gè)預(yù)構(gòu)建的Hadoop 2.4包,直接下載。
現(xiàn)在,如何繼續(xù)依賴于你的操作系統(tǒng),靠你自己去探索了。Windows用戶可以在評(píng)論區(qū)對(duì)如何設(shè)置的提示進(jìn)行評(píng)論。
一般,我的建議是按照下面的步驟(在POSIX操作系統(tǒng)上):
1.解壓Spark
~$ tar -xzf spark-1.2.0-bin-hadoop2.4.tgz
2.將解壓目錄移動(dòng)到有效應(yīng)用程序目錄中(如Windows上的
~$ mv spark-1.2.0-bin-hadoop2.4 /srv/spark-1.2.0
3.創(chuàng)建指向該Spark版本的符號(hào)鏈接到<spark目錄。這樣你可以簡(jiǎn)單地下載新/舊版本的Spark,然后修改鏈接來(lái)管理Spark版本,而不用更改路徑或環(huán)境變量。
~$ ln -s /srv/spark-1.2.0 /srv/spark
4.修改BASH配置,將Spark添加到PATH中,設(shè)置SPARK_HOME環(huán)境變量。這些小技巧在命令行上會(huì)幫到你。在Ubuntu上,只要編輯~/.bash_profile或~/.profile文件,將以下語(yǔ)句添加到文件中:
export SPARK_HOME=/srv/spark
export PATH=$SPARK_HOME/bin:$PATH
5.source這些配置(或者重啟終端)之后,你就可以在本地運(yùn)行一個(gè)pyspark解釋器。執(zhí)行pyspark命令,你會(huì)看到以下結(jié)果:
~$ pyspark
Python 2.7.8 (default, Dec 2 2014, 12:45:58)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties
[… snip …]
Welcome to
____ __
/ __/__ ___ _____/ /__
_ / _ / _ `/ __/ `_/
/__ / .__/_,_/_/ /_/_ version 1.2.0
/_/
Using Python version 2.7.8 (default, Dec 2 2014 12:45:58)
SparkContext available as sc.
>>>
現(xiàn)在Spark已經(jīng)安裝完畢,可以在本機(jī)以”單機(jī)模式“(standalone mode)使用。你可以在本機(jī)開發(fā)應(yīng)用并提交Spark作業(yè),這些作業(yè)將以多進(jìn)程/多線程模式運(yùn)行的,或者,配置該機(jī)器作為一個(gè)集群的客戶端(不推薦這樣做,因?yàn)樵赟park作業(yè)中,驅(qū)動(dòng)程序(driver)是個(gè)很重要的角色,并且應(yīng)該與集群的其他部分處于相同網(wǎng)絡(luò))??赡艹碎_發(fā),你在本機(jī)使用Spark做得最多的就是利用spark-ec2腳本來(lái)配置Amazon云上的一個(gè)EC2 Spark集群了。
簡(jiǎn)略Spark輸出
Spark(和PySpark)的執(zhí)行可以特別詳細(xì),很多INFO日志消息都會(huì)打印到屏幕。開發(fā)過(guò)程中,這些非常惱人,因?yàn)榭赡軄G失Python棧跟蹤或者print的輸出。為了減少Spark輸出 – 你可以設(shè)置$SPARK_HOME/conf下的log4j。首先,拷貝一份$SPARK_HOME/conf/log4j.properties.template文件,去掉“.template”擴(kuò)展名。
~$ cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties
編輯新文件,用WARN替換代碼中出現(xiàn)的INFO。你的log4j.properties文件類似:
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN
現(xiàn)在運(yùn)行PySpark,輸出消息將會(huì)更簡(jiǎn)略!感謝@genomegeek在一次District Data Labs的研討會(huì)中指出這一點(diǎn)。
在Spark中使用IPython Notebook
當(dāng)搜索有用的Spark小技巧時(shí),我發(fā)現(xiàn)了一些文章提到在PySpark中配置IPython notebook。IPython notebook對(duì)數(shù)據(jù)科學(xué)家來(lái)說(shuō)是個(gè)交互地呈現(xiàn)科學(xué)和理論工作的必備工具,它集成了文本和Python代碼。對(duì)很多數(shù)據(jù)科學(xué)家,IPython notebook是他們的Python入門,并且使用非常廣泛,所以我想值得在本文中提及。
這里的大部分說(shuō)明都來(lái)改編自IPython notebook: 在PySpark中設(shè)置IPython。但是,我們將聚焦在本機(jī)以單機(jī)模式將IPtyon shell連接到PySpark,而不是在EC2集群。如果你想在一個(gè)集群上使用PySpark/IPython,查看并評(píng)論下文的說(shuō)明吧!
1.為Spark創(chuàng)建一個(gè)iPython notebook配置
~$ ipython profile create spark
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_notebook_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_nbconvert_config.py'
記住配置文件的位置,替換下文各步驟相應(yīng)的路徑:
2.創(chuàng)建文件$HOME/.ipython/profile_spark/startup/00-pyspark-setup.py,并添加如下代碼:
import os
import sys
# Configure the environment
if 'SPARK_HOME' not in os.environ:
os.environ['SPARK_HOME'] = '/srv/spark'
# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']
# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
3.使用我們剛剛創(chuàng)建的配置來(lái)啟動(dòng)IPython notebook。
~$ ipython notebook --profile spark
4.在notebook中,你應(yīng)該能看到我們剛剛創(chuàng)建的變量。
print SPARK_HOME
5.在IPython notebook最上面,確保你添加了Spark context。
from pyspark import SparkContext
sc = SparkContext( 'local', 'pyspark')
6.使用IPython做個(gè)簡(jiǎn)單的計(jì)算來(lái)測(cè)試Spark context。
def isprime(n):
"""
check if integer n is a prime
"""
# make sure n is a positive integer
n = abs(int(n))
# 0 and 1 are not primes
if n < 2:
return False
# 2 is the only even prime number
if n == 2:
return True
# all other even numbers are not primes
if not n &amp; 1:
return False
# range starts with 3 and only needs to go up the square root of n
# for all odd numbers
for x in range(3, int(n**0.5)+1, 2):
if n % x == 0:
return False
return True
# Create an RDD of numbers from 0 to 1,000,000
nums = sc.parallelize(xrange(1000000))
# Compute the number of primes in the RDD
print nums.filter(isprime).count()
如果你能得到一個(gè)數(shù)字而且沒(méi)有錯(cuò)誤發(fā)生,那么你的context正確工作了!
編輯提示:上面配置了一個(gè)使用PySpark直接調(diào)用IPython notebook的IPython context。但是,你也可以使用PySpark按以下方式直接啟動(dòng)一個(gè)notebook: $ IPYTHON_OPTS=”notebook –pylab inline” pyspark
哪個(gè)方法好用取決于你使用PySpark和IPython的具體情景。前一個(gè)允許你更容易地使用IPython notebook連接到一個(gè)集群,因此是我喜歡的方法。
在EC2上使用Spark
在講授使用Hadoop進(jìn)行分布式計(jì)算時(shí),我發(fā)現(xiàn)很多可以通過(guò)在本地偽分布式節(jié)點(diǎn)(pseudo-distributed node)或以單節(jié)點(diǎn)模式(single-node mode)講授。但是為了了解真正發(fā)生了什么,就需要一個(gè)集群。當(dāng)數(shù)據(jù)變得龐大,這些書面講授的技能和真實(shí)計(jì)算需求間經(jīng)常出現(xiàn)隔膜。如果你肯在學(xué)習(xí)詳細(xì)使用Spark上花錢,我建議你設(shè)置一個(gè)快速Spark集群做做實(shí)驗(yàn)。 包含5個(gè)slave(和1個(gè)master)每周大概使用10小時(shí)的集群每月大概需要$45.18。
完整的討論可以在Spark文檔中找到:在EC2上運(yùn)行Spark在你決定購(gòu)買EC2集群前一定要通讀這篇文檔!我列出了一些關(guān)鍵點(diǎn):
通過(guò)AWS Console獲取AWS EC2 key對(duì)(訪問(wèn)key和密鑰key)。
將key對(duì)導(dǎo)出到你的環(huán)境中。在shell中敲出以下命令,或者將它們添加到配置中。
export AWS_ACCESS_KEY_ID=myaccesskeyid
export AWS_SECRET_ACCESS_KEY=mysecretaccesskey
注意不同的工具使用不同的環(huán)境名稱,確保你用的是Spark腳本所使用的名稱。
3.啟動(dòng)集群:
~$ cd $SPARK_HOME/ec2
ec2$ ./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> launch <cluster-name>
4.SSH到集群來(lái)運(yùn)行Spark作業(yè)。
ec2$ ./spark-ec2 -k <keypair> -i <key-file> login <cluster-name>
5.銷毀集群
ec2$ ./spark-ec2 destroy <cluster-name>.
這些腳本會(huì)自動(dòng)創(chuàng)建一個(gè)本地的HDFS集群來(lái)添加數(shù)據(jù),copy-dir命令可以同步代碼和數(shù)據(jù)到該集群。但是你最好使用S3來(lái)存儲(chǔ)數(shù)據(jù),創(chuàng)建使用s3://URI來(lái)加載數(shù)據(jù)的RDDs。
Spark是什么?
既然設(shè)置好了Spark,現(xiàn)在我們討論下Spark是什么。Spark是個(gè)通用的集群計(jì)算框架,通過(guò)將大量數(shù)據(jù)集計(jì)算任務(wù)分配到多臺(tái)計(jì)算機(jī)上,提供高效內(nèi)存計(jì)算。如果你熟悉Hadoop,那么你知道分布式計(jì)算框架要解決兩個(gè)問(wèn)題:如何分發(fā)數(shù)據(jù)和如何分發(fā)計(jì)算。Hadoop使用HDFS來(lái)解決分布式數(shù)據(jù)問(wèn)題,MapReduce計(jì)算范式提供有效的分布式計(jì)算。類似的,Spark擁有多種語(yǔ)言的函數(shù)式編程API,提供了除map和reduce之外更多的運(yùn)算符,這些操作是通過(guò)一個(gè)稱作彈性分布式數(shù)據(jù)集(resilient distributed datasets, RDDs)的分布式數(shù)據(jù)框架進(jìn)行的。
本質(zhì)上,RDD是種編程抽象,代表可以跨機(jī)器進(jìn)行分割的只讀對(duì)象集合。RDD可以從一個(gè)繼承結(jié)構(gòu)(lineage)重建(因此可以容錯(cuò)),通過(guò)并行操作訪問(wèn),可以讀寫HDFS或S3這樣的分布式存儲(chǔ),更重要的是,可以緩存到worker節(jié)點(diǎn)的內(nèi)存中進(jìn)行立即重用。由于RDD可以被緩存在內(nèi)存中,Spark對(duì)迭代應(yīng)用特別有效,因?yàn)檫@些應(yīng)用中,數(shù)據(jù)是在整個(gè)算法運(yùn)算過(guò)程中都可以被重用。大多數(shù)機(jī)器學(xué)習(xí)和最優(yōu)化算法都是迭代的,使得Spark對(duì)數(shù)據(jù)科學(xué)來(lái)說(shuō)是個(gè)非常有效的工具。另外,由于Spark非常快,可以通過(guò)類似Python REPL的命令行提示符交互式訪問(wèn)。
Spark庫(kù)本身包含很多應(yīng)用元素,這些元素可以用到大部分大數(shù)據(jù)應(yīng)用中,其中包括對(duì)大數(shù)據(jù)進(jìn)行類似SQL查詢的支持,機(jī)器學(xué)習(xí)和圖算法,甚至對(duì)實(shí)時(shí)流數(shù)據(jù)的支持。
核心組件如下:
Spark Core:包含Spark的基本功能;尤其是定義RDD的API、操作以及這兩者上的動(dòng)作。其他Spark的庫(kù)都是構(gòu)建在RDD和Spark Core之上的。
Spark SQL:提供通過(guò)Apache Hive的SQL變體Hive查詢語(yǔ)言(HiveQL)與Spark進(jìn)行交互的API。每個(gè)數(shù)據(jù)庫(kù)表被當(dāng)做一個(gè)RDD,Spark SQL查詢被轉(zhuǎn)換為Spark操作。對(duì)熟悉Hive和HiveQL的人,Spark可以拿來(lái)就用。
Spark Streaming:允許對(duì)實(shí)時(shí)數(shù)據(jù)流進(jìn)行處理和控制。很多實(shí)時(shí)數(shù)據(jù)庫(kù)(如Apache Store)可以處理實(shí)時(shí)數(shù)據(jù)。Spark Streaming允許程序能夠像普通RDD一樣處理實(shí)時(shí)數(shù)據(jù)。
MLlib:一個(gè)常用機(jī)器學(xué)習(xí)算法庫(kù),算法被實(shí)現(xiàn)為對(duì)RDD的Spark操作。這個(gè)庫(kù)包含可擴(kuò)展的學(xué)習(xí)算法,比如分類、回歸等需要對(duì)大量數(shù)據(jù)集進(jìn)行迭代的操作。之前可選的大數(shù)據(jù)機(jī)器學(xué)習(xí)庫(kù)Mahout,將會(huì)轉(zhuǎn)到Spark,并在未來(lái)實(shí)現(xiàn)。
GraphX:控制圖、并行圖操作和計(jì)算的一組算法和工具的集合。GraphX擴(kuò)展了RDD API,包含控制圖、創(chuàng)建子圖、訪問(wèn)路徑上所有頂點(diǎn)的操作。
由于這些組件滿足了很多大數(shù)據(jù)需求,也滿足了很多數(shù)據(jù)科學(xué)任務(wù)的算法和計(jì)算上的需要,Spark快速流行起來(lái)。不僅如此,Spark也提供了使用Scala、Java和Python編寫的API;滿足了不同團(tuán)體的需求,允許更多數(shù)據(jù)科學(xué)家簡(jiǎn)便地采用Spark作為他們的大數(shù)據(jù)解決方案。
對(duì)Spark編程
編寫Spark應(yīng)用與之前實(shí)現(xiàn)在Hadoop上的其他數(shù)據(jù)流語(yǔ)言類似。代碼寫入一個(gè)惰性求值的驅(qū)動(dòng)程序(driver program)中,通過(guò)一個(gè)動(dòng)作(action),驅(qū)動(dòng)代碼被分發(fā)到集群上,由各個(gè)RDD分區(qū)上的worker來(lái)執(zhí)行。然后結(jié)果會(huì)被發(fā)送回驅(qū)動(dòng)程序進(jìn)行聚合或編譯。本質(zhì)上,驅(qū)動(dòng)程序創(chuàng)建一個(gè)或多個(gè)RDD,調(diào)用操作來(lái)轉(zhuǎn)換RDD,然后調(diào)用動(dòng)作處理被轉(zhuǎn)換后的RDD。
這些步驟大體如下:
定義一個(gè)或多個(gè)RDD,可以通過(guò)獲取存儲(chǔ)在磁盤上的數(shù)據(jù)(HDFS,Cassandra,HBase,Local Disk),并行化內(nèi)存中的某些集合,轉(zhuǎn)換(transform)一個(gè)已存在的RDD,或者,緩存或保存。
通過(guò)傳遞一個(gè)閉包(函數(shù))給RDD上的每個(gè)元素來(lái)調(diào)用RDD上的操作。Spark提供了除了Map和Reduce的80多種高級(jí)操作。
使用結(jié)果RDD的動(dòng)作(action)(如count、collect、save等)。動(dòng)作將會(huì)啟動(dòng)集群上的計(jì)算。
當(dāng)Spark在一個(gè)worker上運(yùn)行閉包時(shí),閉包中用到的所有變量都會(huì)被拷貝到節(jié)點(diǎn)上,但是由閉包的局部作用域來(lái)維護(hù)。Spark提供了兩種類型的共享變量,這些變量可以按照限定的方式被所有worker訪問(wèn)。廣播變量會(huì)被分發(fā)給所有worker,但是是只讀的。累加器這種變量,worker可以使用關(guān)聯(lián)操作來(lái)“加”,通常用作計(jì)數(shù)器。
Spark應(yīng)用本質(zhì)上通過(guò)轉(zhuǎn)換和動(dòng)作來(lái)控制RDD。后續(xù)文章將會(huì)深入討論,但是理解了這個(gè)就足以執(zhí)行下面的例子了。
Spark的執(zhí)行
簡(jiǎn)略描述下Spark的執(zhí)行。本質(zhì)上,Spark應(yīng)用作為獨(dú)立的進(jìn)程運(yùn)行,由驅(qū)動(dòng)程序中的SparkContext協(xié)調(diào)。這個(gè)context將會(huì)連接到一些集群管理者(如YARN),這些管理者分配系統(tǒng)資源。集群上的每個(gè)worker由執(zhí)行者(executor)管理,執(zhí)行者反過(guò)來(lái)由SparkContext管理。執(zhí)行者管理計(jì)算、存儲(chǔ),還有每臺(tái)機(jī)器上的緩存。
重點(diǎn)要記住的是應(yīng)用代碼由驅(qū)動(dòng)程序發(fā)送給執(zhí)行者,執(zhí)行者指定context和要運(yùn)行的任務(wù)。執(zhí)行者與驅(qū)動(dòng)程序通信進(jìn)行數(shù)據(jù)分享或者交互。驅(qū)動(dòng)程序是Spark作業(yè)的主要參與者,因此需要與集群處于相同的網(wǎng)絡(luò)。這與Hadoop代碼不同,Hadoop中你可以在任意位置提交作業(yè)給JobTracker,JobTracker處理集群上的執(zhí)行。
與Spark交互
使用Spark最簡(jiǎn)單的方式就是使用交互式命令行提示符。打開PySpark終端,在命令行中打出pyspark。
~$ pyspark
[… snip …]
>>>
PySpark將會(huì)自動(dòng)使用本地Spark配置創(chuàng)建一個(gè)SparkContext。你可以通過(guò)sc變量來(lái)訪問(wèn)它。我們來(lái)創(chuàng)建第一個(gè)RDD。
>>> text = sc.textFile("shakespeare.txt")
>>> print text
shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
textFile方法將莎士比亞全部作品加載到一個(gè)RDD命名文本。如果查看了RDD,你就可以看出它是個(gè)MappedRDD,文件路徑是相對(duì)于當(dāng)前工作目錄的一個(gè)相對(duì)路徑(記得傳遞磁盤上正確的shakespear.txt文件路徑)。我們轉(zhuǎn)換下這個(gè)RDD,來(lái)進(jìn)行分布式計(jì)算的“hello world”:“字?jǐn)?shù)統(tǒng)計(jì)”。
>>> from operator import add
>>> def tokenize(text):
... return text.split()
...
>>> words = text.flatMap(tokenize)
>>> print words
PythonRDD[2] at RDD at PythonRDD.scala:43
我們首先導(dǎo)入了add操作符,它是個(gè)命名函數(shù),可以作為加法的閉包來(lái)使用。我們稍后再使用這個(gè)函數(shù)。首先我們要做的是把文本拆分為單詞。我們創(chuàng)建了一個(gè)tokenize函數(shù),參數(shù)是文本片段,返回根據(jù)空格拆分的單詞列表。然后我們通過(guò)給flatMap操作符傳遞tokenize閉包對(duì)textRDD進(jìn)行變換創(chuàng)建了一個(gè)wordsRDD。你會(huì)發(fā)現(xiàn),words是個(gè)PythonRDD,但是執(zhí)行本應(yīng)該立即進(jìn)行。顯然,我們還沒(méi)有把整個(gè)莎士比亞數(shù)據(jù)集拆分為單詞列表。
如果你曾使用MapReduce做過(guò)Hadoop版的“字?jǐn)?shù)統(tǒng)計(jì)”,你應(yīng)該知道下一步是將每個(gè)單詞映射到一個(gè)鍵值對(duì),其中鍵是單詞,值是1,然后使用reducer計(jì)算每個(gè)鍵的1總數(shù)。
首先,我們map一下。
>>> wc = words.map(lambda x: (x,1))
>>> print wc.toDebugString()
(2) PythonRDD[3] at RDD at PythonRDD.scala:43
| shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
| shakespeare.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2
我使用了一個(gè)匿名函數(shù)(用了Python中的lambda關(guān)鍵字)而不是命名函數(shù)。這行代碼將會(huì)把lambda映射到每個(gè)單詞。因此,每個(gè)x都是一個(gè)單詞,每個(gè)單詞都會(huì)被匿名閉包轉(zhuǎn)換為元組(word, 1)。為了查看轉(zhuǎn)換關(guān)系,我們使用toDebugString方法來(lái)查看PipelinedRDD是怎么被轉(zhuǎn)換的。可以使用reduceByKey動(dòng)作進(jìn)行字?jǐn)?shù)統(tǒng)計(jì),然后把統(tǒng)計(jì)結(jié)果寫到磁盤。
>>> counts = wc.reduceByKey(add)
>>> counts.saveAsTextFile("wc")
一旦我們最終調(diào)用了saveAsTextFile動(dòng)作,這個(gè)分布式作業(yè)就開始執(zhí)行了,在作業(yè)“跨集群地”(或者你本機(jī)的很多進(jìn)程)運(yùn)行時(shí),你應(yīng)該可以看到很多INFO語(yǔ)句。如果退出解釋器,你可以看到當(dāng)前工作目錄下有個(gè)“wc”目錄。
$ ls wc/
_SUCCESS part-00000 part-00001
每個(gè)part文件都代表你本機(jī)上的進(jìn)程計(jì)算得到的被保持到磁盤上的最終RDD。如果對(duì)一個(gè)part文件進(jìn)行head命令,你應(yīng)該能看到字?jǐn)?shù)統(tǒng)計(jì)元組。
$ head wc/part-00000
(u'fawn', 14)
(u'Fame.', 1)
(u'Fame,', 2)
(, 1)
(, 1)
(, 1)
(, 1)
(, 1)
(u'fleeces', 1)
(, 1)
注意這些鍵沒(méi)有像Hadoop一樣被排序(因?yàn)镠adoop中Map和Reduce任務(wù)中有個(gè)必要的打亂和排序階段)。但是,能保證每個(gè)單詞在所有文件中只出現(xiàn)一次,因?yàn)槟闶褂昧藃educeByKey操作符。你還可以使用sort操作符確保在寫入到磁盤之前所有的鍵都被排過(guò)序。
編寫一個(gè)Spark應(yīng)用
編寫Spark應(yīng)用與通過(guò)交互式控制臺(tái)使用Spark類似。API是相同的。首先,你需要訪問(wèn)<SparkContext,它已經(jīng)由<pyspark自動(dòng)加載好了。
使用Spark編寫Spark應(yīng)用的一個(gè)基本模板如下:
## Spark Application - execute with spark-submit
## Imports
from pyspark import SparkConf, SparkContext
## Module Constants
APP_NAME = "My Spark Application"
## Closure Functions
## Main functionality
def main(sc):
pass
if __name__ == "__main__":
# Configure Spark
conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster("local[*]")
sc = SparkContext(conf=conf)
# Execute Main functionality
main(sc)
這個(gè)模板列出了一個(gè)Spark應(yīng)用所需的東西:導(dǎo)入Python庫(kù),模塊常量,用于調(diào)試和Spark UI的可識(shí)別的應(yīng)用名稱,還有作為驅(qū)動(dòng)程序運(yùn)行的一些主要分析方法學(xué)。在ifmain中,我們創(chuàng)建了SparkContext,使用了配置好的context執(zhí)行main。我們可以簡(jiǎn)單地導(dǎo)入驅(qū)動(dòng)代碼到pyspark而不用執(zhí)行。注意這里Spark配置通過(guò)setMaster方法被硬編碼到SparkConf,一般你應(yīng)該允許這個(gè)值通過(guò)命令行來(lái)設(shè)置,所以你能看到這行做了占位符注釋。
使用<sc.stop()或<sys.exit(0)來(lái)關(guān)閉或退出程序。
## Spark Application - execute with spark-submit
## Imports
import csv
import matplotlib.pyplot as plt
from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext
## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"
fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight = namedtuple('Flight', fields)
## Closure Functions
def parse(row):
"""
Parses a row and returns a named tuple.
"""
row[0] = datetime.strptime(row[0], DATE_FMT).date()
row[5] = datetime.strptime(row[5], TIME_FMT).time()
row[6] = float(row[6])
row[7] = datetime.strptime(row[7], TIME_FMT).time()
row[8] = float(row[8])
row[9] = float(row[9])
row[10] = float(row[10])
return Flight(*row[:11])
def split(line):
"""
Operator function for splitting a line with csv module
"""
reader = csv.reader(StringIO(line))
return reader.next()
def plot(delays):
"""
Show a bar chart of the total delay per airline
"""
airlines = [d[0] for d in delays]
minutes = [d[1] for d in delays]
index = list(xrange(len(airlines)))
fig, axe = plt.subplots()
bars = axe.barh(index, minutes)
# Add the total minutes to the right
for idx, air, min in zip(index, airlines, minutes):
if min > 0:
bars[idx].set_color('#d9230f')
axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
else:
bars[idx].set_color('#469408')
axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')
# Set the ticks
ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
xt = plt.xticks()[0]
plt.xticks(xt, [' '] * len(xt))
# minimize chart junk
plt.grid(axis = 'x', color ='white', linestyle='-')
plt.title('Total Minutes Delayed per Airline')
plt.show()
## Main functionality
def main(sc):
# Load the airlines lookup dictionary
airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())
# Broadcast the lookup dictionary to the cluster
airline_lookup = sc.broadcast(airlines)
# Read the CSV Data into an RDD
flights = sc.textFile("ontime/flights.csv").map(split).map(parse)
# Map the total delay to the airline (joined using the broadcast value)
delays = flights.map(lambda f: (airline_lookup.value[f.airline],
add(f.dep_delay, f.arv_delay)))
# Reduce the total delay for the month to the airline
delays = delays.reduceByKey(add).collect()
delays = sorted(delays, key=itemgetter(1))
# Provide output from the driver
for d in delays:
print "%0.0f minutes delayedt%s" % (d[1], d[0])
# Show a bar chart of the delays
plot(delays)
if __name__ == "__main__":
# Configure Spark
conf = SparkConf().setMaster("local[*]")
conf = conf.setAppName(APP_NAME)
sc = SparkContext(conf=conf)
# Execute Main functionality
main(sc)
使用<spark-submit命令來(lái)運(yùn)行這段代碼(假設(shè)你已有ontime目錄,目錄中有兩個(gè)CSV文件):
~$ spark-submit app.py
這個(gè)Spark作業(yè)使用本機(jī)作為master,并搜索app.py同目錄下的ontime目錄下的2個(gè)CSV文件。最終結(jié)果顯示,4月的總延誤時(shí)間(單位分鐘),既有早點(diǎn)的(如果你從美國(guó)大陸飛往夏威夷或者阿拉斯加),但對(duì)大部分大型航空公司都是延誤的。注意,我們?cè)赼pp.py中使用matplotlib直接將結(jié)果可視化出來(lái)了:
這段代碼做了什么呢?我們特別注意下與Spark最直接相關(guān)的main函數(shù)。首先,我們加載CSV文件到RDD,然后把split函數(shù)映射給它。split函數(shù)使用csv模塊解析文本的每一行,并返回代表每行的元組。最后,我們將collect動(dòng)作傳給RDD,這個(gè)動(dòng)作把數(shù)據(jù)以Python列表的形式從RDD傳回驅(qū)動(dòng)程序。本例中,airlines.csv是個(gè)小型的跳轉(zhuǎn)表(jump table),可以將航空公司代碼與全名對(duì)應(yīng)起來(lái)。我們將轉(zhuǎn)移表存儲(chǔ)為Python字典,然后使用sc.broadcast廣播給集群上的每個(gè)節(jié)點(diǎn)。
接著,main函數(shù)加載了數(shù)據(jù)量更大的flights.csv([譯者注]作者筆誤寫成fights.csv,此處更正)。拆分CSV行完成之后,我們將parse函數(shù)映射給CSV行,此函數(shù)會(huì)把日期和時(shí)間轉(zhuǎn)成Python的日期和時(shí)間,并對(duì)浮點(diǎn)數(shù)進(jìn)行合適的類型轉(zhuǎn)換。每行作為一個(gè)NamedTuple保存,名為Flight,以便高效簡(jiǎn)便地使用。
有了Flight對(duì)象的RDD,我們映射一個(gè)匿名函數(shù),這個(gè)函數(shù)將RDD轉(zhuǎn)換為一些列的鍵值對(duì),其中鍵是航空公司的名字,值是到達(dá)和出發(fā)的延誤時(shí)間總和。使用reduceByKey動(dòng)作和add操作符可以得到每個(gè)航空公司的延誤時(shí)間總和,然后RDD被傳遞給驅(qū)動(dòng)程序(數(shù)據(jù)中航空公司的數(shù)目相對(duì)較少)。最終延誤時(shí)間按照升序排列,輸出打印到了控制臺(tái),并且使用matplotlib進(jìn)行了可視化。
這個(gè)例子稍長(zhǎng),但是希望能演示出集群和驅(qū)動(dòng)程序之間的相互作用(發(fā)送數(shù)據(jù)進(jìn)行分析,結(jié)果取回給驅(qū)動(dòng)程序),以及Python代碼在Spark應(yīng)用中的角色。
結(jié)論
盡管算不上一個(gè)完整的Spark入門,我們希望你能更好地了解Spark是什么,如何使用進(jìn)行快速、內(nèi)存分布式計(jì)算。至少,你應(yīng)該能將Spark運(yùn)行起來(lái),并開始在本機(jī)或Amazon EC2上探索數(shù)據(jù)。你應(yīng)該可以配置好iPython notebook來(lái)運(yùn)行Spark。
Spark不能解決分布式存儲(chǔ)問(wèn)題(通常Spark從HDFS中獲取數(shù)據(jù)),但是它為分布式計(jì)算提供了豐富的函數(shù)式編程API。這個(gè)框架建立在伸縮分布式數(shù)據(jù)集(RDD)之上。RDD是種編程抽象,代表被分區(qū)的對(duì)象集合,允許進(jìn)行分布式操作。RDD有容錯(cuò)能力(可伸縮的部分),更重要的時(shí),可以存儲(chǔ)到節(jié)點(diǎn)上的worker內(nèi)存里進(jìn)行立即重用。內(nèi)存存儲(chǔ)提供了快速和簡(jiǎn)單表示的迭代算法,以及實(shí)時(shí)交互分析。
由于Spark庫(kù)提供了Python、Scale、Java編寫的API,以及內(nèi)建的機(jī)器學(xué)習(xí)、流數(shù)據(jù)、圖算法、類SQL查詢等模塊;Spark迅速成為當(dāng)今最重要的分布式計(jì)算框架之一。與YARN結(jié)合,Spark提供了增量,而不是替代已存在的Hadoop集群,它將成為未來(lái)大數(shù)據(jù)重要的一部分,為數(shù)據(jù)科學(xué)探索鋪設(shè)了一條康莊大道。
有用的鏈接
希望你喜歡這篇博文!寫作并不是憑空而來(lái)的,以下是一些曾幫助我寫作的有用鏈接;查看這些鏈接,可能對(duì)進(jìn)一步探索Spark有幫助。注意,有些圖書鏈接是推廣鏈接,意味著如果你點(diǎn)擊并購(gòu)買了這些圖書,你將會(huì)支持District Data Labs!
這篇更多是篇入門文章,而不是District Data Labs的典型文章,有些與此入門相關(guān)的數(shù)據(jù)和代碼你可以在這里找到:
Github上的代碼
莎士比亞數(shù)據(jù)集
航空公司時(shí)間數(shù)據(jù)集改編自美國(guó)交通統(tǒng)計(jì)局(US DOT)
Spark論文
Spark與Hadoop一樣,有一些基礎(chǔ)論文,我認(rèn)為那些需要對(duì)大數(shù)據(jù)集進(jìn)行分布式計(jì)算的嚴(yán)謹(jǐn)數(shù)據(jù)科學(xué)家一定要讀。首先是HotOS(“操作系統(tǒng)熱門話題”的簡(jiǎn)寫)的一篇研討會(huì)論文,簡(jiǎn)單易懂地描述了Spark。第二個(gè)是偏理論的論文,具體描述了RDD。
M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica, “Spark: cluster computing with working sets,” in Proceedings of the 2nd USENIX conference on Hot topics in cloud computing, 2010, pp. 10–10.
M. Zaharia, M. Chowdhury, T. Das, A. Dave, J. Ma, M. McCauley, M. J. Franklin, S. Shenker, and I. Stoica, “Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing,” in Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation, 2012, pp. 2–2.
Spark圖書
學(xué)習(xí)Spark
使用Spark進(jìn)行高級(jí)分析
有用的博文
設(shè)置IPython以使用PySpark
Databricks的Spark參考應(yīng)用程序
在EC2上運(yùn)行Spark
在Amazon Elastic MapReduce上運(yùn)行Spark和SparkSQL
更多信息請(qǐng)查看IT技術(shù)專欄