总结 Flink DataStream API 常见的5种数据输入源(Source)
- 实操笔记
- 3天前
- 16热度
- 0评论
文章目录[隐藏]
本篇教程将详细介绍 Flink-1.16.0 DataStream API 中常用的五种数据输入源(Source)。
下面我们依次展开讲解,帮助大家尽快掌握 Flink 的数据输入基础。
一、从集合读取数据
Flink 提供了直接从 Java 或 Scala 集合中读取数据的方法,主要适用于小规模数据以及测试场景。通过这种方式,我们可以直接在程序中定义数据,然后将这些数据转化为 DataStream。
代码示例:
假设我们有一个整数列表,Java 代码示例如下:
package cn.datastream.apidemo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;
public class CollectionSourceExample {
public static void main(String[] args) throws Exception {
// 1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.定义集合数据
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);
// 3.通过集合创建数据流
DataStream<Integer> numberStream = env.fromCollection(numberList);
// 4.对数据流做简单处理,例如打印结果
numberStream.print();
// 5.执行任务
env.execute("从集合读取数据示例.");
}
}
执行代码结果输出如下:
使用说明
-
:数据直接从集合中提供,因此数据量有限,适合调试与测试。
-
代码简单易懂:从集合创建 DataStream 的代码仅需一行(通过
fromCollection
方法)。 -
局限性
二、从文件读取数据
在 Flink 的 DataStream API 中,可以方便地从文件中读取数据作为数据源。常见的文件来源包括本地文件系统和分布式文件系统(如 HDFS)。下面分别示例说明。
1. 从本地 CSV 文件读取数据
假设我们有一个本地 CSV 文件 Person.csv,内容如下:
id,name,age
1,zhangsan,18
2,lisi,20
3,wangwu,22
4,zhaoqian,23
5,sunli,19
我们可以使用 Flink 的 readTextFile 或者 readFile 方法进行读取,然后解析每一行数据。
package cn.datastream.apidemo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ReadLocalCsvFileExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1.从本地文件读取,并去除 BOM
DataStream<String> fileDataStream = env.readTextFile("src/main/resources/Person.csv")
.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
// 处理 BOM 标记,去除可能的 BOM(UTF-8 编码的 BOM 标记是 0xEF 0xBB 0xBF)
if (value.startsWith("\uFEFF")) {
return value.substring(1); // 去掉 BOM
}
return value;
}
});
// 2.解析 CSV 数据(简单按逗号分割)
DataStream<Person> parsed = fileDataStream.filter(line -> !line.startsWith("id")) // 过滤掉表头
.map(new MapFunction<String, Person>() {
@Override
public Person map(String line) {
String[] fields = line.split(",");
return new Person(Integer.parseInt(fields[0]), fields[1], Integer.parseInt(fields[2]));
}
});
// 3.数据解析后的数据
parsed.print();
// 4.执行任务
env.execute("读取本地 CSV 文件中的数据.");
}
// 定义 Person 类
public static class Person {
public int id;
public String name;
public int age;
// 必须有无参构造函数
public Person() {}
public Person(int id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
@Override
public String toString() {
return id + "," + name + "," + age;
}
}
}
执行代码结果输出如下:
注意事项
-
NumberFormatException
-
如果是 Maven 项目运行,
src/main/resources/Person.csv
路径需要根据你的实际位置填写。 -
简单示例直接按逗号分隔,真实情况建议使用 Flink 内置的 CSV 格式解析器或者用
POJO
加CsvInputFormat
2. 从 HDFS 文件系统读取数据
如果数据存储在 Hadoop 分布式文件系统(HDFS)上,Flink 同样支持直接读取。
hdfs://hadoop101:9000/data/Person.csv
package cn.datastream.apidemo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ReadHdfsCsvFileExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从 HDFS 读取
DataStream<String> input = env.readTextFile("hdfs://hadoop101:9000/data/Person.csv");
// 解析 CSV 数据
DataStream<Person> parsed = input
.filter(line -> !line.startsWith("id")) // 过滤表头
.map(new MapFunction<String, Person>() {
@Override
public Person map(String line) throws Exception {
String[] fields = line.split(",");
return new Person(Integer.parseInt(fields[0]), fields[1], Integer.parseInt(fields[2]));
}
});
parsed.print();
env.execute("读取 HDFS 上文件的数据.");
}
public static class Person {
public int id;
public String name;
public int age;
public Person() {}
public Person(int id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
@Override
public String toString() {
return id + "," + name + "," + age;
}
}
}
执行代码结果输出如下:
- 确保 Flink 运行环境已经配置好 Hadoop 依赖(需要在 pom.xml 中添加 Flink Hadoop 连接器)。
-
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.2</version> </dependency>
-
-
三、从Socket读取数据
Socket 数据源常用于实时数据采集和调试,例如在本地通过 netcat 模拟数据发送。Socket Source 在教学和一些实时应用原型开发中非常有用。
hadoop101
package cn.datastream.apidemo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SocketSourceExample {
public static void main(String[] args) throws Exception {
// 1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.定义 Socket 数据源:主机名和端口
DataStream<String> socketStream = env.socketTextStream("192.168.220.30", 9999);
// 3.处理数据:例如简单打印
socketStream.print();
// 4.执行任务
env.execute("从 Socket 中读取实时数据.");
}
}
执行代码结果输出如下:
使用说明
-
:使用命令
nc -lk 9999
(Linux/Mac)开启监听服务,便可向 Flink 程序发送文本数据。 -
实时数据:Socket Source 本质上是一个流式数据源,数据一旦到达就可实时处理。
-
局限性
四、从 Kafka 读取数据
Kafka 是目前非常流行的分布式消息队列系统,Flink 提供了 Kafka Connector,可帮助我们无缝地将 Kafka 消息引入 Flink 流处理应用。在实际生产中,Kafka 常用于数据采集、日志收集、流处理等场景。
下面是使用 Kafka 作为数据输入源的一个简单示例(Java 代码):
package cn.datastream.apidemo;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaSourceExample {
public static void main(String[] args) throws Exception {
// 1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.Kafka 配置参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.220.30:9092");
properties.setProperty("group.id", "flink_kafka_group");
// 3.创建 Kafka 消费者,指定 topic、序列化方式和属性
String topic = "your_topic_name";
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
topic,
new SimpleStringSchema(),
properties
);
// 4.添加 Kafka 数据源到执行环境
DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
// 5.打印读取的 Kafka 数据
kafkaStream.print();
// 6.执行任务
env.execute("从 Kafka 中读取实时数据.");
}
}
使用说明
- :需在 Maven 项目中引入 Kafka Connector 相关依赖,否则编译无法找到
FlinkKafkaConsumer
类。 -
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.16.0</version> </dependency>
-
:Flink Kafka Connector 内部支持 checkpoint 与状态后端,能保证在故障时实现“至少一次”的消息语义。
-
应用场景
五、自定义 Source
有些业务场景下,现有的内置数据源无法满足需求,我们需要自定义数据源以接入独特的数据生成逻辑。Flink 提供了两种方式:一种是实现 SourceFunction 接口,另一种是继承 RichSourceFunction 并实现相应方法。
下面是一个简单的自定义数据源示例,该数据源会每秒生成一次当前时间戳:
package cn.datastream.apidemo;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class CustomSource implements SourceFunction<Long> {
// 标识数据源是否正在运行
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> ctx) throws Exception {
while (isRunning) {
// 获取当前系统时间并发送数据
long currentTime = System.currentTimeMillis();
ctx.collect(currentTime);
// 每秒采集一次
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
在任务中使用自定义 Source:
package cn.datastream.apidemo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CustomSourceExample {
public static void main(String[] args) throws Exception {
// 1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.添加自定义数据源
DataStream<Long> timeStream = env.addSource(new CustomSource());
// 3.打印输出数据流
timeStream.print();
// 4.执行任务
env.execute("从自定义的数据源中读取数据.");
}
}
执行代码结果输出如下:
使用说明
-
:自定义 Source 只需实现
run()
与cancel()
方法。 -
数据的生成与采集:数据生成逻辑完全由用户控制,可从文件、数据库、API 获取或者生成模拟数据。
-
扩展性
小结
本教程结合 Flink-1.16.0 DataStream API 常见的数据输入源进行了详细讲解,并通过示例代码展示了如何:
-
,适合数据测试与调试;
-
从文件读取数据,支持读取本地及分布式文件系统中的文本数据;
-
从 Socket 读取数据,实现实时数据采集;
-
从 Kafka 读取数据,使用 Kafka Connector 进行大规模实时数据流处理;
-
自定义 Source
通过以上示例与说明,希望读者能尽快上手 Flink DataStream API,并结合业务需求灵活选择合适的数据输入方式,实现更高效、可靠的实时数据处理。