2.1、kettle 如何添加所需要的驱动jar包
报错信息
报错原因
缺失mysql的驱动jar包
解决方案
1、下载jdbc驱动,放到kettle的lib目录下
2、正确填写数据库信息
测试现在是否正常
2.2、kettle学习之子映射组件
映射组件就跟java中的函数方法一样,类似一个子流程。
根据数据库表中的id查询出想要的字段,并把字段存到excel表中
一、表输入
二、子映射
映射输入规范,类似java方法中的形参
name vsxcd是方法返回的参数
三、excel输出
运行结果成功展示
2.3、kettle 读取记事本文件给java组件处理
kettle9.4
用到两个组件
文本文件输入
文件内容如下
文本文件输入---文件
文本文件输入---内容
注意事项:分隔符这里,我一直没注意,导致不管怎么读数据都读不到;可以用换行符,可以用其他的,视情况而定;
到这里,文本文件输入组件的部分结束了;
java代码组件
跟普通的java类不一样,没有类的申明,只有方法在里面!!
getRow()是kettle的自己的方法,是获取行的意思;
import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.trans.step.StepMetaInterface; import org.pentaho.di.core.row.RowMetaInterface; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { // 获取输入行数据 Object[] rowData = getRow(); // 如果输入行为空,则返回 false if (rowData == null) { setOutputDone(); return false; } // 获取文本文件内容(假设内容在第一个字段中) String fileContent = rowData[0].toString(); logBasic("数据是: " +fileContent.toUpperCase()); // 处理完成,返回 true return true; }运行,查看结果
把小写转换为大写了
2.4、kettle学习之字段选择组件
这个组件就是对字段进行改名或者删除等操作的
文本文件输入
字段选择
字段选择2
运行结果
2.5、kettle之 Concat fields将字符串拼接起来
用到两个组件,一个是文本文件输入,一个是 Concat fields
成功截图
文本文件输入
根据;将文本内容分成两部分,第一部分是a,第二部分是b
Concat fields
运行即可
这里的Fields是上一个步骤里面的输出的字段名称
TargetField Name是输出的字段名称;
2.6、kettle学习之mysql数据库结果分组聚合查询
1、查Mysql表
2、对表结果进行排序分组
先看表结构
再看最终结果
一、转换图
二、表输入
数据库连接17是这样的配置
三、排序合并
四、分组
运行起来就可以了
3.2、kettle学习之表的输入输出
需求
把表A里的数据传送到表B中,在此之前,清空表B内的数据
表输入
执行SQL脚本
表输出
4.1、kettle组件之java代码,快速上手必看
我们先了解不同于java代码的kettle的一些方法
1、getRow();
获取每一行数据,循环读数据;返回的是Object[]数组
2、get(Fields.in,"字段名");
获取具体的某个字段的名称
3、get(Fields.in,"字段名").getString(r);
获取这一行数据中,对应的字段名并且是值是字符串类型的数据
4、setOutputDone()
结束输出,不往后面的步骤传递数据
5、putRow(outputRowMeta, outputRow);
把数据的结构以及值传给下个数据
6、logBasic("数据是: " +b);
打印基础日志
功能、获取记事本里面的数据,把这个数据进行简单的处理,然后输出;
难点:1、我不想要把处理后的值覆盖掉原来的值,而是创建一个新的字段,给这个新的字段赋值
2、其次我不想要之前的字段,很冗余
记事本配置
java代码如下
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { //1、获取输入行数据 Object[] r = getRow(); //2、若为null,则停止输出 if (r == null) { setOutputDone(); return false; } //3、获取数据 Object a=r[0]; //4、打印日志 logBasic("获取到的数据是 "+a); //5、创建一个输出行,是基于输入行的数据的,形参个数也一致 r = createOutputRow(r, data.outputRowMeta.size()); //6、给输出行的字段名为yy的赋值 get(Fields.Out, "yy").setValue(r, a+"--hello--"); //7、输出行 putRow(data.outputRowMeta, r); return true; }结果如下
如果不勾选清空结果字段
那么结果就会包含输入行信息
到这里,我们做一个小练习,对redis进行读取与设置吧
1、把redis的jar包放入kettle的lib下
2、重启kettle
3、编写代码
redis set代码
import redis.clients.jedis.Jedis; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { //1、读取输入行信息 Object[] r = getRow(); if (r == null) { setOutputDone(); return false; } //2、获取信息 String kinfo=get(Fields.In,"kinfo").getString(r); String kname=get(Fields.In,"kname").getString(r); //3、把redis的key存放到key的字段里 get(Fields.Out,"key").setValue(r,kinfo); //4、连接redis Jedis jedis = new Jedis("10.20.1.17", 6379); // 设置键值对 jedis.set(kinfo,kname); // 关闭连接 jedis.close(); //5、创建输出行 r = createOutputRow(r, data.outputRowMeta.size()); //6、输出行 putRow(data.outputRowMeta, r); return true; }redis get 代码
import redis.clients.jedis.Jedis; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { Object[] r = getRow(); if (r == null) { setOutputDone(); return false; } String key=get(Fields.In,"key").getString(r); Jedis jedis = new Jedis("10.20.1.17", 6379); // 获取键值对 String name = jedis.get(key); // 关闭连接 jedis.close(); logBasic("获取redis信息 "+name); putRow(data.outputRowMeta, r); return true; }4、测试运行
4.2、kettle之java组件 对redis集群进行增删改查
1、kettle9.4
2、jdk8
所需的jar包
jedis-2.9.0.jar
commons-pool2-2.4.2.jar(kettle自带commons-pool-1.5.7.jar)
如果不引入会报错
ERROR (version 9.4.0.0-343, build 0.0 from 2022-11-08 07.50.27 by buildguy) : org.codehaus.commons.compiler.CompileException: org/apache/commons/pool2/impl/GenericObjectPoolConfig完整的测试demo
import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPoolConfig; import java.io.IOException; import java.util.LinkedHashSet; import java.util.Set; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { Object[] r = getRow(); if (r == null) { setOutputDone(); return false; } r = createOutputRow(r, data.outputRowMeta.size()); //1、连接redis JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(5); poolConfig.setMaxIdle(1); poolConfig.setMaxWaitMillis(1000); Set<HostAndPort> nodes = new LinkedHashSet<HostAndPort>(); nodes.add(new HostAndPort("11.11.1.40", 7001)); nodes.add(new HostAndPort("11.11.1.41", 7001)); nodes.add(new HostAndPort("11.11.1.42", 7001)); JedisCluster jedis = new JedisCluster(nodes, poolConfig); //2、获取key值 //2、获取key值 String name = get(Fields.In, "name").getString(r); boolean exists = jedis.exists(name); logBasic("redis的key是 "+name +" 是否存在 "+exists); //2.1、获取上一次的机米长度 String lastVsxzl = "0"; if (!exists) { jedis.hset(name, "vsxzl", "0"); } else { if (!jedis.hexists(name, "vsxzl")) { jedis.hset(name, "vsxzl", "0"); } lastVsxzl = jedis.hget(name, "vsxzl"); } //2.2、获取状态值 String status="0"; if (!jedis.hexists(name, "status")) { jedis.hset(name, "status", "0"); }else{ status=jedis.hget(name,"status"); } //2.3、获取vsxg String vsxg="0"; if (!jedis.hexists(name, "vsxg")) { jedis.hset(name, "vsxg", "0"); }else{ vsxg=jedis.hget(name,"vsxg"); } logBasic("上次长 "+lastVsxzl+" 上次状态 "+status+" 上次vsxg "+vsxg); //3、输出key值 get(Fields.Out, "lastVsxzl").setValue(r, lastVsxzl); get(Fields.Out, "lastStatus").setValue(r, status); get(Fields.Out, "lastVsxg").setValue(r, vsxg); // 关闭连接 try { jedis.close(); } catch (IOException e) { e.printStackTrace(); } putRow(data.outputRowMeta, r); return true; }<?xml version="1.0" encoding="UTF-8"?> <transformation> <info> <name>redisCuster</name> <description/> <extended_description/> <trans_version/> <trans_type>Normal</trans_type> <trans_status>0</trans_status> <directory>/</directory> <parameters> </parameters> <log> <trans-log-table> <connection/> <schema/> <table/> <size_limit_lines/> <interval/> <timeout_days/> <field> <id>ID_BATCH</id> <enabled>Y</enabled> <name>ID_BATCH</name> </field> <field> <id>CHANNEL_ID</id> <enabled>Y</enabled> <name>CHANNEL_ID</name> </field> <field> <id>TRANSNAME</id> <enabled>Y</enabled> <name>TRANSNAME</name> </field> <field> <id>STATUS</id> <enabled>Y</enabled> <name>STATUS</name> </field> <field> <id>LINES_READ</id> <enabled>Y</enabled> <name>LINES_READ</name> <subject/> </field> <field> <id>LINES_WRITTEN</id> <enabled>Y</enabled> <name>LINES_WRITTEN</name> <subject/> </field> <field> <id>LINES_UPDATED</id> <enabled>Y</enabled> <name>LINES_UPDATED</name> <subject/> </field> <field> <id>LINES_INPUT</id> <enabled>Y</enabled> <name>LINES_INPUT</name> <subject/> </field> <field> <id>LINES_OUTPUT</id> <enabled>Y</enabled> <name>LINES_OUTPUT</name> <subject/> </field> <field> <id>LINES_REJECTED</id> <enabled>Y</enabled> <name>LINES_REJECTED</name> <subject/> </field> <field> <id>ERRORS</id> <enabled>Y</enabled> <name>ERRORS</name> </field> <field> <id>STARTDATE</id> <enabled>Y</enabled> <name>STARTDATE</name> </field> <field> <id>ENDDATE</id> <enabled>Y</enabled> <name>ENDDATE</name> </field> <field> <id>LOGDATE</id> <enabled>Y</enabled> <name>LOGDATE</name> </field> <field> <id>DEPDATE</id> <enabled>Y</enabled> <name>DEPDATE</name> </field> <field> <id>REPLAYDATE</id> <enabled>Y</enabled> <name>REPLAYDATE</name> </field> <field> <id>LOG_FIELD</id> <enabled>Y</enabled> <name>LOG_FIELD</name> </field> <field> <id>EXECUTING_SERVER</id> <enabled>N</enabled> <name>EXECUTING_SERVER</name> </field> <field> <id>EXECUTING_USER</id> <enabled>N</enabled> <name>EXECUTING_USER</name> </field> <field> <id>CLIENT</id> <enabled>N</enabled> <name>CLIENT</name> </field> </trans-log-table> <perf-log-table> <connection/> <schema/> <table/> <interval/> <timeout_days/> <field> <id>ID_BATCH</id> <enabled>Y</enabled> <name>ID_BATCH</name> </field> <field> <id>SEQ_NR</id> <enabled>Y</enabled> <name>SEQ_NR</name> </field> <field> <id>LOGDATE</id> <enabled>Y</enabled> <name>LOGDATE</name> </field> <field> <id>TRANSNAME</id> <enabled>Y</enabled> <name>TRANSNAME</name> </field> <field> <id>STEPNAME</id> <enabled>Y</enabled> <name>STEPNAME</name> </field> <field> <id>STEP_COPY</id> <enabled>Y</enabled> <name>STEP_COPY</name> </field> <field> <id>LINES_READ</id> <enabled>Y</enabled> <name>LINES_READ</name> </field> <field> <id>LINES_WRITTEN</id> <enabled>Y</enabled> <name>LINES_WRITTEN</name> </field> <field> <id>LINES_UPDATED</id> <enabled>Y</enabled> <name>LINES_UPDATED</name> </field> <field> <id>LINES_INPUT</id> <enabled>Y</enabled> <name>LINES_INPUT</name> </field> <field> <id>LINES_OUTPUT</id> <enabled>Y</enabled> <name>LINES_OUTPUT</name> </field> <field> <id>LINES_REJECTED</id> <enabled>Y</enabled> <name>LINES_REJECTED</name> </field> <field> <id>ERRORS</id> <enabled>Y</enabled> <name>ERRORS</name> </field> <field> <id>INPUT_BUFFER_ROWS</id> <enabled>Y</enabled> <name>INPUT_BUFFER_ROWS</name> </field> <field> <id>OUTPUT_BUFFER_ROWS</id> <enabled>Y</enabled> <name>OUTPUT_BUFFER_ROWS</name> </field> </perf-log-table> <channel-log-table> <connection/> <schema/> <table/> <timeout_days/> <field> <id>ID_BATCH</id> <enabled>Y</enabled> <name>ID_BATCH</name> </field> <field> <id>CHANNEL_ID</id> <enabled>Y</enabled> <name>CHANNEL_ID</name> </field> <field> <id>LOG_DATE</id> <enabled>Y</enabled> <name>LOG_DATE</name> </field> <field> <id>LOGGING_OBJECT_TYPE</id> <enabled>Y</enabled> <name>LOGGING_OBJECT_TYPE</name> </field> <field> <id>OBJECT_NAME</id> <enabled>Y</enabled> <name>OBJECT_NAME</name> </field> <field> <id>OBJECT_COPY</id> <enabled>Y</enabled> <name>OBJECT_COPY</name> </field> <field> <id>REPOSITORY_DIRECTORY</id> <enabled>Y</enabled> <name>REPOSITORY_DIRECTORY</name> </field> <field> <id>FILENAME</id> <enabled>Y</enabled> <name>FILENAME</name> </field> <field> <id>OBJECT_ID</id> <enabled>Y</enabled> <name>OBJECT_ID</name> </field> <field> <id>OBJECT_REVISION</id> <enabled>Y</enabled> <name>OBJECT_REVISION</name> </field> <field> <id>PARENT_CHANNEL_ID</id> <enabled>Y</enabled> <name>PARENT_CHANNEL_ID</name> </field> <field> <id>ROOT_CHANNEL_ID</id> <enabled>Y</enabled> <name>ROOT_CHANNEL_ID</name> </field> </channel-log-table> <step-log-table> <connection/> <schema/> <table/> <timeout_days/> <field> <id>ID_BATCH</id> <enabled>Y</enabled> <name>ID_BATCH</name> </field> <field> <id>CHANNEL_ID</id> <enabled>Y</enabled> <name>CHANNEL_ID</name> </field> <field> <id>LOG_DATE</id> <enabled>Y</enabled> <name>LOG_DATE</name> </field> <field> <id>TRANSNAME</id> <enabled>Y</enabled> <name>TRANSNAME</name> </field> <field> <id>STEPNAME</id> <enabled>Y</enabled> <name>STEPNAME</name> </field> <field> <id>STEP_COPY</id> <enabled>Y</enabled> <name>STEP_COPY</name> </field> <field> <id>LINES_READ</id> <enabled>Y</enabled> <name>LINES_READ</name> </field> <field> <id>LINES_WRITTEN</id> <enabled>Y</enabled> <name>LINES_WRITTEN</name> </field> <field> <id>LINES_UPDATED</id> <enabled>Y</enabled> <name>LINES_UPDATED</name> </field> <field> <id>LINES_INPUT</id> <enabled>Y</enabled> <name>LINES_INPUT</name> </field> <field> <id>LINES_OUTPUT</id> <enabled>Y</enabled> <name>LINES_OUTPUT</name> </field> <field> <id>LINES_REJECTED</id> <enabled>Y</enabled> <name>LINES_REJECTED</name> </field> <field> <id>ERRORS</id> <enabled>Y</enabled> <name>ERRORS</name> </field> <field> <id>LOG_FIELD</id> <enabled>N</enabled> <name>LOG_FIELD</name> </field> </step-log-table> <metrics-log-table> <connection/> <schema/> <table/> <timeout_days/> <field> <id>ID_BATCH</id> <enabled>Y</enabled> <name>ID_BATCH</name> </field> <field> <id>CHANNEL_ID</id> <enabled>Y</enabled> <name>CHANNEL_ID</name> </field> <field> <id>LOG_DATE</id> <enabled>Y</enabled> <name>LOG_DATE</name> </field> <field> <id>METRICS_DATE</id> <enabled>Y</enabled> <name>METRICS_DATE</name> </field> <field> <id>METRICS_CODE</id> <enabled>Y</enabled> <name>METRICS_CODE</name> </field> <field> <id>METRICS_DESCRIPTION</id> <enabled>Y</enabled> <name>METRICS_DESCRIPTION</name> </field> <field> <id>METRICS_SUBJECT</id> <enabled>Y</enabled> <name>METRICS_SUBJECT</name> </field> <field> <id>METRICS_TYPE</id> <enabled>Y</enabled> <name>METRICS_TYPE</name> </field> <field> <id>METRICS_VALUE</id> <enabled>Y</enabled> <name>METRICS_VALUE</name> </field> </metrics-log-table> </log> <maxdate> <connection/> <table/> <field/> <offset>0.0</offset> <maxdiff>0.0</maxdiff> </maxdate> <size_rowset>10000</size_rowset> <sleep_time_empty>50</sleep_time_empty> <sleep_time_full>50</sleep_time_full> <unique_connections>N</unique_connections> <feedback_shown>Y</feedback_shown> <feedback_size>50000</feedback_size> <using_thread_priorities>Y</using_thread_priorities> <shared_objects_file/> <capture_step_performance>N</capture_step_performance> <step_performance_capturing_delay>1000</step_performance_capturing_delay> <step_performance_capturing_size_limit>100</step_performance_capturing_size_limit> <dependencies> </dependencies> <partitionschemas> </partitionschemas> <slaveservers> </slaveservers> <clusterschemas> </clusterschemas> <created_user>-</created_user> <created_date>2024/06/03 08:48:46.329</created_date> <modified_user>-</modified_user> <modified_date>2024/06/03 08:48:46.329</modified_date> <key_for_session_key>H4sIAAAAAAAAAAMAAAAAAAAAAAA=</key_for_session_key> <is_key_private>N</is_key_private> </info> <notepads> </notepads> <order> <hop> <from>自定义常量数据</from> <to>获取redis中上一个值</to> <enabled>Y</enabled> </hop> </order> <step> <name>自定义常量数据</name> <type>DataGrid</type> <description/> <distribute>Y</distribute> <custom_distribution/> <copies>1</copies> <partitioning> <method>none</method> <schema_name/> </partitioning> <fields> <field> <name>name</name> <type>String</type> <format/> <currency/> <decimal/> <group/> <length>-1</length> <precision>-1</precision> <set_empty_string>N</set_empty_string> <field_null_if/> </field> </fields> <data> <line> <item>T1-1-running</item> </line> </data> <attributes/> <cluster_schema/> <remotesteps> <input> </input> <output> </output> </remotesteps> <GUI> <xloc>256</xloc> <yloc>112</yloc> <draw>Y</draw> </GUI> </step> <step> <name>获取redis中上一个值</name> <type>UserDefinedJavaClass</type> <description/> <distribute>Y</distribute> <custom_distribution/> <copies>1</copies> <partitioning> <method>none</method> <schema_name/> </partitioning> <definitions> <definition> <class_type>TRANSFORM_CLASS</class_type> <class_name>Processor</class_name> <class_source>import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPoolConfig; import java.io.IOException; import java.util.LinkedHashSet; import java.util.Set; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { Object[] r = getRow(); if (r == null) { setOutputDone(); return false; } r = createOutputRow(r, data.outputRowMeta.size()); //1、连接redis JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(5); poolConfig.setMaxIdle(1); poolConfig.setMaxWaitMillis(1000); Set<HostAndPort> nodes = new LinkedHashSet<HostAndPort>(); nodes.add(new HostAndPort("11.11.1.40", 7001)); nodes.add(new HostAndPort("11.11.1.41", 7001)); nodes.add(new HostAndPort("11.11.1.42", 7001)); JedisCluster jedis = new JedisCluster(nodes, poolConfig); //2、获取key值 //2、获取key值 String name = get(Fields.In, "name").getString(r); boolean exists = jedis.exists(name); logBasic("redis的key是 "+name +" 是否存在 "+exists); //2.1、获取上一次的机米长度 String lastVsxzl = "0"; if (!exists) { jedis.hset(name, "vsxzl", "0"); } else { if (!jedis.hexists(name, "vsxzl")) { jedis.hset(name, "vsxzl", "0"); } lastVsxzl = jedis.hget(name, "vsxzl"); } //2.2、获取状态值 String status="0"; if (!jedis.hexists(name, "status")) { jedis.hset(name, "status", "0"); }else{ status=jedis.hget(name,"status"); } //2.3、获取vsxg String vsxg="0"; if (!jedis.hexists(name, "vsxg")) { jedis.hset(name, "vsxg", "0"); }else{ vsxg=jedis.hget(name,"vsxg"); } logBasic("上次长 "+lastVsxzl+" 上次状态 "+status+" 上次vsxg "+vsxg); //3、输出key值 get(Fields.Out, "lastVsxzl").setValue(r, lastVsxzl); get(Fields.Out, "lastStatus").setValue(r, status); get(Fields.Out, "lastVsxg").setValue(r, vsxg); // 关闭连接 try { jedis.close(); } catch (IOException e) { e.printStackTrace(); } putRow(data.outputRowMeta, r); return true; } </class_source> </definition> </definitions> <fields> <field> <field_name>lastVsxzl</field_name> <field_type>String</field_type> <field_length>-1</field_length> <field_precision>-1</field_precision> </field> <field> <field_name>lastStatus</field_name> <field_type>String</field_type> <field_length>-1</field_length> <field_precision>-1</field_precision> </field> <field> <field_name>lastVsxg</field_name> <field_type>String</field_type> <field_length>-1</field_length> <field_precision>-1</field_precision> </field> </fields> <clear_result_fields>N</clear_result_fields> <info_steps/> <target_steps/> <usage_parameters/> <attributes/> <cluster_schema/> <remotesteps> <input> </input> <output> </output> </remotesteps> <GUI> <xloc>464</xloc> <yloc>160</yloc> <draw>Y</draw> </GUI> </step> <step_error_handling> </step_error_handling> <slave-step-copy-partition-distribution> </slave-step-copy-partition-distribution> <slave_transformation>N</slave_transformation> <attributes/> </transformation>4.3、kettle 使用动态变量名定义变量
name是变量,value 值也是变量
我需要把name作为变量名,value作为变量值;
在kettle中,使用javascript脚本
key与lastVsxzl都是变量
//Script here setVariable(key,lastVsxzl,'r'); var r=getVariable(key,'r');Demo
1、从记事本里面获取机头号name
2、根据机头号从redis中获取相应的vsxzl值
3、name与vsxzl 成为一个变量,name是变量名,vsxzl是变量值
成功的截图如下
文本文件输入的截图
获取redis值的截图
import redis.clients.jedis.Jedis; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { if (first) { first = false; } Object[] r = getRow(); if (r == null) { setOutputDone(); return false; } r = createOutputRow(r, data.outputRowMeta.size()); //1、连接redis Jedis jedis = new Jedis("10.20.1.17", 6379); //2、获取key值 String name = get(Fields.In, "name").getString(r); boolean exists = jedis.exists(name); //2.1、获取上一次的机米长度 String lastVsxzl = "0"; if (!exists) { jedis.hset(name, "vsxzl", "0"); } else { if (!jedis.hexists(name, "vsxzl")) { jedis.hset(name, "vsxzl", "0"); } lastVsxzl = jedis.hget(name, "vsxzl"); } //3、输出key值 get(Fields.Out, "lastVsxzl").setValue(r, lastVsxzl); get(Fields.Out, "key").setValue(r, name+"vsxzl"); // 关闭连接 jedis.close(); // Send the row on to the next step. putRow(data.outputRowMeta, r); return true; }Javascript代码
5.1、kettle pan.sh如何后台运行
需求
kettle运行转换,当前终端关闭,仍然能够继续运行。
nohup ./pan.sh -file=/root/kettle/job/monitor.ktr > /root/kettle/job/log/log.txt 2>&1 &