Flink has no counterpart to Spark's foreach method that lets users iterate over the data and produce side effects directly. Output to external systems is done through a Sink, and a job's final output step is typically wired up like this:
stream.addSink(new MySink(XX))
Flink officially provides sinks for a number of frameworks. Beyond those, users need to implement a custom sink themselves.
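All of the examples below parse CSV lines into a SensorReading object. The class itself is not shown in this article; here is a minimal sketch consistent with how it is used (the constructor, getters and toString are inferred from the calls in the code, so treat the details as assumptions):

// Hypothetical POJO reconstructed from its usage in the examples below
public class SensorReading {
    private String id;           // sensor id
    private Long timestamp;      // event timestamp
    private Double temperature;  // temperature reading

    public SensorReading() {}

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public Long getTimestamp() { return timestamp; }
    public Double getTemperature() { return temperature; }

    @Override
    public String toString() {
        return "SensorReading{id='" + id + "', timestamp=" + timestamp + ", temperature=" + temperature + "}";
    }
}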
Kafka Sink

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");
        inputStream.print("input");

        // Convert each line to a SensorReading (Java 8 lambda), then back to String for the producer
        DataStream<String> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])).toString();
        });

        // Write the stream to the "sinktest" topic
        dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092", "sinktest", new SimpleStringSchema()));

        env.execute();
    }
}
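Running this example requires the Kafka connector on the classpath. The dependency is not listed in the original article; assuming the same Flink 1.10.1 / Scala 2.12 versions as the Elasticsearch connector below, it would look roughly like:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>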
Start a Kafka console consumer on the sinktest topic to watch the sink's output:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinktest
Start a Kafka producer:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor
Using Kafka as the data source as well gives a Kafka → Flink → Kafka pipeline: the job reads from the sensor topic, transforms each record, and writes the result to the sinktest topic:
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // Read data from the "sensor" topic
        DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));

        // Convert each line to a SensorReading (Java 8 lambda), then back to String for the producer
        DataStream<String> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])).toString();
        });

        // Write the transformed stream to the "sinktest" topic
        dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092", "sinktest", new SimpleStringSchema()));

        env.execute();
    }
}
Start the Kafka console consumer (the same command shown earlier). Records typed into the producer now appear on sinktest after passing through the Flink job.
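Assuming sensor.txt-style CSV records (e.g. sensor_1,1547718199,35.8; the actual file contents are not shown in the article), a quick end-to-end check looks like this:

# producer terminal: type a CSV record
sensor_1,1547718199,35.8

# consumer terminal: the record re-emitted by the Flink job,
# formatted by SensorReading.toString() (exact format depends on that implementation)
SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}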
Redis Sink

Add the Bahir Redis connector dependency:

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");

        // Convert each line to a SensorReading (Java 8 lambda)
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // Jedis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();

        dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    // Custom RedisMapper
    public static class MyRedisMapper implements RedisMapper<SensorReading> {
        // Store the data as a hash: HSET builds a sensor_temp hash holding id -> temperature
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}
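For a hypothetical input record sensor_1,1547718199,35.8, the mapper above effectively issues the Redis command below, so each sensor id keeps only its most recent temperature:

HSET sensor_temp sensor_1 35.8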
Start the server:

cd <redis install dir>/src
./redis-server
Start the client, then list the existing keys:

cd <redis install dir>/src
./redis-cli
keys *

After the Flink job has run, execute keys * again; the sensor_temp hash now appears. To read a single sensor's latest temperature, run:

hget sensor_temp sensor_1

To view all of the data, run:

hgetall sensor_temp
Elasticsearch Sink

Add the Flink Elasticsearch connector dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Run the following code:
import java.util.ArrayList;
import java.util.HashMap;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

public class SinkTest3_ES {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");

        // Convert each line to a SensorReading (Java 8 lambda)
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // Elasticsearch connection configuration
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200));

        dataStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());

        env.execute();
    }

    // Custom ES write logic
    public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
        @Override
        public void process(SensorReading sensorReading, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            // Build the document source to be written
            HashMap<String, String> dataSource = new HashMap<>();
            dataSource.put("id", sensorReading.getId());
            dataSource.put("temp", sensorReading.getTemperature().toString());
            dataSource.put("ts", sensorReading.getTimestamp().toString());

            // Create the request, i.e. the write command sent to ES
            IndexRequest indexRequest = Requests.indexRequest()
                    .index("sensor")
                    .type("readingdata")
                    .source(dataSource);

            // Submit the request through the indexer
            requestIndexer.add(indexRequest);
        }
    }
}
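One way to verify the writes (assuming a local Elasticsearch 6.x instance on the default port) is to query the sensor index directly:

curl "http://localhost:9200/sensor/_search?pretty"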
JDBC Custom Sink

Add the MySQL driver dependency:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version>
</dependency>
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class SinkTest4_JDBC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");

        // Convert each line to a SensorReading (Java 8 lambda)
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        dataStream.addSink(new MyJdbcSink());

        env.execute();
    }

    public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
        // Connection and prepared statements
        Connection connection = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        // open() is part of the lifecycle: create the connection once,
        // instead of once per record inside invoke()
        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
            insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // Called for every record: run the SQL over the shared connection
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // Try the update first; if no row was affected, insert instead
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if (updateStmt.getUpdateCount() == 0) {
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            connection.close();
        }
    }
}
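The sink assumes a sensor_temp table already exists in the test database. A minimal schema consistent with the insert and update statements above (the column types are an assumption):

-- hypothetical table definition matching the prepared statements
CREATE TABLE sensor_temp (
    id   VARCHAR(32) PRIMARY KEY,
    temp DOUBLE
);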