总结 Flink DataStream API 常见的5种数据输入源(Source)

本篇教程将详细介绍 Flink-1.16.0 DataStream API 中常用的五种数据输入源(Source)。

下面我们依次展开讲解,帮助大家尽快掌握 Flink 的数据输入基础。

一、从集合读取数据

Flink 提供了直接从 Java 或 Scala 集合中读取数据的方法,主要适用于小规模数据以及测试场景。通过这种方式,我们可以直接在程序中定义数据,然后将这些数据转化为 DataStream。

代码示例:

假设我们有一个整数列表,Java 代码示例如下:

package cn.datastream.apidemo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;

public class CollectionSourceExample {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.定义集合数据
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);

        // 3.通过集合创建数据流
        DataStream<Integer> numberStream = env.fromCollection(numberList);

        // 4.对数据流做简单处理,例如打印结果
        numberStream.print();

        // 5.执行任务
        env.execute("从集合读取数据示例.");
    }
}

执行代码结果输出如下:

从集合读取数据执行结果

使用说明

  • 数据来源:数据直接从集合中提供,因此数据量有限,适合调试与测试。

  • 代码简单易懂:从集合创建 DataStream 的代码仅需一行(通过 fromCollection 方法)。

  • 局限性:实时生产环境中难以使用这种方式,它更多地用于离线调试和单元测试。

二、从文件读取数据

在 Flink 的 DataStream API 中,可以方便地从文件中读取数据作为数据源。常见的文件来源包括本地文件系统和分布式文件系统(如 HDFS)。下面分别示例说明。

1. 从本地 CSV 文件读取数据

假设我们有一个本地 CSV 文件 Person.csv,内容如下:

id,name,age
1,zhangsan,18
2,lisi,20
3,wangwu,22
4,zhaoqian,23
5,sunli,19

我们可以使用 Flink 的 readTextFile 或者 readFile 方法进行读取,然后解析每一行数据。

package cn.datastream.apidemo;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadLocalCsvFileExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1.从本地文件读取,并去除 BOM
        DataStream<String> fileDataStream = env.readTextFile("src/main/resources/Person.csv")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) throws Exception {
                        // 处理 BOM 标记,去除可能的 BOM(UTF-8 编码的 BOM 标记是 0xEF 0xBB 0xBF)
                        if (value.startsWith("\uFEFF")) {
                            return value.substring(1); // 去掉 BOM
                        }
                        return value;
                    }
                });

        // 2.解析 CSV 数据(简单按逗号分割)
        DataStream<Person> parsed = fileDataStream.filter(line -> !line.startsWith("id")) // 过滤掉表头
                .map(new MapFunction<String, Person>() {
                    @Override
                    public Person map(String line) {
                        String[] fields = line.split(",");
                        return new Person(Integer.parseInt(fields[0]), fields[1], Integer.parseInt(fields[2]));
                    }
                });

        // 3.数据解析后的数据
        parsed.print();

        // 4.执行任务
        env.execute("读取本地 CSV 文件中的数据.");
    }

    // 定义 Person 类
    public static class Person {
        public int id;
        public String name;
        public int age;

        // 必须有无参构造函数
        public Person() {}

        public Person(int id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return id + "," + name + "," + age;
        }
    }
}

执行代码结果输出如下:

从本地文件读取数据执行结果

注意事项

  • 在某些 CSV 文件(尤其是 UTF-8 编码的文件)中,文件开头可能有一个隐藏的 BOM 标记,它会让程序在解析行时误将表头(如 "id,name,age")当作数据来处理,从而引发 NumberFormatException 错误。

  • 如果是 Maven 项目运行,src/main/resources/Person.csv 路径需要根据你的实际位置填写。

  • 简单示例直接按逗号分隔,真实情况建议使用 Flink 内置的 CSV 格式解析器或者用 POJOCsvInputFormat

2. 从 HDFS 文件系统读取数据

如果数据存储在 Hadoop 分布式文件系统(HDFS)上,Flink 同样支持直接读取。

提前将数据保存在 HDFS 上:hdfs://hadoop101:9000/data/Person.csv 。先启动 Hadoop 集群,再执行如下代码。

package cn.datastream.apidemo;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadHdfsCsvFileExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从 HDFS 读取
        DataStream<String> input = env.readTextFile("hdfs://hadoop101:9000/data/Person.csv");

        // 解析 CSV 数据
        DataStream<Person> parsed = input
            .filter(line -> !line.startsWith("id")) // 过滤表头
            .map(new MapFunction<String, Person>() {
                @Override
                public Person map(String line) throws Exception {
                    String[] fields = line.split(",");
                    return new Person(Integer.parseInt(fields[0]), fields[1], Integer.parseInt(fields[2]));
                }
            });

        parsed.print();

        env.execute("读取 HDFS 上文件的数据.");
    }

    public static class Person {
        public int id;
        public String name;
        public int age;

        public Person() {}

        public Person(int id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return id + "," + name + "," + age;
        }
    }
}

执行代码结果输出如下:

从HDFS读取数据执行结果

注意事项

  • 确保 Flink 运行环境已经配置好 Hadoop 依赖(需要在 pom.xml 中添加 Flink Hadoop 连接器)。
  • <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.2</version>
    </dependency>
  • Flink 任务的 JobManager 和 TaskManager 必须能够访问 HDFS 的地址,并且有读取权限。

  • 如果在本地环境调试,需要本地安装 Hadoop 客户端并配置好环境变量。

三、从Socket读取数据

Socket 数据源常用于实时数据采集和调试,例如在本地通过 netcat 模拟数据发送。Socket Source 在教学和一些实时应用原型开发中非常有用。

在虚拟机 hadoop101 上使用 netcat 工具打开一个端口(如 9999),在 Flink 中编写 Socket 读取代码:

package cn.datastream.apidemo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketSourceExample {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.定义 Socket 数据源:主机名和端口
        DataStream<String> socketStream = env.socketTextStream("192.168.220.30", 9999);

        // 3.处理数据:例如简单打印
        socketStream.print();

        // 4.执行任务
        env.execute("从 Socket 中读取实时数据.");
    }
}

执行代码结果输出如下:

从socket套接字读取数据执行结果

使用说明

  • 启动 netcat:使用命令 nc -lk 9999(Linux/Mac)开启监听服务,便可向 Flink 程序发送文本数据。

  • 实时数据:Socket Source 本质上是一个流式数据源,数据一旦到达就可实时处理。

  • 局限性:不具备容错能力,适合测试或者简单的演示。生产场景下需使用更稳健的数据源系统。

四、从 Kafka 读取数据

Kafka 是目前非常流行的分布式消息队列系统,Flink 提供了 Kafka Connector,可帮助我们无缝地将 Kafka 消息引入 Flink 流处理应用。在实际生产中,Kafka 常用于数据采集、日志收集、流处理等场景。

下面是使用 Kafka 作为数据输入源的一个简单示例(Java 代码):

package cn.datastream.apidemo;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaSourceExample {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.Kafka 配置参数
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.220.30:9092");
        properties.setProperty("group.id", "flink_kafka_group");

        // 3.创建 Kafka 消费者,指定 topic、序列化方式和属性
        String topic = "your_topic_name";
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                topic,
                new SimpleStringSchema(),
                properties
        );

        // 4.添加 Kafka 数据源到执行环境
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

        // 5.打印读取的 Kafka 数据
        kafkaStream.print();

        // 6.执行任务
        env.execute("从 Kafka 中读取实时数据.");
    }
}

使用说明

  • 依赖问题:需在 Maven 项目中引入 Kafka Connector 相关依赖,否则编译无法找到 FlinkKafkaConsumer 类。
  • <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-connector-kafka</artifactId>
       <version>1.16.0</version>
    </dependency>
  • 容错机制:Flink Kafka Connector 内部支持 checkpoint 与状态后端,能保证在故障时实现“至少一次”的消息语义。

  • 应用场景:适用于大数据量实时处理场景,如日志聚合、实时监控、数据 ETL 等。

五、自定义 Source

有些业务场景下,现有的内置数据源无法满足需求,我们需要自定义数据源以接入独特的数据生成逻辑。Flink 提供了两种方式:一种是实现 SourceFunction 接口,另一种是继承 RichSourceFunction 并实现相应方法。

下面是一个简单的自定义数据源示例,该数据源会每秒生成一次当前时间戳:

package cn.datastream.apidemo;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CustomSource implements SourceFunction<Long> {
    // 标识数据源是否正在运行
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            // 获取当前系统时间并发送数据
            long currentTime = System.currentTimeMillis();
            ctx.collect(currentTime);

            // 每秒采集一次
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

在任务中使用自定义 Source:

package cn.datastream.apidemo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomSourceExample {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.添加自定义数据源
        DataStream<Long> timeStream = env.addSource(new CustomSource());

        // 3.打印输出数据流
        timeStream.print();

        // 4.执行任务
        env.execute("从自定义的数据源中读取数据.");
    }
}

执行代码结果输出如下:

读取自定义source执行结果

使用说明

  • 实现 SourceFunction 接口:自定义 Source 只需实现 run()cancel() 方法。

  • 数据的生成与采集:数据生成逻辑完全由用户控制,可从文件、数据库、API 获取或者生成模拟数据。

  • 扩展性:自定义 Source 可以灵活接入各种数据系统,满足个性化需求。

小结

本教程结合 Flink-1.16.0 DataStream API 常见的数据输入源进行了详细讲解,并通过示例代码展示了如何:

  1. 从集合读取数据,适合数据测试与调试;

  2. 从文件读取数据,支持读取本地及分布式文件系统中的文本数据;

  3. 从 Socket 读取数据,实现实时数据采集;

  4. 从 Kafka 读取数据,使用 Kafka Connector 进行大规模实时数据流处理;

  5. 自定义 Source,实现对特殊数据生成逻辑的支持。

通过以上示例与说明,希望读者能尽快上手 Flink DataStream API,并结合业务需求灵活选择合适的数据输入方式,实现更高效、可靠的实时数据处理。