Hadoop 家族有多个组件,本文将深入探讨演示各功能的具体代码示例。如果您在自己的 PC 上尝试这些示例,是不会造成系统混乱的。
关于大数据,有很多令人兴奋的事情,但使用它也带来了很多困惑。本文将提供一个可行的大数据定义,然后完成一系列示例,让您可以对在大数据领域领先的开源技术 Hadoop 的一些功能有直接的了解。具体来说,我们集中讨论以下几个问题。
什么是大数据、Hadoop、Sqoop、Hive 和 Pig,为什么这个领域有这么多让人兴奋的事情?
Hadoop 与 IBM DB2 及 Informix 有何关系?这些技术可以配合使用吗?
我如何开始使用大数据?有哪些简单的示例可以在单台 PC 上运行?
对于超级的急性子,如果您已经可以定义 Hadoop 并希望马上使用代码样例,那么请执行以下操作。
启动您的 Informix 或 DB2 实例。
从 Cloudera 网站下载 VMWare 映像,并将虚拟机 RAM 设置增加至 1.5 GB。
直接跳到包含代码样例的部分。
VMWare 映像中已内置了一个 MySQL 实例。如果您在没有网络连接的情况下做练习,请使用 MySQL 示例。
对于所有其他人,请继续阅读...
什么是大数据?
大数据的数量庞大、捕获速度极快,且可以是结构化的也可以是非结构化的,或者是上述特点的某种组合。这些因素使得大数据难以使用传统的方法进行捕获、挖掘和管理。在该领域有如此多的炒作,以至于仅仅是大数据的定义就有可能是长期的争论战。
使用大数据技术并不局限于庞大的数量。本文中的示例使用小样本阐述该技术的功能。截至 2012 年,大 集群均在 100 PB 的范围内。
大数据既可以是结构化的,也可以是非结构化的。传统的关系型(如 Informix 和 DB2)为结构化数据提供了行之有效的解决方案。它们还可以通过可扩展性来管理非结构化数据。Hadoop 技术为处理包含结构化和非结构化数据的海量数据存储带来了更方便的新编程技术。
为什么有这么多令人兴奋的事情?
有很多因素推动了围绕大数据的炒作,具体包括以下因素。
在商用硬件上结合计算和存储:其结果是以低成本实现惊人的速度。
性价比:Hadoop 大数据技术提供了显著的成本节约(系数大约为 10),以及显著的性能改进(同样,系数为 10)。您的成就可能会有所不同。如果现有的技术输得如此一塌糊涂,那么就值得研究 Hadoop 是否可以补充或取代您当前架构的某些方面。
线性可扩展性:每一个并行技术都声称可以垂直扩展。Hadoop 具有真正的可扩展性,因为最新的版本将节点数量的限制扩展至 4,000 个以上。
可完全访问非结构化数据:具备良好的并行编程模型 MapReduce 的高度可扩展的数据存储在本行业中成为挑战已经有一段时间了。Hadoop 的编程模型并不能解决所有问题,但它对于许多任务来说都是一个强大的解决方案。
术语突破性技术 经常被严重过度使用,但在这种情况下,它可能是适当的。
什么是 Hadoop?
以下是 Hadoop 的几种定义,每种定义都针对的是企业内的不同受众:
对于高管:Hadoop 是 Apache 的一个开源软件项目,目的是从令人难以置信的数量/速度/多样性等有关组织的数据中获取价值。使用数据,而不是扔掉大部分数据。
对于技术管理人员:一个开源软件套件,挖掘有关您的企业的结构化和非结构化大数据。Hadoop 集成您现有的商业智能生态系统。
法律:一个由多个供应商打包和支持的开源软件套件。
工程:大规模并行、无共享、基于 Java 的 map-reduce 执行环境。打算使用数百台到数千台计算机处理相同的问题,具有内置的故障恢复能力。Hadoop 生态系统中的项目提供了数据加载、更高层次的语言、自动化的云部署,以及其他功能。
安全性:由 Kerberos 保护的软件套件。
Hadoop 的组件有哪些?
Apache Hadoop 项目有两个核心组件,被称为 Hadoop 分布式文件系统 (Hadoop Distributed File System, HDFS) 的文件存储,以及被称为 MapReduce 的编程框架。有一些支持项目充分利用了 HDFS 和 MapReduce。本文将提供一个概要,并鼓励您参阅 OReily 的书 “Hadoop The Definitive Guide”(第 3 版)了解更多详细信息。
下面的定义是为了提供足够的背景,让您可以使用随后的代码示例。本文的真正意义在于让您开始该技术的实践经验。这是一篇 “指南” 性质的文章,而不是一篇 “内容介绍” 或 “讨论” 类型的文章。
HDFS:如果您希望有 4000 多台电脑处理您的数据,那么最好将您的数据分发给 4000 多台电脑。HDFS 可以帮助您做到这一点。HDFS 有几个可以移动的部件。Datanodes 存储数据,Namenode 跟踪存储的位置。还有其他部件,但这些已经足以使您开始了。
MapReduce:这是一个面向 Hadoop 的编程模型。有两个阶段,毫不意外,它们分别被称为 Map 和 Reduce。如果希望给您的朋友留下深刻的印象,那么告诉他们,Map 和 Reduce 阶段之间有一个随机排序。JobTracker 管理您的 MapReduce 作业的 4000 多个组件。TaskTracker 从 JobTracker 接受订单。如果您喜欢 Java,那么用 Java 编写代码。如果您喜欢 SQL 或 Java 以外的其他语言,您的运气仍然不错,您可以使用一个名为 Hadoop Streaming 的实用程序。
Hadoop Streaming:一个实用程序,在任何语言(C、Perl 和 Python、C++、Bash 等)中支持 MapReduce 代码。示例包括一个 Python 映射程序和一个 AWK 缩减程序。
Hive 和 Hue:如果您喜欢 SQL,您会很高兴听到您可以编写 SQL,并使用 Hive 将其转换为一个 MapReduce 作业。不,您不会得到一个完整的 ANSI-SQL 环境,但您的确得到了 4000 个注释和多 PB 级的可扩展性。Hue 为您提供了一个基于浏览器的图形界面,可以完成您的 Hive 工作。
Pig: 一个执行 MapReduce 编码的更高层次的编程环境。Pig 语言被称为 Pig Latin。您可能会发现其命名约定有点不合常规,但是您会得到令人难以置信的性价比和高可用性。
Sqoop:在 Hadoop 和您最喜爱的关系数据库之间提供双向数据传输。
Oozie:管理 Hadoop 工作流。这并不能取代您的调度程序或 BPM 工具,但它在您的 Hadoop 作业中提供 if-then-else 分支和控制。
HBase:一个超级可扩展的键值存储。它的工作原理非常像持久的散列映射(对于 Python 爱好者,可以认为是词典)。尽管其名称是 HBase,但它并不是一个关系数据库。
FlumeNG:一个实时的加载程序,用来将数据流式传输到 Hadoop 中。它将数据存储在 HDFS 和 HBase 中。您会希望从 FlumeNG 开始,因为它对原始的水槽有所改进。
Whirr:面向 Hadoop 的云配置。您可以在短短几分钟内使用一个很短的配置文件启动一个集群。
Mahout:面向 Hadoop 的机器学习。用于预测分析和其他高级分析。
Fuse:让 HDFS 系统看起来就像一个普通的文件系统,所以您可以对 HDFS 数据使用 ls、rm、cd 和其他命令。
Zookeeper:用于管理集群的同步性。您不需要为 Zookeeper 做很多事情,但它在为您努力工作。如果您认为自己需要编写一个使用 Zookeeper 的程序,您要么非常非常聪明,并且可能是 Apache 项目的一个委员会,要么终将会有过得非常糟糕的一天。
图 1 显示了 Hadoop 的关键部分。
图 1. Hadoop 架构
HDFS(底层)位于商品硬件的集群之上。简单的机架式, 每台都配置 2 个十六核 CPU、6 到 12 个磁盘,以及 32G RAM。在一个 map-reduce 作业中,映射程序层以极高的速度从磁盘读取。映射程序向缩减程序发出已进行排序和提供的键值对,然后,缩减程序层汇总键值对。不,您不必汇总,实际上,您 的 map-reduce 作业中可以只包含映射程序。当您学习 Python-awk 示例时,这应该会变得更容易理解。
Hadoop 如何与我的 Informix 或 DB2 基础架构集成?
Hadoop 利用 Sqoop 可以与 Informix 和 DB2 数据库很好地集成。Sqoop 是领先的开源实现,用于在 Hadoop 和关系数据库之间移动数据。它使用 JDBC 来读取和写入 Informix、DB2、MySQL、Oracle 和其他数据源。有几个数据库都有优化的适配器,包括 Netezza 和 DB2。这些示例都是特定于 Sqoop 的示例。
入门:如何运行简单的 Hadoop、Hive、Pig、Oozie 和 Sqoop 示例
您已经完成了简介和定义,现在是时候来点好东西了。要继续下去,您就需要从 Cloudera 的 Web 站点下载 VMware、虚拟盒或其他映像,并开始执行 MapReduce!虚拟映像假设您有一台 64 位的计算机和流行的虚拟化环境之一。大多数虚拟化环境都提供免费下载。当您尝试启动一个 64 位的虚拟映像时,您可能会看到有关 BIOS 设置的投诉。图 2 显示了在 BIOS 中所要求的更改,在本例中使用的是 Thinkpad。进行更改时,请务必小心。更改 BIOS 设置后,一些企业安全软件包在系统重新启动之前将需要一个密码。
图 2. 一个 64 位虚拟来宾的 BIOS 设置
这里使用的大数据其实相当小。目的是不要让您的笔记本电脑因为持续使用一个巨大的文件而着火,而只是向您显示感兴趣的数据的来源,以及回答有意义的问题的 map-reduce 作业。
下载 Hadoop 虚拟映像
我们强烈建议您使用 Cloudera 映像运行这些示例。Hadoop 是一个解决问题的技术。Cloudera 映像包使您能够专注于大数据的问题。但是,如果您决定自己组装所有部件,那么 Hadoop 就会成为一个问题,而不是解决方案。
下载一个映像。CDH4 映像是此处可提供的最新产品:CDH4 映像。上一个版本 CDH3 在此处提供:CDH3 映像。
您可以自己选择虚拟化技术。可以从 VMware 和其他网站下载一个免费的虚拟化环境。例如,访问 vmware.com 并下载 vmware-player。您的笔记本电脑可能在运行 Windows,所以您需要下载 vmware-player for windows。本文中的示例将使用 VMWare,并使用 “tar”(而不是 “winzip” 或等效的软件)来运行 Ubuntu Linux。
下载之后,untar/unzip 如下所示:tar -zxvf cloudera-demo-vm-cdh4.0.0-vmware.tar.gz。
或者,如果您使用 CDH3,那么使用以下命令:tar -zxvf cloudera-demo-vm-cdh3u4-vmware.tar.gz。
tar 文件一般可以解压缩。解压缩后,您可以启动映像,如下所示:
vmplayer cloudera-demo-vm.vmx。现在,您将看到的屏幕类似于图 3 所示。
图 3. Cloudera 虚拟映像
vmplayer 命令一矢中的,并启动虚拟机。如果您使用的是 CDH3,那么您需要关闭机器并更改内存设置。使用屏幕中下方的时钟旁边的电源按钮图标关闭虚拟机电源。然后,您可以编辑虚拟机设置的访问权限。
对于 CDH3,下一个步骤是在虚拟映像中增加更多的 RAM。大多数设置只能在虚拟机的电源关闭时进行更改。图 4 显示了如何访问设置和将所分配的 RAM 增加至超过 2GB。
图 4. 对虚拟机增加 RAM
如图 5 所示,您可以将网络设置更改为桥接。使用此设置,虚拟机将会获得自己的 IP 地址。如果这会在您的网络上引起问题,那么您可以选择使用网络地址转换 (NAT)。您将要使用网络来连接到数据库。
图 5. 将网络设置更改为桥接
您受限于主机系统上的 RAM,所以不要尝试让分配的 RAM 多于您的计算机上现有的 RAM。如果这样做了,计算机运行速度会很慢。
现在,正是您一直期待的时刻,去启动虚拟机吧。用户 cloudera 是在启动时自动登录。如果您需要它,Cloudera 的密码是:cloudera。
安装 Informix 和 DB2
您将需要使用一个数据库。如果您还没有数据库,那么您可以在这里下载 ,或 。
安装 DB2 的另一种方法是下载已经在 SuSE Linux 操作系统上安装了 DB2 的 VMWare 映像。以 root 身份登录,密码是:password。
切换到 db2inst1 userid。以 root 身份工作,就像开车没有系安全带一样。请和您本地友好的 DBA 谈谈有关让数据库运行的问题。本文将不再赘述。不要尝试在 Cloudera 虚拟映像内安装数据库,因为没有足够的可用磁盘空间。
虚拟机将使用 Sqoop 连接到数据库,这需要一个 JDBC 驱动程序。在虚拟映像中,您将需要一个面向您的数据库的 JDBC 驱动程序。您可以安装这里的 Informix 驱动程序。
此处提供了 DB2 驱动程序:http://www..com/services/forms/preLogin.do?source=swg-idsdjs 或 http://www-01..com/support/docview.wss?rs=4020&uid=swg21385217。
Informix JDBC 驱动程序(记住,只是虚拟映像中的驱动程序,不是数据库中的驱动程序)安装如清单 1 所示。
清单 1. Informix JDBC 驱动程序安装
tar -xvf ../JDBC.3.70.JC5DE.tarfollowed by java -jar setup.jar
注意:选择一个与 /home/cloudera 有关的子目录,那么安装时就不会需要 root 权限。
DB2 JDBC 驱动程序是压缩格式的,只需将它解压缩到目标目录中,如清单 2 所示。
清单 2. DB2 JDBC 驱动程序安装mkdir db2jdbccd db2jdbcunzip ../ibm_data_server_driver_for_jdbc_sqlj_v10.1.zip
快速了解 HDFS 和 MapReduce
在您开始在关系数据库和 Hadoop 之间移动数据之前,需要快速了解一下 HDFS 和 MapReduce。有很多 “Hello World” 风格的 Hadoop ,所以这里的示例只是为了提供足够的背景知识,以使数据库练习对您有意义。
HDFS 在集群中跨节点提供存储空间。使用 Hadoop 的第一步是将数据放入 HDFS 中。如清单 3 所示的代码获得 Mark Twain 的一本书和 James Fenimore 的一本书的副本,并将这些文本复制到 HDFS 中。
清单 3. 将 Mark Twain 和 James Fenimore Cooper 加载到 HDFS 中
# install wget utility into the virtual p_w_picpathsudo yum install wget # use wget to download the Twain and Cooper's works$ wget -U firefox http://www.gutenberg.org/cache/epub/76/pg76.txt$ wget -U firefox http://www.gutenberg.org/cache/epub/3285/pg3285.txt # load both into the HDFS file system# first give the files better names# DS for Deerslayer# HF for Huckleberry Finn$ mv pg3285.txt DS.txt$ mv pg76.txt HF.txt # this next command will fail if the directory already exists$ hadoop fs -mkdir /user/cloudera # now put the text into the directory $ hadoop fs -put HF.txt /user/cloudera # way too much typing, create aliases for hadoop commands$ alias hput="hadoop fs -put"$ alias hcat="hadoop fs -cat"$ alias hls="hadoop fs -ls"# for CDH4 $ alias hrmr="hadoop fs -rm -r"# for CDH3 $ alias hrmr="hadoop fs -rmr" # load the other article# but add some compression because we can $ gzip DS.txt # the . in the next command references the cloudera home directory# in hdfs, /user/cloudera $ hput DS.txt.gz . # now take a look at the files we have in place$ hlsFound 2 items-rw-r--r-- 1 cloudera supergroup 459386 2012-08-08 19:34 /user/cloudera/DS.txt.gz-rw-r--r-- 1 cloudera supergroup 597587 2012-08-08 19:35 /user/cloudera/HF.txt
现在,在 HDFS 中的目录里面有两个文件。请控制您的兴奋。真的,在单个节点上,只有约 1 兆字节的数据,这就像看着油漆变干那样令人兴奋。但是,如果这是一个具有 400 个节点的集群,而您有 5 PB 数据,那么您真的很难控制自己的兴奋程度。
很多 Hadoop 教程使用示例 jar 文件中所包含的单词计数示例。事实证明,大量分析都涉及计数和汇总。清单 4 中的示例显示如何调用计数器。
清单 4. 对 Twain 和 Cooper 的单词进行计数
# hadoop comes with some examples# this next line uses the provided java implementation of a # word count program # for CDH4:hadoop jar /usr/lib/hadoop-0.20-mapreduce/hadoop-examples.jar wordcount HF.txt HF.out# for CDH3:hadoop jar /usr/lib/hadoop/hadoop-examples.jar wordcount HF.txt HF.out # for CDH4:hadoop jar /usr/lib/hadoop-0.20-mapreduce/hadoop-examples.jar wordcount DS.txt.gz DS.out# for CDH3:hadoop jar /usr/lib/hadoop/hadoop-examples.jar wordcount DS.txt.gz DS.out
DS.txt.gz 上的 .gz 后缀告诉 Hadoop 要在 map-reduce 处理过程中处理解压缩。Cooper 的单词有点冗长,所以应该进行压缩。
运行您的单词计数作业会产生相当多的消息流。Hadoop 很乐意为您提供有关以您的名义运行的映射和缩减程序的大量详细信息。您要寻找的关键行如清单 5 所示,包括失败作业的第二列表,以及如何解决您在运行 MapReduce 时会遇到的最常见错误之一。
清单 5. MapReduce 消息 - “快乐之路”
$ hadoop jar /usr/lib/hadoop/hadoop-examples.jar wordcount HF.txt HF.out12/08/08 19:23:46 INFO input.FileInputFormat: Total input paths to process : 112/08/08 19:23:47 WARN snappy.LoadSnappy: Snappy native library is available12/08/08 19:23:47 INFO util.NativeCodeLoader: Loaded the native-hadoop library12/08/08 19:23:47 INFO snappy.LoadSnappy: Snappy native library loaded12/08/08 19:23:47 INFO mapred.JobClient: Running job: job_201208081900_000212/08/08 19:23:48 INFO mapred.JobClient: map 0% reduce 0%12/08/08 19:23:54 INFO mapred.JobClient: map 100% reduce 0%12/08/08 19:24:01 INFO mapred.JobClient: map 100% reduce 33%12/08/08 19:24:03 INFO mapred.JobClient: map 100% reduce 100%12/08/08 19:24:04 INFO mapred.JobClient: Job complete: job_201208081900_000212/08/08 19:24:04 INFO mapred.JobClient: Counters: 2612/08/08 19:24:04 INFO mapred.JobClient: Job Counters 12/08/08 19:24:04 INFO mapred.JobClient: Launched reduce tasks=112/08/08 19:24:04 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=595912/08/08 19:24:04 INFO mapred.JobClient: Total time spent by all reduces...12/08/08 19:24:04 INFO mapred.JobClient: Total time spent by all maps waiting...12/08/08 19:24:04 INFO mapred.JobClient: Launched map tasks=112/08/08 19:24:04 INFO mapred.JobClient: Data-local map tasks=112/08/08 19:24:04 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=943312/08/08 19:24:04 INFO mapred.JobClient: FileSystemCounters12/08/08 19:24:04 INFO mapred.JobClient: FILE_BYTES_READ=19229812/08/08 19:24:04 INFO mapred.JobClient: HDFS_BYTES_READ=59770012/08/08 19:24:04 INFO mapred.JobClient: FILE_BYTES_WRITTEN=49874012/08/08 19:24:04 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=13821812/08/08 19:24:04 INFO mapred.JobClient: Map-Reduce Framework12/08/08 19:24:04 INFO mapred.JobClient: Map input records=1173312/08/08 19:24:04 INFO mapred.JobClient: Reduce shuffle bytes=19229812/08/08 19:24:04 INFO mapred.JobClient: Spilled Records=2767612/08/08 19:24:04 INFO mapred.JobClient: Map output bytes=103301212/08/08 19:24:04 INFO mapred.JobClient: CPU time spent (ms)=243012/08/08 19:24:04 INFO mapred.JobClient: Total committed heap usage (bytes)=18370150412/08/08 19:24:04 INFO mapred.JobClient: Combine input records=11336512/08/08 19:24:04 INFO mapred.JobClient: SPLIT_RAW_BYTES=11312/08/08 19:24:04 INFO mapred.JobClient: Reduce input records=1383812/08/08 19:24:04 INFO mapred.JobClient: Reduce input groups=1383812/08/08 19:24:04 INFO mapred.JobClient: Combine output records=1383812/08/08 19:24:04 INFO mapred.JobClient: Physical memory (bytes) snapshot=25647923212/08/08 19:24:04 INFO mapred.JobClient: Reduce output records=1383812/08/08 19:24:04 INFO mapred.JobClient: Virtual memory (bytes) snapshot=102704742412/08/08 19:24:04 INFO mapred.JobClient: Map output records=113365
所有这些信息有什么意义呢?Hadoop 已经做了很多工作,并且尝试告诉您这些工作,具体包括以下内容。
检查输入文件是否存在。
检查输出目录是否存在,如果存在,中止作业。没有什么比因为一个简单的键盘错误而要重写数小时的计算更糟糕的了。
将 Java jar 文件分发到负责执行工作的所有节点。在本例中,只有一个节点。
运行作业的映射程序阶段。通常情况下,这会解析输入文件,并发出一个键值对。注意:键和值可以是对象。
运行排序阶段,这会根据键对映射程序输出进行排序。
运行归约阶段,这通常会汇总键值流,并将输出写入 HDFS 中。
创建多个进度指标。
图 6 显示了运行 Hive 练习后的 Hadoop 作业指标的一个样例 Web 页面。
图 6. Hadoop 的样例 Web 页面
作业执行了什么,其输出在哪里?这两个都是很好的问题,如清单 6 所示。
清单 6. map-reduce 输出
# way too much typing, create aliases for hadoop commands$ alias hput="hadoop fs -put"$ alias hcat="hadoop fs -cat"$ alias hls="hadoop fs -ls"$ alias hrmr="hadoop fs -rmr" # first list the output directory$ hls /user/cloudera/HF.outFound 3 items-rw-r--r-- 1 cloudera supergroup 0 2012-08-08 19:38 /user/cloudera/HF.out/_SUCCESSdrwxr-xr-x - cloudera supergroup 0 2012-08-08 19:38 /user/cloudera/HF.out/_logs-rw-r--r-- 1 cl... sup... 138218 2012-08-08 19:38 /user/cloudera/HF.out/part-r-00000 # now cat the file and pipe it to the less command$ hcat /user/cloudera/HF.out/part-r-00000 | less # here are a few lines from the file, the word elephants only got used twiceelder, 1eldest 1elect 1elected 1electronic 27electronically 1electronically, 1elegant 1elegant!--'deed 1elegant, 1elephants 2
在该事件中,您运行了两次相同的作业,但却忘记了删除输出目录,您将收到如清单 7 所示的错误消息。要修复这个错误很简单,只需要删除该目录即可。
清单 7. MapReduce 消息 - 由于 HDFS 中已经存在输出而引起的失败
# way too much typing, create aliases for hadoop commands$ alias hput="hadoop fs -put"$ alias hcat="hadoop fs -cat"$ alias hls="hadoop fs -ls"$ alias hrmr="hadoop fs -rmr" $ hadoop jar /usr/lib/hadoop/hadoop-examples.jar wordcount HF.txt HF.out12/08/08 19:26:23 INFO mapred.JobClient: Cleaning up the staging area hdfs://0.0.0.0/var/l...12/08/08 19:26:23 ERROR security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:SIMPLE) cause:org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory HF.out already existsorg.apache.hadoop.mapred.FileAlreadyExistsException: Output directory HF.out already existsat org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:872)at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:833) .... lines deleted # the simple fix is to remove the existing output directory $ hrmr HF.out # now you can re-run the job successfully # if you run short of space and the namenode enters safemode# clean up some file space and then $ hadoop dfsadmin -safemode leave
Hadoop 包括一个检查 HDFS 状态的浏览器界面。图 7 显示了单词计数作业的输出。
图 7. 使用浏览器查看 HDFS
Cloudera 网站免费提供了一个更复杂的控制台。这个控制台提供了大量超出标准 Hadoop Web 界面的功能。请注意,图 8 所示的 HDFS 健康状态为 Bad。
图 8. 由 Cloudera Manager 管理的 Hadoop 服务
为什么是 Bad(不好)?因为在单个虚拟机中,HDFS 无法制作数据块的三个副本。当块不足以复制时,就会存在数据丢失的风险,因此系统的健康状态是不好的。您没有尝试在单个节点上运行生产 Hadoop 作业,这是好事。
您的 MapReduce 作业并不会受限于 Java。最后这个 MapReduce 示例使用 Hadoop Streaming 支持用 Python 编写的一个映射程序和用 AWK 编写的缩减程序。不,您不必是一个 Java 大师也可以编写 map-reduce!
Mark Twain 并不是 Cooper 的铁杆粉丝。在这个用例中,Hadoop 将提供比较 Twain 和 Cooper 的一些简单的文学评论。Flesch-Kincaid 测试对特定文本的阅读级别进行计算。此分析的因素之一是句子的平均长度。解析句子原来比只是查找句号字符要复杂得多。openNLP 包和 Python NLTK 包有出色的句子分析程序。为了简单起见,清单 8 中的示例将使用字长替代一个单词中的音节数。如果您想将这项工作带到一个新的水平,在 MapReduce 中实施 Flesch-Kincaid 测试,抓取 Web,并计算出您最喜爱的新闻站点的阅读级别。
清单 8. 基于 Python 的映射程序文学评论
# here is the mapper we'll connect to the streaming hadoop interface # the mapper is reading the text in the file - not really appreciating Twain's humor# # modified from # http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/$ cat mapper.py #!/usr/bin/env pythonimport sys # read stdinfor linein in sys.stdin:# strip blankslinein = linein.strip()# split into wordsmywords = linein.split()# loop on mywords, output the length of each wordfor word in mywords:# the reducer just cares about the first column, # normally there is a key - value pairprint '%s %s' % (len(word), 0)
针对单词 “Twain” 的映射程序输出将是 5 0。字长按数值顺序进行排序,并按排序顺序提交给缩减程序。在清单 9 和清单 10 中的示例中,不需要对数据进行排序,就可以得到正确的输出,但排序是内置在 MapReduce 基础架构中的,无论如何都会发生。
清单 9. 用于文学评论的 AWK 缩减程序
# the awk code is modified from http://www.commandlinefu.com # awk is calculating# NR - the number of words in total# sum/NR - the average word length# sqrt(mean2/NR) - the standard deviation $ cat statsreducer.awk awk '{delta = $1 - avg; avg += delta / NR; \mean2 += delta * ($1 - avg); sum=$1+sum } \END { print NR, sum/NR, sqrt(mean2 / NR); }'
清单 10. 使用 Hadoop Streaming 运行 Python 映射程序和 AWK 缩减程序
# test locally # because we're using Hadoop Streaming, we can test the # mapper and reducer with simple pipes # the "sort" phase is a reminder the keys are sorted# before presentation to the reducer#in this example it doesn't matter what order the # word length values are presented for calculating the std deviation $ zcat ../DS.txt.gz | ./mapper.py | sort | ./statsreducer.awk 215107 4.56068 2.50734 # now run in hadoop with streaming # CDH4hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \-input HF.txt -output HFstats -file ./mapper.py -file \./statsreducer.awk -mapper ./mapper.py -reducer ./statsreducer.awk # CDH3$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u4.jar \-input HF.txt -output HFstats -file ./mapper.py -file ./statsreducer.awk \-mapper ./mapper.py -reducer ./statsreducer.awk $ hls HFstatsFound 3 items-rw-r--r-- 1 cloudera supergroup 0 2012-08-12 15:38 /user/cloudera/HFstats/_SUCCESSdrwxr-xr-x - cloudera supergroup 0 2012-08-12 15:37 /user/cloudera/HFstats/_logs-rw-r--r-- 1 cloudera ... 24 2012-08-12 15:37 /user/cloudera/HFstats/part-00000 $ hcat /user/cloudera/HFstats/part-00000113365 4.11227 2.17086 # now for cooper $ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u4.jar \-input DS.txt.gz -output DSstats -file ./mapper.py -file ./statsreducer.awk \-mapper ./mapper.py -reducer ./statsreducer.awk $ hcat /user/cloudera/DSstats/part-00000215107 4.56068 2.50734
Mark Twain 的粉丝若知道 Hadoop 发现 Cooper 使用较长的单词,并且其标准偏差令人震惊,那么他们就可以愉快地放松了(幽默意图)。当然,要是假设较短的单词会更好。让我们继续,下一步是将 HDFS 中的数据写入 Informix 和 DB2。
使用 Sqoop 通过 JDBC 将来自 HDFS 的数据写入 Informix、DB2 或 MySQL
Sqoop Apache 项目是一个开源的基于 JDBC 的 Hadoop,用于数据库的数据移动实用程序。Sqoop 最初由在 Cloudera 的***马拉松 (hackathon) 创建,后来成为开源的工具。
将数据从 HDFS 移动到关系数据库是一种常见的用例。HDFS 和 map-reduce 在执行繁重工作方面是非常棒的。对于简单的查询或 Web 站点的后端存储,在关系存储区中缓存 map-reduce 输出是一个很好的设计模式。您可以避免重新运行 map-reduce 单词计数,只需将结果 Sqoop 到 Informix 和 DB2 中即可。您已经生成了关于 Twain 和 Cooper 的数据,现在,让我们把这些数据移动到一个数据库,如清单 11 所示。
清单 11. JDBC 驱动程序安装
#Sqoop needs access to the JDBC driver for every# database that it will access # please copy the driver for each database you plan to use for these exercises# the MySQL database and driver are already installed in the virtual p_w_picpath# but you still need to copy the driver to the sqoop/lib directory #one time copy of jdbc driver to sqoop lib directory$ sudo cp Informix_JDBC_Driver/lib/ifxjdbc*.jar /usr/lib/sqoop/lib/$ sudo cp db2jdbc/db2jcc*.jar /usr/lib/sqoop/lib/$ sudo cp /usr/lib/hive/lib/mysql-connector-java-5.1.15-bin.jar /usr/lib/sqoop/lib/
清单 12 至 15 所示的示例分别对应于每种数据库。请跳到您感兴趣的示例,包括 Informix、DB2 或 MySQL。对于掌握多种数据库语言的人,请享受执行每个示例的乐趣。如果这里没有包括您首选的数据库,让这些示例在其他地方工作也不会是一个巨大的挑 战。
清单 12. Informix 用户:Sqoop 将单词计数的结果写入 Informix
# create a target table to put the data# fire up dbaccess and use this sql # create table wordcount ( word char(36) primary key, n int); # now run the sqoop command# this is best put in a shell script to help avoid typos... $ sqoop export -D sqoop.export.records.per.statement=1 \--fields-terminated-by '\t' --driver com.informix.jdbc.IfxDriver \--connect \"jdbc:informix-sqli://myhost:54321/stores_demo:informixserver=i7;user=me;password=mypw" \--table wordcount --export-dir /user/cloudera/HF.out
清单 13. Informix 用户:Sqoop 将单词计数的结果写入 Informix
12/08/08 21:39:42 INFO manager.SqlManager: Using default fetchSize of 100012/08/08 21:39:42 INFO tool.CodeGenTool: Beginning code generation12/08/08 21:39:43 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM wordcount AS t WHERE 1=012/08/08 21:39:43 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM wordcount AS t WHERE 1=012/08/08 21:39:43 INFO orm.CompilationManager: HADOOP_HOME is /usr/lib/hadoop12/08/08 21:39:43 INFO orm.CompilationManager: Found hadoop core jar at: /usr/lib/hadoop/hadoop-0.20.2-cdh3u4-core.jar12/08/08 21:39:45 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/248b77c05740f863a15e0136accf32cf/wordcount.jar12/08/08 21:39:45 INFO mapreduce.ExportJobBase: Beginning export of wordcount12/08/08 21:39:45 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM wordcount AS t WHERE 1=012/08/08 21:39:46 INFO input.FileInputFormat: Total input paths to process : 112/08/08 21:39:46 INFO input.FileInputFormat: Total input paths to process : 112/08/08 21:39:46 INFO mapred.JobClient: Running job: job_201208081900_001212/08/08 21:39:47 INFO mapred.JobClient: map 0% reduce 0%12/08/08 21:39:58 INFO mapred.JobClient: map 38% reduce 0%12/08/08 21:40:00 INFO mapred.JobClient: map 64% reduce 0%12/08/08 21:40:04 INFO mapred.JobClient: map 82% reduce 0%12/08/08 21:40:07 INFO mapred.JobClient: map 98% reduce 0%12/08/08 21:40:09 INFO mapred.JobClient: Task Id : attempt_201208081900_0012_m_000000_0, Status : FAILEDjava.io.IOException: java.sql.SQLException: Encoding or code set not supported.at ...SqlRecordWriter.close(AsyncSqlRecordWriter.java:187)at ...$NewDirectOutputCollector.close(MapTask.java:540)at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:649)at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)at org.apache.hadoop.mapred.Child$4.run(Child.java:270)at java.security.AccessController.doPrivileged(Native Method)at javax.security.auth.Subject.doAs(Subject.java:396)at ....doAs(UserGroupInformation.java:1177)at org.apache.hadoop.mapred.Child.main(Child.java:264)Caused by: java.sql.SQLException: Encoding or code set not supported.at com.informix.util.IfxErrMsg.getSQLException(IfxErrMsg.java:413)at com.informix.jdbc.IfxChar.toIfx(IfxChar.java:135)at com.informix.jdbc.IfxSqli.a(IfxSqli.java:1304)at com.informix.jdbc.IfxSqli.d(IfxSqli.java:1605)at com.informix.jdbc.IfxS12/08/08 21:40:11 INFO mapred.JobClient: map 0% reduce 0%12/08/08 21:40:15 INFO mapred.JobClient: Task Id : attempt_201208081900_0012_m_000000_1, Status : FAILEDjava.io.IOException: java.sql.SQLException: Unique constraint (informix.u169_821) violated.at .mapreduce.AsyncSqlRecordWriter.write(AsyncSqlRecordWriter.java:223)at .mapreduce.AsyncSqlRecordWriter.write(AsyncSqlRecordWriter.java:49)at .mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:531)at .mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)at com.cloudera.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:82)at com.cloudera.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:40)at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)at .mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:189)at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)at org.apache.hadoop.mapred.Child$4.run(Child.java:270)at java.security.AccessController.doPrivileged(Native Method)at javax.security.a12/08/08 21:40:20 INFO mapred.JobClient: Task Id : attempt_201208081900_0012_m_000000_2, Status : FAILEDjava.sql.SQLException: Unique constraint (informix.u169_821) violated.at .mapreduce.AsyncSqlRecordWriter.write(AsyncSqlRecordWriter.java:223)at .mapreduce.AsyncSqlRecordWriter.write(AsyncSqlRecordWriter.java:49)at .mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:531)at .mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)at com.cloudera.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:82)at com.cloudera.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:40)at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)at .mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:189)at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)at org.apache.hadoop.mapred.Child$4.run(Child.java:270)at java.security.AccessController.doPrivileged(Native Method)at javax.security.a12/08/08 21:40:27 INFO mapred.JobClient: Job complete: job_201208081900_001212/08/08 21:40:27 INFO mapred.JobClient: Counters: 712/08/08 21:40:27 INFO mapred.JobClient: Job Counters 12/08/08 21:40:27 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=3847912/08/08 21:40:27 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=012/08/08 21:40:27 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=012/08/08 21:40:27 INFO mapred.JobClient: Launched map tasks=412/08/08 21:40:27 INFO mapred.JobClient: Data-local map tasks=412/08/08 21:40:27 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=012/08/08 21:40:27 INFO mapred.JobClient: Failed map tasks=112/08/08 21:40:27 INFO mapreduce.ExportJobBase: Transferred 0 bytes in 41.5758 seconds (0 bytes/sec)12/08/08 21:40:27 INFO mapreduce.ExportJobBase: Exported 0 records.12/08/08 21:40:27 ERROR tool.ExportTool: Error during export: Export job failed! # despite the errors above, rows are inserted into the wordcount table# one row is missing# the retry and duplicate key exception are most likely related, but# troubleshooting will be saved for a later article # check how we did# nothing like a "here document" shell script $ dbaccess stores_demo - <select count(*) from wordcount;> eoj Database selected.(count(*)) 138371 row(s) retrieved.Database closed.
清单 14. DB2 用户:Sqoop 将单词计数的结果写入 DB2
# here is the db2 syntax# create a destination table for db2##db2 => connect to sample## Database Connection Information## Database server = DB2/LINUXX8664 10.1.0# SQL authorization ID = DB2INST1# Local database alias = SAMPLE##db2 => create table wordcount ( word char(36) not null primary key , n int)#DB20000I The SQL command completed successfully.# sqoop export -D sqoop.export.records.per.statement=1 \--fields-terminated-by '\t' \--driver com.ibm.db2.jcc.DB2Driver \--connect "jdbc:db2://192.168.1.131:50001/sample" \--username db2inst1 --password db2inst1 \--table wordcount --export-dir /user/cloudera/HF.out 12/08/09 12:32:59 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.12/08/09 12:32:59 INFO manager.SqlManager: Using default fetchSize of 100012/08/09 12:32:59 INFO tool.CodeGenTool: Beginning code generation12/08/09 12:32:59 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM wordcount AS t WHERE 1=012/08/09 12:32:59 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM wordcount AS t WHERE 1=012/08/09 12:32:59 INFO orm.CompilationManager: HADOOP_HOME is /usr/lib/hadoop12/08/09 12:32:59 INFO orm.CompilationManager: Found hadoop core jar at: /usr/lib/hadoop/hadoop-0.20.2-cdh3u4-core.jar12/08/09 12:33:00 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/5532984df6e28e5a45884a21bab245ba/wordcount.jar12/08/09 12:33:00 INFO mapreduce.ExportJobBase: Beginning export of wordcount12/08/09 12:33:01 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM wordcount AS t WHERE 1=012/08/09 12:33:02 INFO input.FileInputFormat: Total input paths to process : 112/08/09 12:33:02 INFO input.FileInputFormat: Total input paths to process : 112/08/09 12:33:02 INFO mapred.JobClient: Running job: job_201208091208_000212/08/09 12:33:03 INFO mapred.JobClient: map 0% reduce 0%12/08/09 12:33:14 INFO mapred.JobClient: map 24% reduce 0%12/08/09 12:33:17 INFO mapred.JobClient: map 44% reduce 0%12/08/09 12:33:20 INFO mapred.JobClient: map 67% reduce 0%12/08/09 12:33:23 INFO mapred.JobClient: map 86% reduce 0%12/08/09 12:33:24 INFO mapred.JobClient: map 100% reduce 0%12/08/09 12:33:25 INFO mapred.JobClient: Job complete: job_201208091208_000212/08/09 12:33:25 INFO mapred.JobClient: Counters: 1612/08/09 12:33:25 INFO mapred.JobClient: Job Counters 12/08/09 12:33:25 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=2164812/08/09 12:33:25 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=012/08/09 12:33:25 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=012/08/09 12:33:25 INFO mapred.JobClient: Launched map tasks=112/08/09 12:33:25 INFO mapred.JobClient: Data-local map tasks=112/08/09 12:33:25 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=012/08/09 12:33:25 INFO mapred.JobClient: FileSystemCounters12/08/09 12:33:25 INFO mapred.JobClient: HDFS_BYTES_READ=13835012/08/09 12:33:25 INFO mapred.JobClient: FILE_BYTES_WRITTEN=6942512/08/09 12:33:25 INFO mapred.JobClient: Map-Reduce Framework12/08/09 12:33:25 INFO mapred.JobClient: Map input records=1383812/08/09 12:33:25 INFO mapred.JobClient: Physical memory (bytes) snapshot=10514841612/08/09 12:33:25 INFO mapred.JobClient: Spilled Records=012/08/09 12:33:25 INFO mapred.JobClient: CPU time spent (ms)=925012/08/09 12:33:25 INFO mapred.JobClient: Total committed heap usage (bytes)=4200857612/08/09 12:33:25 INFO mapred.JobClient: Virtual memory (bytes) snapshot=59644723212/08/09 12:33:25 INFO mapred.JobClient: Map output records=1383812/08/09 12:33:25 INFO mapred.JobClient: SPLIT_RAW_BYTES=12612/08/09 12:33:25 INFO mapreduce.ExportJobBase: Transferred 135.1074 KB in 24.4977 seconds (5.5151 KB/sec)12/08/09 12:33:25 INFO mapreduce.ExportJobBase: Exported 13838 records. # check on the results...##db2 => select count(*) from wordcount ##1 #-----------# 13838## 1 record(s) selected.##
清单 15. MySQL 用户:Sqoop 将单词计数的结果写入 MySQL
# if you don't have Informix or DB2 you can still do this example# mysql - it is already installed in the VM, here is how to access # one time copy of the JDBC driver sudo cp /usr/lib/hive/lib/mysql-connector-java-5.1.15-bin.jar /usr/lib/sqoop/lib/ # now create the database and table $ mysql -u rootWelcome to the MySQL monitor. Commands end with ; or \g.Your MySQL connection id is 45Server version: 5.0.95 Source distribution Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved. Oracle is a registered trademark of Oracle Corporation and/or itsaffiliates. Other names may be trademarks of their respectiveowners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> create database mydemo;Query OK, 1 row affected (0.00 sec) mysql> use mydemoDatabase changedmysql> create table wordcount ( word char(36) not null primary key, n int);Query OK, 0 rows affected (0.00 sec) mysql> exitBye # now export $ sqoop export --connect jdbc:mysql://localhost/mydemo \--table wordcount --export-dir /user/cloudera/HF.out \--fields-terminated-by '\t' --username root