[TOC]
数据类型:csv,orc,excel,word,txt,log
数据存储位置:不是在一台机器上,第三方,公司内部多个系统,
因为历史积累原因。公司的数据集存储在了不同的引擎:redis、mysql、hbase、hdfs、elasticsearch、kafka等
因为早集团没有统一规划数据源,导致数据源多达十几种;所以不好规划统一 ,导致查询关联及其麻烦(要各种预先抽取多个数据源到同一个地方,然后在做统一处理,最后出报表 ,而且查询及其缓慢)
1、前后端任务交换指令:Akka
2、计算引擎:sparkSQL
3、二次定义sparkSQL语法:Antlr
4、服务自动发现:zookeeper
【逻辑架构】
【项目架构】
通讯原理图:
Java内置线程模型 | Scala Actor模型 |
---|---|
“共享数据锁模型” | share nothing |
每个object有一个monitor,监视多线程对共享数据的访问【线程内部】 | 不共享数据,actor之间通过message传递(基于事件驱动) |
加锁的代码通过synchronized标志 | |
死锁的问题 | |
每个线程内部是顺序执行的 | 每个actor内部是顺序执行的 |
1、调用start()方法启动Actor
2、调用start()方法后其act()方法会被执行
3、向Actor发送消息
! | 发送异步消息,没有返回值。 |
---|---|
!? | 发送同步消息,等待返回值。 |
!! | 发送异步消息,返回值是 Future[Any]。 |
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>2.11.8</version>
</dependency>
驱动程序
说明:在act()方法中加入了while (true) 循环,就可以不停的接收消息
注意:发送start消息和stop的消息是异步的,但是Actor接收到消息执行的过程是同步的按顺序执行
akka,一款高性能,高容错,分布式的并行框架
特点:
1.并行与并发
2.异步非阻塞
3.容错
4.持久化
5.轻量级,每个actor占用内存比较小 300byte,1G 内存容纳300w个actor
场景:
分布式计算中的分布式通讯,解决的是高并发场景的问题,(消息体比较小),吞吐量不是很高,零拷贝()
密集型计算场景
总结:对高并发和密集型的计算场景,akka都可以使用
驱动:
pom里面需要引入akka的配置:
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.5.9</version>
</dependency>
1):创建模块名称和工程包名
各个模块配置工程以来pom.xml文件
本节的目的:
把驱动程序编写好,并启动起来;
但是让驱动能够顺利启动,我们需要完成如下操作:
然后写任务状态的基类:
在提供对外的解释器接口
问题:对spark-shell绑定变量的作用是什么?
spark-shell绑定变量,第一个要绑定的就是sparkSession
以及:sparkContext下面的内容、隐士转换、sparkSQL、udf函数等内容
但是现在我们还没有sparkSession , 所以我们先实现一个sparkSession的构建
【com.kkb.engine下面构建EngineSession】
因为执行了enableHiveSupport,所以需要加入服务器的hive-site.xml文件
然后启动metastore服务
因为我们参照的是livy代码,本身就是一个rest 服务,用来做spark和web端的一种交互;
所以livy很好的帮我们解决了,spark-shell从初始化的绑定、到绑定变量错误的处理;
都已经帮我们解决好了
这样,我们构建好了spark的解释器,实际就是为了构建一个属于自己的spark-shell;
好在其他框架实现了,我们只需要把其他框架的源码拿来即可
所以我们在回到 驱动程序App类:
【App类】
这样,我们继续按照流程往下走 , 那么此时就要构建zk的客户端了
因为,我们后面会把 引擎注册到zk里面,并且依赖于zk进行服务的自动发现
【在common工程下,创建zk的工具类】
我们采用第三方的zk工具,尽量帮我们封装代码
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.7</version>
</dependency>
然后创建ZKUtils工具类
1):
2):
3):
4):
把之前写的加载配置文件的工具类,放入common工程下
然后把common工程打入engine工程里面
就是在engine的pom文件里面添加common工程包
<dependency>
<groupId>com.kkb.platform</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
5):
那么,我们如果注册引擎,其实就是把PlatEngine这个类注册到zk里面去;
那么问题来了:
最开始运行的时候 ,zk里面肯定没有引擎信息(所谓引擎信息 , 我们认为其实就是id --> ip:port);
1 --> node1:3001
2--->node2:3002
但是如果不是最初启动时候,那么引擎肯定是存在的,那么我们就要确保,再次注册引擎的时候 ,
id--->ip:port
这里面的id和端口,绝对不能重复
所以我们需要让id和端口绝对不一致,那么最好的方式就是用zk来维护状态 , 如果这个状态存在,那么就将id和端口递增
注册引擎,大概分成4个步骤:
1、准备好目录信息
2、创建引擎的父目录
3、创建引擎的临时节点(数据写入节点信息)
4、把前3步合并
那么接下来,我们来实现这个功能:
如果上面这个路径在zk里面是不存在的,那么直接创建一个永久节点路径;作为引擎的存储路径
这样,有了父目录之后,我们就可以把数据信息写入;
所以健壮的写法是,必须考虑,如果父目录不存在的情况:
‘
获取完子节点后,把里面的数据拿出来,就是IP:port
【将1.2代码利用到1.3】
我们第一步获取子节点名称(id:1、2、3)
然后根据子节点的名称,来通过第第三步,来获取具体的引擎信息
所以,我们这一步做个合并处理
这样我们封装好了,如何获取zookeeper里面的引擎信息,那么按照步骤,我们接下来要顺序注册引擎
比如:
最开始
id = 1 , port = 3000
顺序增长
id = 2 , port = 3001
那么顺序增长的前提是,当前zookeeper里面已经存在这个引擎了,所以才会顺序增长
最后在把我们刚刚写好的,注册引擎拿到最下面,顺序增长完port和id以后,开始注册
这样,我们在回到驱动类,注册Akka信息
然后我们在测试下,上面写的代码是不是达到预期的效果
然后去zookeeper里面查看
经过测试,我们上面写的代码没有任何问题;
那么接下来,我们要获取当前akka的地址(也就是引擎的地址)
我们在构建spark-shell功能时候,把sparkSession维护到了EngineSession里面了
那么干脆 , 我们就把这个类作为任务状态的统计类;
那么我们把引擎信息,也维护进来
然后我们回到驱动程序,实例化一个EngineSession
那么接下来,我们就要启动Akka了 ;
这样,我们有了内部参数的维护,那么我们在返回驱动程序 , 把并行度设置上
我们之所以设置并行度,最终目标就是要并行的启动Akka任务模型
所以接下来,我们要创建 一个Akka的任务模型
现在我们有了akka模型 , 而且我们也拿到了并行度
接下来启动所有模型
这样我们就正常的启动了;
但是有这样一种可能,就是很可能主线程提前结束了,子线程还在继续运行;
就是可能会出现僵尸进程!
所以我们需要让主线程等待子线程结束后,在执行关闭回收操作
所以我们在EngineSession里面,添加一些功能,让主线程等待子线程
然后让JobActor继承这个日志功能
因为我们来初始化zk的客户端
首先我们在ZKUtils里面封装个方法,专门来对接jobActor的注册
然后在jobActor的初始化里面,进行注册jobActor的引擎
然后启动测试,查看zk里面是否注册进去
测试结果
首先我们把后续需要的一些变量提前初始化好:
1、spark的解释器
2、sparkSession
上面这俩变量,会在actor的生命周期结束时候进行回收
首先我们要开始编写一个actor的钩子,目的很简单,万一出现了错误,我们可以 catch住这个错误,然后把错误回显给客户端(web端)
之所以这样做:
1、可以对任务作出描述,方便任务的web端定位
2、有了任务的描述,那么后续是可以取消已经提交的任务
接下来,我们要匹配具体的操作,就是看传递过来的指令是代码还是SQL;
然后根据指令的不同,选择不同的操作方式
定义一个变量assemble_instruction,来组装命令
接受命令:
上面处理后,会返回一个response的结果 , 我们需要把结果回显给客户端
因此,我们需要接收这个response,然后解析他 , 然后收集他
去EngineSession添加一个记录任务的map集合 , 主要是为了保存批处理的作业信息
SQL:select name from person where age > 18
sql执行流程(sql生命周期):
不管解析被划分为几步,在Spark 执行环境中,都要转化成RDD的调用代码,才能被spark core所执行
那么这里面有个关键的点,就是查询的SQL , 怎么转化成Unresolved LogicalPlan;
Unresolved LogicalPlan 这个阶段接收的是抽象的语法树,所以我们需要知道的就是,这个SQL语句是怎么转成抽象语法树的;
答案就是:antlr4(spark是在2.0以后,开始使用antlr4解析的sql语法)
spark通过antlr4去解析SQL语句,形成抽象语法树AST;
也就是说,详细的流程是这样的:
grammar
名称和文件名要一致- Parser 规则(即 non-terminal)以小写字母开始
- Lexer 规则(即 terminal)以大写字母开始
- 用
'string'
单引号引出字符串
首先你要有配置antlr的环境变量
export CLASSPATH=".:/usr/local/lib/antlr-4.5.3-complete.jar:$CLASSPATH"
alias antlr4='java -Xmx500M -cp "/usr/local/lib/antlr-4.5.3-complete.jar:$CLASSPATH" org.antlr.v4.Tool'
alias grun='java org.antlr.v4.runtime.misc.TestRig'
然后我们按照语法格式来写一个hello word的代码;
【随便打开 一个maven工程,然后导包】
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<version>4.5.3</version>
</dependency>
通过上面,我们大概了解,antlr其实主要就是写正则
接下来,通过命令生成java代码
antlr4 learnAntlr.g4
假如我想实现加载文件操作,形成一个表,然后在基于这个表做查询操作;
以上的动作是:数据源--->load(加载)——>select这个表
比如:
那么我们在Engine.g4文件中就是:
他们的对应关系就是:
比如我们从kafka记载了数据,形成tb表 , 然后将数据写入mysql
简单说就是: kafka -->load -->save --->mysql
load kafka.veche4
where `spark.job.mode`="stream"
and `kafka.bootstrap.servers`="39.100.249.234:9092"
as tb;
save tb as jdbc.`t3`
where
driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://cdh2:3306/test?useUnicode=true&characterEncoding=utf8"
and user="root"
and password="root"
and `failOnDataLoss`="false"
and `outputMode`="Append"
and `streamName`="Stream"
and `duration`="2"
and `checkpointLocation`="/Users/angel/Desktop/data/S1_2020080120"
and coalesce="1";
那么对应我们antlr文件就是:
load操作,我们刚刚讲过了,接下来在说下,怎么save的
【看图做对比】
命令:
antlr4 Engine.g4
就像我们自己写的例子一样,此时我们要重写一下EngineBaseListener,对这个类的功能做增量,来满足我们的需求
重写这个方法的依据:
antlr4在离开sql的时候,会触发exitSql
所以我们要重写这块儿,来触发我们的业务逻辑
所以我们写个方法,专门来执行词法 和语法的解析
接下来,我们来完善load操作:
编写trait来规范化操作,方便后期维护,也是一种面向接口的编程思想
在这个里面,我们要解析语法:
LOAD format POINT? path WHERE? expression? booleanExpression* AS tablename
spark.job.mode = stream 就代表是流处理 , 否则就是离线处理
测试LoadAdaptor是否对当前节点树做了解析,那么直接打印一下这个option信息
在测试类,编写测试命令
离线:
val instruction = "load jdbc.testTableName " +
"where driver=\"com.mysql.jdbc.Driver\" " +
" and url=\"jdbc:mysql://cdh1:3306/hive?characterEncoding=utf8\" " +
" and user=\"root\" " +
" and password=\"root\" " +
"as tb; " +
"SELECT * FROM tb LIMIT 100;"
流:
val instruction = "load kafka.`veche4`" +
"where `spark.job.mode`=\"stream\" " +
"and `kafka.bootstrap.servers`=\"39.100.249.234:9092\" " +
"as tb; " +
"save tb as hbase.`TES6`" +
"where `hbase.zookeeper.quorum`=\"angel_rsong1:2181\" " +
"and `hbase.table.rowkey.field`=\"id_\" " +
"and `hbase.table.family`=\"MM\" " +
"and `failOnDataLoss`=\"false\" " +
"and `hbase.table.numReg`=\"10\" " +
"and `hbase.check_table`=\"true\" " +
"and `outputMode`=\"Append\" " +
"and `streamName`=\"Stream\" " +
"and `duration`=\"10\" " +
"and `sendDingDingOnTerminated`=\"true\" " +
"and `checkpointLocation`=\"/Users/angel/Desktop/data/ess312ldd\";"
这一步,我们要根据刚刚load操作出来的结果,然后去匹配操作。然后去查询出数据
也就是;
load —>匹配到离线 --->select结果
因为我们对接数据源是通过spark sql去处理的,所以我们先在EngineSQLExecListener这个监听者实现类里面添加sparkSession
这样我们的BatchJobLoadAdaptor(匹配离线数据源)就可以使用sparkSQL,去查询数据源的数据了
最后回到JobActor , 给EngineSQLExecListener添加sparkSession的参数
然后在LoadAdaptor里面添加上BatchJobLoadAdaptor匹配数据源的操作
在BatchJobLoadAdaptor里面打印一下结果
最后在测试类执行如下代码
结果:
出现如上结果,此时说明流程已经走通
sparkSession.sql(查询sql)的操作
所以 我们创建一个selectAdaptor类,来匹配select操作
加载这个结果的原因说明:
我们后面要将处理的结果返回给客户端 , 并且还要将处理的结果保存起来;
比如说,后续公司需要开发一些功能,将处理后的数据结果,可以下载下来,保存成csv文件;
那么此时对处理结果进行保存,显然是必要的
我们在JobActor里面封装一下解析SQL的操作,让我们解析并执行完SQL以后,可以拿到结果
我们返回的处理结果是dataFrame , 所以我们需要将dataFrame转成json
这样在将结果返回给客户端(因为客户端没法直接解析dataFrame)
在adaptor工程下,写个方法,专门来将dataFrame转成json
方法:
使用SparkSQL内置函数接口开发StructType/Row转Json函数,我们以前肯定使用过:
【列子】
def main(args: Array[String]): Unit = {
val spark = getSparkSession(getSparkConf)
val df = spark.createDataFrame(Seq(
("ming", 20, 15552211521L),
("hong", 19, 13287994007L),
("zhi", 21, 15552211523L)
)) toDF("name", "age", "phone")
val data: Dataset[String] = df.toJSON
println(data.collect())
}
跟踪上述,发现最终都会调用Spark源码中的org.apache.spark.sql.execution.datasources.json.JacksonGenerator类,使用Jackson,根据传入的StructType、JsonGenerator和InternalRow,生成Json字符串。
JacksonGenerator类
【我们在adaptor工程下使用】
sql的结果
hdfs上的数据
在存储数据时,我们需要考虑两个场景:
1:考虑场景问题:离线、流式
2:考虑数据格式问题:json、text、hbase、mysql....
('save'|'SAVE') (overwrite | append | errorIfExists | ignore | update)* tableName 'as' format '.' path ('where' | 'WHERE')? expression? booleanExpression* ('partitionBy' col)? ('coalesce' numPartition)?
上面的语法解析,实际就是解析类似如下的SQL
接下来要提供一些局部变量,我们解析出来值后,赋值给这些局部变量
接下来遍历节点,按照我们自己写的antlr语法规则,开始遍历语法规则