Spark+AI Summit 2018:当 Spark 遇上 AI

Spark+AI Summit 2018:当 Spark 遇上 AI

为期三天的 2018 Spark Summit 在美国时间 6/4 - 6/6 于旧金山的 Moscone Center 举行,不少人已经注意到,今年的会议已经更名为 Spark+AI, 去年 12 月份时,Databricks 在他们的博客中就已经提到过,2018 年的会议将包括更多人工智能的内容,某种意义上也代表着Spark 未来的发展方向。

Spark+AI Summit 2018:当 Spark 遇上 AI

流利说工程师在 Spark 会议现场

Hydrogen

在去年的会议上,相信还有人记得 Matei 在 Keynote 中提到过 Spark 的哲学,

 

Unified engine for complete data applications,回顾去年的会议,我们来看 Apache Spark 这一年的发展成果:

Reynold Xin 在第一天的 Keynote 似乎就重申了他们去年要做的内容,Unifying AI and Big Data in Apache Spark。在过去,数据集的准备以及模型的训练工作是分开的,这两部分工作相当于存在两套系统中,Reynold 提到这种方式对于开发、测试都极其麻烦。是否存在于一套方案,同时解决这两类问题呢?

Spark+AI Summit 2018:当 Spark 遇上 AI

很遗憾,现在的 Spark 还无法做到,原因是模型的 Training 在 Spark 的 DAGScheduler 中支持的并不好。在现有的 Spark 执行模型中,一个 Job 会被切分成多个 tasks,彼此之间相互独立,如果其中一个 Task 失败,那么会被单独重试,但在诸多 ML 的框架中,如 MPI,Task 之间会有依赖关系,预期的结果是如果有一个 Task 失败,那么同一个 Stage 下所有的 Tasks 必须都认为是失败的状态,简单来讲,一个 Stage 下所有的 Tasks 必须被一起调度。

Spark+AI Summit 2018:当 Spark 遇上 AI

Reynold 在会议上介绍了 Spark 接下来的一个重点项目,Hydrogen,其核心模块

 

Gang scheduling

 

用来解决 Spark 与 ML frameworks 之间兼容性问题。使用 barrier API 来手动对 Job 切分 stage,并告诉 Scheduler 接下来的 Tasks 需要一起调度,如果某 Task 失败,则一组 Tasks 被一起重试,也就是会议中提到的所有的 Taks

 

All or nothing,但

 

Hydrogen

 

需要解决的问题,看上去远没这么简单,Spark 需要在 Task 级别上做到感知,简单来讲,ML 的问题需要特殊硬件,如 GPU,而剩下的工作需要被路由到更经济的 CPU 上处理。以下是现场的 Demo code:

# barrier() introduces a new execution mode where all tasks run together# runHorovod() launches a Horovod job via MPI using the taskmodel = digits.repartition(2) \     .toPandasRdd() \     .barrier() \     .mapPartitions(runHorovod) \     .collect()[0]MLflow

在 DAY2 的 keynote 上,Spark 和 Mesos 的核心作者兼 Databricks 的 CTO Matei 宣布推出开源的机器学习平台 MLflow,基本覆盖了数据准备到模型训练所有的流程。宗旨仍然为 Data Scientist 简化构建、测试和部署机器学习模型的过程。这个在会议前夕,Matei 还写了篇文章来介绍 MLflow,感兴趣的朋友可以到 databricks 博客看一下。

Delta

目前大多数企业在大数据管理和分析上,仍然采用混合式系统构建,系统之间的对接和维护会大大增加了各类企业成本,导致数据管理异常复杂。

Databricks 的 CEO Ali 宣布了他们的产品,Delta

 

一个统一的数据分析平台。在介绍该产品之前,他认为真正在 AI 领域成功的公司仅有 1%,其他 99% 仍然在 Data 跟 AI 之间挣扎(即便这类公司有大量的数据和工程师),他罗列了三类主要问题,以及三个主要挑战,我觉得可以对应起来看:

  • 数据孤岛 - Data is not ready for Analytics

  • 数据工程与数据分析之间缺少有效对接 - Data Engineers and Data Scientists are Siloed

  • 大部分 AI 应用未成为最终产品 - 

    It"s hard to productionize ML

  • Spark+AI Summit 2018:当 Spark 遇上 AI

    传统的 Data Pipeline 是数据从多个业务系统流入 Kafka,然后由数据工程师接入并做相关的 ETL,这个当中包括数据的清洗,分区,小文件处理,甚至索引的构建,完了之后,这些数据会写回到

     

    Data Lakes(如 Amazon S3),这个过程的问题主要是 ETL 过程容易出错,并且数据团队需要花大部分时间来构建 ETL 作业,除了延时,出错还会影响下游数据分析业务。

    而 Delta 去掉了人工 ETL 的这部分工作,静态或是流数据直接进入 Delta,数据分析人员可以直接基于这上面的数据做出分析,比如 batch job的效果是这样:

    CREATE TABLE connectionsUSING delta AS SELECT * FROM json."/data/connections";SELECT * FROM connections WHERE dest_port = 666;

    对于 streaming job,可以直接把 kafka 中的数据实时 load 到上面的 Delta table

     

    connections:

    INSERT INTO connections SELECT * FROM kafkaStream;

    在数据进入 Delta 后,我们创建的表并支持 ACID 事务和列索引。关于 Delta 的更多信息,感兴趣的朋友可以到 databricks 博客看一下。

    Spark 2.3

    Facebook 工程师,Spark committer,同样也是 Spark 2.3 的 release manager Sameer 介绍了该版的几大主要特性。一共 1406 JIRAs,其中 52% 是 sql/streaming,12% 是 spark core,9% 为 pyspark,以及 8% 为 ML 部分。

  • SPARK-20928 Structured Streaming 加入 Continuous Processing,新的方式会将延时从原先的 10-100ms 降低到 1ms,但消息的处理从 

    exactly once

     改为 

    At-least once

    ,并支持 Stream-Stream Join

  • SPARK-21866 支持加载 image 到 

    DataFrames

  • SPARK-15689 DataSource v2 APIs 

  • PySpark 由于序列化和 Python 的 interpreter 等原因,pyspark 的效率相对于 java/scala 会慢在多,新版本将支持 pandas_udf,使用方式类似:

    from pyspark.sql.functions import pandas_udf, PandasUDFType@pandas_udf("double", PandasUDFType.SCALAR) def pandas_plus_one(v):return v + 1df.witchColumn("v2", pandas_plus_one(df.v))

  • SPARK-18278 Spark on Kubernetes(k8s),目前 spark 运行在 k8s 集群上,还是有很多限制的,比如只能使用 Cluster Mode,并且 k8s 版本需在 1.6+,在 Spark 2.4+ 的 load map 中,会增加 Client Mode,动态资源分配,以及 External shuffle Service. 

  • Optimizing shuffle I/O

    Facebook 介绍他们过去在 shuffle I/O 的研究,其主要成果是降低整体 reducer 端与 mapper 之间的通信次数。如果在 mapper 端都是小的 tasks,那么 shuffle 就会产生大量的分片小文件(Large Amount of Fragmented Shuffle I/O),但在 mapper 端如果是较为大的 tasks,那么 shuffle 阶段在拉取 map 输出时就很轻松,因为不仅仅输出文件少,还有更多顺序读的好处(Fewer, Sequential Shuffle I/O)。但这里的拉取次数,是 M * R,优化方案是在这两种情况下,做出平衡。

     

    他们的做法是在物理 node 节点启动一个叫

     

    SOS Shuffle Service的进程,而在 Driver 端启动一个

     

    SOS Merge Scheduler

     

    的进程,当 worker 端完成 task 时,会向 Job/Task Scheduler 汇报状态,然后此时 Driver 端的

     

    Merge Scheduler

     

    会向 worker 端的

     

    Shuffle service

     

    发出合并请求,完成合并之后再告诉 Driver。 merge 主要有两个目的:

  • Combines small shuffle files into Large ones

  • Keeps partitioned file layout 

  • 最终的 I/O 次数降低到 (M * R) / (merge factor)。 Facebook 最后也透露将会计划提交到 Spark 中,我们拭目以待。感兴趣的朋友可以阅读 Facebook 提供的 paper.

    回顾

    回顾流利说数据团队2017、2016在 Spark Summit 的见闻,欢迎点击如下链接:

  • 流利说 @Spark Summit 2017

     

  • 打开 Spark 的正确姿势

     

  • 招聘

    流利说数据团队正在招聘数据平台开发工程师(

     Hadoop/Spark/Kafka/Presto

    ),负责

    离线计算/实时计算等相关开发。如果对 Spark 和分布式计算感兴趣,欢迎发送简历到邮箱:jobs@liulishuo.com , 期待与你们的交流。

    Spark+AI Summit 2018:当 Spark 遇上 AI