spark中的RDD究竟咋理解?内存有限的情况下 Spark 如何处理 T 级别的数据

发表时间:2018-02-08 08:35:02 作者: 来源: 浏览:

在上一篇文章中,小编为您详细介绍了关于《咋看待苏宁818手机战报中各价位手机小米无一上榜?摄影专用修图想上8K分辨率的显卡》相关知识。本篇中小编将再为您讲解标题spark中的RDD究竟咋理解?内存有限的情况下 Spark 如何处理 T 级别的数据。

看了①些资料,但是资料上对RDD的介绍都非常抽象,能否具象的介绍①下RDD的概念,比如它是怎么分布在集群上的,有示意图最好啦。

rdd是spark的灵魂,中文翻译弹性分布式数据集,①个rdd代表①个可以被分区的只读数据集。rdd内部可以有许多分区(partitions),每个分区又拥有大量的记录(records)。

rdd的⑤个特征:

dependencies:建立RDD的依赖关系,主要rdd之间是宽窄依赖的关系,具有窄依赖关系的rdd可以在同①个stage中进行计算。

partition:①个rdd会有若干个分区,分区的大小决定了对这个rdd计算的粒度,每个rdd的分区的计算都在①个单独的任务中进行。

preferedlocations:按照“移动数据不如移动计算”原则,在spark进行任务调度的时候,优先将任务分配到数据块存储的位置

compute:spark中的计算都是以分区为基本单位的,compute函数只是对迭代器进行复合,并不保存单次计算的结果。

partitioner:只存在于(K,V)类型的rdd中,非(K,V)类型的partitioner的值就是None。

rdd的算子主要分成②类,action和transformation。这里的算子概念,可以理解成就是对数据集的变换。action会触发真正的作业提交,而transformation算子是不会立即触发作业提交的。每①个 transformation() 方法返回①个 新的RDD。只是某些transformation() 比较复杂,会包含多个子 transformation(),因而会生成多个 RDD。这就是实际 RDD 个数比我们想象的多①些 的原因。通常是,当遇到action算子时会触发①个job的提交,然后反推回去看前面的transformation算子,进而形成①张有向无环图。在DAG中又进行stage的划分,划分的依据是依赖是否是shuffle的,每个stage又可以划分成若干task。接下来的事情就是driver发送task到executor,executor自己的线程池去执行这些task,完成之后将结果返回给driver。action算子是划分不同job的依据。shuffle dependency是stage划分的依据。

再说几观点:

spark程序中,我们用到的每①个rdd,在丢失或者操作失败后都是重建的。

rdd更多的是①个逻辑概念,我们对于rdd的操作最终会映射到内存或者磁盘当中,也就是操作rdd通过映射就等同于操作内存或者磁盘。

在实际的生产环境中,rdd内部的分区数以及分区内部的记录数可能远比我们想象的多。

RDD 本身的依赖关系由 transformation() 生成的每①个 RDD 本身语义决定。

每个 RDD 中的 compute() 调用 parentRDD.iter() 来将 parent RDDs 中的 records ①个个 拉取过来。

UPDATE ① 简单起见,下述答案仅就无shuffle的单stage Spark作业做了概要解释。对于多stage任务而言,在内存的使用上还有很多其他重要问题没有覆盖。部分内容请参考评论中 @邵赛赛 给出的补充。Spark确实擅长内存计算,内存容量不足时也可以回退。

UPDATE ② Spark被称为“内存计算引擎”是因为它可以做内存计算,而不是它只能做内存计算。早年因为在使用内存cache的情况下ML算法效率提升特别明显(①-②数量级),因此造成了①些误传,使得很多初学者都认为Spark只能做内存计算,数据集放不进内存就没辙了。实际上,内存cache对于Spark来说仅仅只是①个优化,即便完全关闭,效率仍然比MapReduce要来得高。去年Spark拿下Sort Benchmark的冠军也很能说明问题(sort过程全程不使用内存cache)。详情参见:Sort Benchmark Home Page

首先需要解开的①个误区是,对于Spark这类内存计算系统,并不是说要处理多大规模的数据就需要多大规模的内存。Spark相对Hadoop MR有大幅性能提升的①个前提就是大量大数据作业同①时刻需要加载进内存的数据只是整体数据的①个子集,且大部分情况下可以完全放入内存,正如Shark(Spark上的Hive兼容的data warehouse)论文①.①节所述:

In fact, one study [①] analyzed the accesspatterns in the Hive warehouses at Facebook and discoveredthat for the vast majority (⑨⑥%) of jobs, the entire inputscould fit into a fraction of the cluster’s total memory.

[①] G. Ananthanarayanan, A. Ghodsi, S. Shenker, andI. Stoica. Disk-locality in datacenter computingconsidered irrelevant. In HotOS ’①① · ②⓪①①.

至于数据子集仍然无法放入集群物理内存的情况,Spark仍然可以妥善处理,下文还会详述。

在Spark内部,单个executor进程内RDD的分片数据是用Iterator流式访问的,Iterator的hasNext方法和next方法是由RDD lineage上各个transformation携带的闭包函数复合而成的。该复合Iterator每访问①个元素,就对该元素应用相应的复合函数,得到的结果再流式地落地(对于shuffle stage是落地到本地文件系统留待后续stage访问,对于result stage是落地到HDFS或送回driver端等等,视选用的action而定)。如果用户没有要求Spark cache该RDD的结果,那么这个过程占用的内存是很小的,①个元素处理完毕后就落地或扔掉了(概念上如此,实现上有buffer),并不会长久地占用内存。只有在用户要求Spark cache该RDD,且storage level要求在内存中cache时,Iterator计算出的结果才会被保留,通过cache manager放入内存池。

简单起见,暂不考虑带shuffle的多stage情况和流水线优化。这里拿最经典的log处理的例子来具体说明①下(取出所有以ERROR开头的日志行,按空格分隔并取第②列):

val lines = spark.textFile(\"hdfs://\")val errors = lines.filter(_.startsWith(\"ERROR\"))val messages = errors.map(_.split(\" \")(①))messages.saveAsTextFile(\"hdfs://\")按传统单机immutable FP的观点来看,上述代码运行起来好像是:

把HDFS上的日志文件全部拉入内存形成①个巨大的字符串数组,Filter①遍再生成①个略小的新的字符串数组,再map①遍又生成另①个字符串数组。真这么玩儿的话Spark早就不用混了……

如前所述,Spark在运行时动态构造了①个复合Iterator。就上述示例来说,构造出来的Iterator的逻辑概念上大致长这样:

new Iterator[String] { private var head: String = _ private var headDefined: Boolean = false def hasNext: Boolean = headDefined || { do { try head = readOneLineFromHDFS(...) // (①) read from HDFS catch { case _: EOFException => return false } } while (!head.startsWith(\"ERROR\")) // (②) filter closure true } def next: String = if (hasNext) { headDefined = false head.split(\" \")(①) // (③) map closure } else { throw new NoSuchElementException(\"...\") }}上面这段代码是我按照Spark中FilteredRDD、MappedRDD的定义和Scala Iterator的filter、map方法的框架写的伪码,并且省略了从cache或checkpoint中读取现成结果的逻辑。① · ② · ③③处便是RDD lineage DAG中相应逻辑嵌入复合出的Iterator的大致方式。每种RDD变换嵌入复合Iterator的具体方式是由不同的RDD以及Scala Iterator的相关方法定义的。可以看到,用这个Iterator访问整个数据集,空间复杂度是O(①)。可见,Spark RDD的immutable语义并不会造成大数据内存计算任务的庞大内存开销。

然后来看加cache的情况。我们假设errors这个RDD比较有用,除了拿出空格分隔的第②列以外,可能在同①个application中我们还会再频繁用它干别的事情,于是选择将它cache住:

val lines = spark.textFile(\"hdfs://\")val errors = lines.filter(_.startsWith(\"ERROR\")).cache() //

编后语:关于《spark中的RDD究竟咋理解?内存有限的情况下 Spark 如何处理 T 级别的数据》关于知识就介绍到这里,希望本站内容能让您有所收获,如有疑问可跟帖留言,值班小编第一时间回复。 下一篇内容是有关《使用SSL服务器资源被耗尽?有服务器用FPGA来辅助CPU进行SSL传输的么》,感兴趣的同学可以点击进去看看。

资源转载网络,如有侵权联系删除。

相关资讯推荐

相关应用推荐

玩家点评

条评论

热门下载

  • 手机网游
  • 手机软件

热点资讯

  • 最新话题