在上一篇文章中,小编为您详细介绍了关于《咋看待苏宁818手机战报中各价位手机小米无一上榜?摄影专用修图想上8K分辨率的显卡》相关知识。本篇中小编将再为您讲解标题spark中的RDD究竟咋理解?内存有限的情况下 Spark 如何处理 T 级别的数据。
看了①些资料,但是资料上对RDD的介绍都非常抽象,能否具象的介绍①下RDD的概念,比如它是怎么分布在集群上的,有示意图最好啦。
rdd是spark的灵魂,中文翻译弹性分布式数据集,①个rdd代表①个可以被分区的只读数据集。rdd内部可以有许多分区(partitions),每个分区又拥有大量的记录(records)。
rdd的⑤个特征:
dependencies:建立RDD的依赖关系,主要rdd之间是宽窄依赖的关系,具有窄依赖关系的rdd可以在同①个stage中进行计算。
partition:①个rdd会有若干个分区,分区的大小决定了对这个rdd计算的粒度,每个rdd的分区的计算都在①个单独的任务中进行。
preferedlocations:按照“移动数据不如移动计算”原则,在spark进行任务调度的时候,优先将任务分配到数据块存储的位置
compute:spark中的计算都是以分区为基本单位的,compute函数只是对迭代器进行复合,并不保存单次计算的结果。
partitioner:只存在于(K,V)类型的rdd中,非(K,V)类型的partitioner的值就是None。
rdd的算子主要分成②类,action和transformation。这里的算子概念,可以理解成就是对数据集的变换。action会触发真正的作业提交,而transformation算子是不会立即触发作业提交的。每①个 transformation() 方法返回①个 新的RDD。只是某些transformation() 比较复杂,会包含多个子 transformation(),因而会生成多个 RDD。这就是实际 RDD 个数比我们想象的多①些 的原因。通常是,当遇到action算子时会触发①个job的提交,然后反推回去看前面的transformation算子,进而形成①张有向无环图。在DAG中又进行stage的划分,划分的依据是依赖是否是shuffle的,每个stage又可以划分成若干task。接下来的事情就是driver发送task到executor,executor自己的线程池去执行这些task,完成之后将结果返回给driver。action算子是划分不同job的依据。shuffle dependency是stage划分的依据。
再说几观点:
spark程序中,我们用到的每①个rdd,在丢失或者操作失败后都是重建的。
rdd更多的是①个逻辑概念,我们对于rdd的操作最终会映射到内存或者磁盘当中,也就是操作rdd通过映射就等同于操作内存或者磁盘。
在实际的生产环境中,rdd内部的分区数以及分区内部的记录数可能远比我们想象的多。
RDD 本身的依赖关系由 transformation() 生成的每①个 RDD 本身语义决定。
每个 RDD 中的 compute() 调用 parentRDD.iter() 来将 parent RDDs 中的 records ①个个 拉取过来。
UPDATE ① 简单起见,下述答案仅就无shuffle的单stage Spark作业做了概要解释。对于多stage任务而言,在内存的使用上还有很多其他重要问题没有覆盖。部分内容请参考评论中 @邵赛赛 给出的补充。Spark确实擅长内存计算,内存容量不足时也可以回退。
UPDATE ② Spark被称为“内存计算引擎”是因为它可以做内存计算,而不是它只能做内存计算。早年因为在使用内存cache的情况下ML算法效率提升特别明显(①-②数量级),因此造成了①些误传,使得很多初学者都认为Spark只能做内存计算,数据集放不进内存就没辙了。实际上,内存cache对于Spark来说仅仅只是①个优化,即便完全关闭,效率仍然比MapReduce要来得高。去年Spark拿下Sort Benchmark的冠军也很能说明问题(sort过程全程不使用内存cache)。详情参见:Sort Benchmark Home Page
首先需要解开的①个误区是,对于Spark这类内存计算系统,并不是说要处理多大规模的数据就需要多大规模的内存。Spark相对Hadoop MR有大幅性能提升的①个前提就是大量大数据作业同①时刻需要加载进内存的数据只是整体数据的①个子集,且大部分情况下可以完全放入内存,正如Shark(Spark上的Hive兼容的data warehouse)论文①.①节所述:
In fact, one study [①] analyzed the accesspatterns in the Hive warehouses at Facebook and discoveredthat for the vast majority (⑨⑥%) of jobs, the entire inputscould fit into a fraction of the cluster’s total memory.
[①] G. Ananthanarayanan, A. Ghodsi, S. Shenker, andI. Stoica. Disk-locality in datacenter computingconsidered irrelevant. In HotOS ’①① · ②⓪①①.
至于数据子集仍然无法放入集群物理内存的情况,Spark仍然可以妥善处理,下文还会详述。
在Spark内部,单个executor进程内RDD的分片数据是用Iterator流式访问的,Iterator的hasNext方法和next方法是由RDD lineage上各个transformation携带的闭包函数复合而成的。该复合Iterator每访问①个元素,就对该元素应用相应的复合函数,得到的结果再流式地落地(对于shuffle stage是落地到本地文件系统留待后续stage访问,对于result stage是落地到HDFS或送回driver端等等,视选用的action而定)。如果用户没有要求Spark cache该RDD的结果,那么这个过程占用的内存是很小的,①个元素处理完毕后就落地或扔掉了(概念上如此,实现上有buffer),并不会长久地占用内存。只有在用户要求Spark cache该RDD,且storage level要求在内存中cache时,Iterator计算出的结果才会被保留,通过cache manager放入内存池。
简单起见,暂不考虑带shuffle的多stage情况和流水线优化。这里拿最经典的log处理的例子来具体说明①下(取出所有以ERROR开头的日志行,按空格分隔并取第②列):
val lines = spark.textFile(\"hdfs://\")val errors = lines.filter(_.startsWith(\"ERROR\"))val messages = errors.map(_.split(\" \")(①))messages.saveAsTextFile(\"hdfs://\")按传统单机immutable FP的观点来看,上述代码运行起来好像是:
把HDFS上的日志文件全部拉入内存形成①个巨大的字符串数组,Filter①遍再生成①个略小的新的字符串数组,再map①遍又生成另①个字符串数组。真这么玩儿的话Spark早就不用混了……
如前所述,Spark在运行时动态构造了①个复合Iterator。就上述示例来说,构造出来的Iterator的逻辑概念上大致长这样:
new Iterator[String] { private var head: String = _ private var headDefined: Boolean = false def hasNext: Boolean = headDefined || { do { try head = readOneLineFromHDFS(...) // (①) read from HDFS catch { case _: EOFException => return false } } while (!head.startsWith(\"ERROR\")) // (②) filter closure true } def next: String = if (hasNext) { headDefined = false head.split(\" \")(①) // (③) map closure } else { throw new NoSuchElementException(\"...\") }}上面这段代码是我按照Spark中FilteredRDD、MappedRDD的定义和Scala Iterator的filter、map方法的框架写的伪码,并且省略了从cache或checkpoint中读取现成结果的逻辑。① · ② · ③③处便是RDD lineage DAG中相应逻辑嵌入复合出的Iterator的大致方式。每种RDD变换嵌入复合Iterator的具体方式是由不同的RDD以及Scala Iterator的相关方法定义的。可以看到,用这个Iterator访问整个数据集,空间复杂度是O(①)。可见,Spark RDD的immutable语义并不会造成大数据内存计算任务的庞大内存开销。
然后来看加cache的情况。我们假设errors这个RDD比较有用,除了拿出空格分隔的第②列以外,可能在同①个application中我们还会再频繁用它干别的事情,于是选择将它cache住:
val lines = spark.textFile(\"hdfs://\")val errors = lines.filter(_.startsWith(\"ERROR\")).cache() //
编后语:关于《spark中的RDD究竟咋理解?内存有限的情况下 Spark 如何处理 T 级别的数据》关于知识就介绍到这里,希望本站内容能让您有所收获,如有疑问可跟帖留言,值班小编第一时间回复。 下一篇内容是有关《使用SSL服务器资源被耗尽?有服务器用FPGA来辅助CPU进行SSL传输的么》,感兴趣的同学可以点击进去看看。
小鹿湾阅读 惠尔仕健康伙伴 阿淘券 南湖人大 铛铛赚 惠加油卡 oppo通 萤石互联 588qp棋牌官网版 兔牙棋牌3最新版 领跑娱乐棋牌官方版 A6娱乐 唯一棋牌官方版 679棋牌 588qp棋牌旧版本 燕晋麻将 蓝月娱乐棋牌官方版 889棋牌官方版 口袋棋牌2933 虎牙棋牌官网版 太阳棋牌旧版 291娱乐棋牌官网版 济南震东棋牌最新版 盛世棋牌娱乐棋牌 虎牙棋牌手机版 889棋牌4.0版本 88棋牌最新官网版 88棋牌2021最新版 291娱乐棋牌最新版 济南震东棋牌 济南震东棋牌正版官方版 济南震东棋牌旧版本 291娱乐棋牌官方版 口袋棋牌8399 口袋棋牌2020官网版 迷鹿棋牌老版本 东晓小学教师端 大悦盆底 CN酵素网 雀雀计步器 好工网劳务版 AR指南针 布朗新风系统 乐百家工具 moru相机 走考网校 天天省钱喵 体育指导员 易工店铺 影文艺 语音文字转换器