写在前面:我也是新手。本文仅供个人使用和总结。可能存在一些片面的理解或错误。如果各位高手看到的话可以留言指正。我必须努力学习。同时,我也只是想给自己做个笔记,方便以后整理,所以才贴出来。如果能帮助刚开始使用Seatunnel 的朋友避免一些陷阱那就最好了。
只列出我目前需要的。基本上所有的数据源都是兼容的。列出只是为了阐明需要下载的连接器jar 包。
来源:
配置单元、jdbc、maxcomputer、localfile、mysqlcdc、ossfile、rocketMQ、sftpfile、套接字
sink:console, datahub, mysql, sftpfile, hive, jdbc, localfile, maxcomputer, ossfile, socket, RocketMQ
# 固定环境变量,我使用本地模式
环境{
执行并行度=2
job.mode='流式传输'
检查点间隔=2000
} 插座
来源{
插座{
主机='1**.***.**.**1'
端口=9999
}
}
#最大计算
来源{
最大计算{
accessId='************************'
访问密钥='****************************'
端点='http://service.cn.maxcompute.aliyun.com/api'
项目='professional_test_dev'
表名='test_person__info'
partition_spec='pt=20230628'
split_row=10000
字段=[姓名、年龄、地址]
}
}
#oss文件
来源{
奥斯文件{
路径='/seatunnel/sink/age=20/'
Bucket='oss://you_bucket_name'
access_key='************************'
access_secret='****************************'
端点='oss-cn-shanghai.aliyuncs.com'
文件格式类型='文本'
#字段映射
read_columns=['姓名','年龄','地址','开始日期']
#是否根据路径解析分区,请注意,如果启用,sink端需要有一个额外的字段映射到分区字段。
parse_partition_from_path=true
# 跳过前几行(去掉行头功能)
跳过标题行数=1
#什么格式的字符串需要解析成日期
date_format='yyyy-MM-dd'
#字段分隔符
分隔符='\t'
# 读取所有列
模式{
字段{
名称=字符串
年龄=整数
地址=字符串
开始日期=日期
}
}
}
} 插座
下沉{
插座{
主机='1**.***.**.**1'
端口=8888
}
}
#OssFile
下沉{
奥斯文件{
路径='/座位隧道/水槽'
Bucket='oss://you_bucket_name'
access_key='************************'
access_secret='****************************'
端点='oss-cn-shanghai.aliyuncs.com'
文件格式类型='文本'
字段分隔符='\t'
行分隔符='\n'
sink_columns=['姓名','年龄']
}
}
#数据中心
下沉{
数据中心{
source_table_name='test_split_table'
端点='https://dh-cn-shanghai.aliyuncs.com'
accessId='='************************''
accessKey='********************************'
项目='you_project_name'
主题='test_seatunnel_socket'
超时=3000
重试次数=3
}
}
# mysql
下沉{
jdbc {
# url中的参数rewriteBatchedStatements=true是批量执行,否则mysql会一笔一笔的执行,导致性能低下。
url='jdbc:mysql://host/test?serverTimezone=GMT%2b8useSSL=falserewriteBatchedStatements=true'
驱动程序='com.mysql.cj.jdbc.Driver'
用户='root'
密码='123456'
Primary_keys=['姓名', '年龄']
# upsert功能,性能较低,仅在数据库没有upsert功能时启用,mysql启用报错Duplicateentry '*' for key 'PRIMARY'
# support_upsert_by_query_primary_key_exist=true
# 不需要写查询语句。根据generate_sink_sql=true参数,将根据以下数据库名和表名自动生成SQL语句。运行时会报错。未知
数据库='测试'
表='clmp_user'
# 如何在mysql中编写upsert
查询='''
插入clmp_user(name,age,address,date,pt) 值
(??)
关于重复密钥更新
名称=值(名称),
年龄=值(年龄),
地址=值(地址),
日期=值(日期),
pt=值(pt);
'''
连接超时
连接检查超时秒=100
# 重试失败提交的次数。如果你想只设置一次,则需要将其设置为0,否则可能会导致重复。
最大重试次数=0
# 对于批量写入,当缓冲记录数达到batch_size或者时间达到batch_interval_ms时,数据就会被flush到数据库
批量大小=10000
批次间隔毫秒=60000
# 精确一次,如果启用,必须定义xa_data_source_class_name,并且mysql版本必须大于等于8.0.29
# is_exactly_once=true
#xa_data_source_class_name='com.mysql.cj.jdbc.MysqlXADataSource'
#交易提交失败重试次数
最大提交尝试次数=10
# 交易开启后超时,默认值为-1(永不超时)。注意设置超时可能会影响once语义
transaction_timeout_sec=-1
# 启用自动交易提交,默认为true
自动提交=true
}
}
https://seatunnel.apache.org/docs/2.3.1/transform-v2/sql-functions
用法: Seatunnel.sh [选项]
命令
影响
--异步
异步运行作业。当作业提交后,客户端将退出(默认值:false)
-可以,--取消作业
通过JobId 取消作业
- 查看
是否检查配置(默认:false)
-cj,--结束工作
关闭客户端任务也会被关闭(默认:true)
-cn,--集群
集群名称
-c,--config
配置文件
--解密
解密配置文件。当同时指定--decrypt和--encrypt时,仅--encrypt生效(默认值:false)
-m、--master、-e、--部署模式
SeaTunnel作业提交master,支持[本地,集群](默认:集群)
- 加密
解密配置文件。当同时指定--decrypt和--encrypt时,仅--encrypt生效(默认值:false)
-h,--帮助
显示使用信息
-j, --工作ID
通过JobId获取作业状态
-l,--列表
列出作业状态(默认:false)
--指标
通过JobId获取作业指标
-n,--名称
SeaTunnel 作业名称(默认: SeaTunnel)
-r, --恢复
通过jobId恢复保存点
-s, --保存点
通过jobId 保存点作业
-i, --变量
变量替换,例如-i City=beijing,或-i date=20190318(默认值:[])
如果想要快速案例v2.batch.config.template 成功运行,需要将下面链接的两个jar 包添加到Seatunnel 安装路径/lib 目录下(版本取决于您的版本) hadoop,我的没有使用hadoop,所以直接和seatunnel版本一致)下载地址
Seatunnel-hadoop3-3.1.4-uber-2.3.2.jar
Seatunnel-hadoop3-3.1.4-uber-2.3.2-可选.jar
写入文件时,所有字符串指示符都需要使用双引号。单引号只能用于引用变量。如果文件中某个指标的内容太长,需要分多行写入,则需要用三引号括起来。内容和变量不能用三引号括起来。如果必须引用,可以使用多个三引号拼接语句,中间穿插变量引用。启动任务时,使用-i为变量赋值(-i date=20230627);注意:引用变量在本地模式下不可用,仅flink 或Spark 引擎可用。
#示例:多行写入并引用变量
变量='''
你的字符串1
'''${you_var}'''你的字符串2'''
# 示例:在语句中应用变量
转换{
sql {
query='select * from user_view where city=''${city}'' and dt=''${date}'''
}
}
# 示例:变量启动赋值
/app/clmp/seatunnel-2.3.2/bin/seatunnel.sh --config /root/test_seatunnel/--driver-memory 4g -e local -i date=20230627
# 主要有两个错误信息。
1. java.lang.RuntimeException: java.lang.ClassNotFoundException: 类org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem 未找到
2. org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: 聚合提交错误。
# 一开始感觉oss连接jar包不存在,后来排查发现是存在的。然后我在github上搜索了和我类似的问题。最终的解决办法是添加hadoop-aliyun-2.7.2.jar放到seatunnel-2.3.2/lib下,果然解决了。 github上的情况是类加载器有问题。
# 注意:如果是私有云,除了放置jar包外,还需要关闭cname,否则会出现新的错误SignatureDoesNotMatch。如何关闭cname可以咨询阿里云驻地或者提交工单解决。这里的解决方案是Ping不添加http://的oss地址。返回的结果中有一个IP。使用这个IP代替原来的oss地址
Caused by: org.apache.seatunnel.api.common.PrepareFailException: ErrorCode:[API-01], ErrorDescription: [配置项验证失败] - PluginName: jdbc, PluginType: source, Message: com.mysql .cj.jdbc.exceptions.CommunicationsException: 通信链路故障
最后一次成功发送到服务器的数据包是在0 毫秒前。驱动程序尚未收到来自服务器的任何数据包。
这件事情是由很多原因导致的。我目前遇到两种情况:
# 1、使用jdbc连接数据库时,必须自己提供数据库驱动,并复制到$SEATNUNNEL_HOME/plugins/jdbc/lib/目录下才能使用。例如,如果您使用MySQL,则应下载并复制MySQL-connector-java-xxx.jar到$SEATNUNNEL_HOME/plugins/jdbc/lib/,然后正常放置。
# 2. SSL 连接原因; MySQL在高版本中需要指明是否进行SSL连接。默认情况下它是启用的。所以只需要在配置文件中的url后面添加useSSL=false即可,如:url='jdbc:mysql://local_host/test?serverTimezone=GMT%2b8useSSL=false'Caused by: org.apache.seatunnel.api.common。 PreparationFailException: ErrorCode:[API-01]、ErrorDescription:[配置项验证失败] - PluginName: jdbc、PluginType: 源、Message: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
# 首先请注意,官网提供的驱动类名是com.mysql.cj.jdbc.Driver,但这应该是8以上的驱动包。我第一次使用mysql-connector-java-5.1.47.jar,它的驱动类名不是这个,而是com.mysql.jdbc.Driver。如果直接使用官方的会报上面的错误。
Seatunnel本身没有impala作为source或sink,但是impala保留了hive和jdbc的连接方式,所以我准备尝试使用这两种方式进行连接。
2.5.1 使用jdbc连接
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException:
ErrorCode: [JDBC-06],ErrorDescription: [找不到合适的方言工厂] - 找不到任何可以处理实现“org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory”的url '' 的jdbc 方言工厂在类路径中
可用工厂有:个
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.db2.DB2DialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbDialectFactory
org.apache.seatunnel.connectors.seatunnel.jabc.internal.dialect.gbasea.GbaseBaDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.greenplum.GreenplunDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.My5qDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix.PhoenixDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.snowflake.SnowflakeDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite.SqliteDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestorel.TablestoreDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata.TeradataDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica.VerticaDialectFactory
# 根据错误报告和源代码,当前jdbc连接不包含impala数据源类型,因此在调用jdbc等连接器时,无法解析URL,无法创建类加载器。错误内容后面是当前可以使用jdbc 2.5.2 Connect using hive连接的所有数据源列表
2023-07-12 16:14:47,905 警告org.apache.hadoop.util.NativeCodeLoader - 无法为您的平台加载本机hadoop 库.在适用的情况下使用内置java 类
2023-07-1216:14:47,983 警告hive 元存储- setlugi() 不成功,可能导致: 新客户端与旧服务器通信。没有它继续。 org.apache.thrift。 TApplicationException: 方法名称无效:'set ugit'
在org.apache.thrift.TApplicationException.read (TApplicationException.java: 111) ~ [hive-exec-2.3.9.jar: 2.3.9]
在org.apache.thrift.TServiceClient.receiveBase (TServiceClient.java: 79) ~ [hive-exec-2.3.9.jar: 2.3.9]
2023-07-1216:14:51,089 错误org.apache.seatunnel.core.starter.SeaT
unne1 - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException:SeaTunneljobexecutedfailed at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute (ClientExecuteCommand. java: 188) at org. apache.seatunnel.core.starter.SeaTunnel.run (SeaTunnel.java:40) at org.apache. seatunnel.core.starter.seatunnel.SeaTunnelClient.main (SeaTunnelClient.java: 34) Caused by: org.apache.thrift.TApplicationException: Invalid method name: 'get_table' at org.apache.thrift.TApplicationException.read (TApplicationException.java: 111) at org.apache.thrift.TServiceClient.receiveBase (TServiceClient.java: 79) # 第一次使用报错信息是这样,提示新客户端访问老版服务,且后面还有一个报错为get_table这个方法不存在,更能确定时版本的问题。这里主要是因为用seatunnelengine,需要把seatunnel-hadoop3-3.1.4-uber.jar和hive-exec-2.3.9.jar放在$SEATUNNEL_HOME/lib/目录下。这个hadoop和hive的版本需要和impala中的相应版本对应。2.5.3 解决方式 按照源码该路径下(seatunnel-dev\seatunnel-dev\seatunnel-connectors-v2\connector-jdbc\src\main\java\org\apache\seatunnel\ connectors\seatunnel\jdbc\internal\dialect\mysql)的代码重写4个方法,编写一个hive的jdbc连接器。路径定义为hive-jdbc-connector \src\main\java\org\apache\seatunnel\connectors\seatunnel\jdbc\hive\internal\dialect\hive。之后单独打包,放入seatunnel_home /pligins/jdbc/lib目录下。之后再运行代码,可能会出现jar包冲突的问题,根据报错解决依赖的冲突后重新打包即可 # 冲突报错举例 TTransport 即为冲突的内容: Exception in thread "main" java.1ang.LinkageError:loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "org/apache/thrift/transport/TTransport"