编写最简单flink应用并提交到flink v1.19.2集群

news/2025/2/26 1:58:26

1 概述

本文介绍编写最最简单的word count的代码,编译成jar后,提交到flink v1.19.2集群进行运行。

2 环境准备

2.1 jdk和maven工具的安装

yum安装jdk 1.8:

yum install java-1.8.0-openjdk-1.8.0.212.b04-0.el7_6.x86_64 java-1.8.0-openjdk-devel-1.8.0.212.b04-0.el7_6.x86_64 -y

下载maven二进制文件的压缩包,并解压。

cd /usr/local
wget https://dlcdn.apache.org/maven/maven-3/3.9.9/binaries/apache-maven-3.6.3-bin.tar.gz
tar xf apache-maven-3.6.3-bin.tar.gz
ln -s /usr/local/apache-maven-3.6.3 apache-maven

将以下两行追加到/etc/profile:

MAVEN_HOME=/usr/local/apache-maven
PATH=$PATH:$MAVEN_HOME/bin

加载linux环境变量:

source /etc/profile

查看maven的信息:
在这里插入图片描述

flink_29">2.2 准备flink集群

https://archive.apache.org/dist/flink/flink-1.19.2/

下载flink安装包,并启动集群,启动脚本放在bin目录中,脚本名称为start-cluster.sh。
在这里插入图片描述在这里插入图片描述
flink集群的web控制台监听在8081端口:
在这里插入图片描述

flink_39">3 编写flink应用代码

3.1 创建代码目录

cd /tmp
mkdir -p wordcount
cd wordcount
mkdir -p src/main/java/org/example

3.2 pom.xml文件

pom.xml内容如下,可直接拿来使用。
pom.xml指定flink版本为1.19.2,只需显式指明两个依赖flink-streaming-java和flink-clients。

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>


    <groupId>org.example</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>


    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.19.2</flink.version>
    </properties>


    <dependencies>
        <!-- Flink Streaming Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!-- Flink CLI Support (optional, for local execution) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.example.WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

3.3 java代码

文件名称为src/main/java/org/example/WordCount.java,内容如下:

package org.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 创建流处理执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        // 定义数据源(从集合中读取数据)
        DataStream<String> text = env.fromElements(
                "Hello Flink",
                "Hello World",
                "Flink is awesome",
                "World is big"
        );


        // 转换操作:将句子拆分为单词,并统计每个单词的出现次数
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0) // 按单词分组
                .sum(1); // 对单词计数求和


        // 输出结果
        counts.print();


        // 启动任务
        env.execute("WordCount Example");
    }


    // 自定义 FlatMapFunction,将句子拆分为单词
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // 将句子转换为小写并按空格分割
            String[] words = value.toLowerCase().split("\\W+");


            // 发射每个单词及其初始计数(1)
            for (String word : words) {
                if (word.length() > 0) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}

最终代码结构

在这里插入图片描述

编译

mvn clean package

在这里插入图片描述
在这里插入图片描述

flink_186">提交jar包到flink集群

浏览器打开:http://localhost:8081

在左边菜单栏的Submit New job,点击提交jar文件即可,flink任务开始运行。
在这里插入图片描述
在任务的标准输出可以看见,有内容输出:
在这里插入图片描述

小结

本文介绍启动flink集群、编写最简单的flink应用代码,提交jar包到flink集群,这些基础流程非常适合新接触flink的工程师。


http://www.niftyadmin.cn/n/5867063.html

相关文章

业务应用和大数据平台的数据流向

概述 业务应用与大数据平台之间的交互是实现数据驱动决策和实时业务处理的关键环节。其交互方式多样&#xff0c;协议选择取决于数据流向、实时性要求及技术架构。一句话总结&#xff0c;数据流向可以是从业务应用写入大数据平台&#xff0c;也可以是大数据平台回写至业务应用…

计算机毕业设计SpringBoot+Vue.jst网上超市系统(源码+LW文档+PPT+讲解)

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

【无人集群系列---无人机集群编队算法】

【无人集群系列---无人机集群编队算法】 一、核心目标二、主流编队控制方法1. 领航-跟随法&#xff08;Leader-Follower&#xff09;2. 虚拟结构法&#xff08;Virtual Structure&#xff09;3. 行为法&#xff08;Behavior-Based&#xff09;4. 人工势场法&#xff08;Artific…

量子计算在金融风险评估中的应用:革新与突破

量子计算在金融风险评估中的应用:革新与突破 大家好,我是Echo_Wish,一名专注于人工智能和Python的自媒体创作者。今天,我们要探讨的是量子计算在金融风险评估中的应用。量子计算作为新一代计算技术,其超强的计算能力和并行处理能力,正在逐步改变金融风险评估的传统方法。…

vue3 下载文件 responseType-blob 或者 a标签

在 Vue 3 中&#xff0c;你可以使用 axios 或 fetch 来下载文件&#xff0c;并将 responseType 设置为 blob 以处理二进制数据。以下是一个使用 axios 的示例&#xff1a; 使用 axios 下载文件 首先&#xff0c;确保你已经安装了 axios&#xff1a; npm install axios然后在你…

国产编辑器EverEdit - 如何在EverEdit中创建工程?

1 创建工程 1.1 应用场景 工程是一个文件及文件夹的集合&#xff0c;对于稍微有点规模的项目&#xff0c;一般都会包含多个文件&#xff0c;甚至还会以文件夹的形式进行分层管理多个文件&#xff0c;为了方便的管理这个项目&#xff0c;可以将这些文件和文件夹保存为一个工程。…

01背包之---应用篇

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、01背包之---背包是否能被装满&#xff1f;例题1.分析题意例题2.分析题意 二、01背包之---装满背包有多少种组合?例题1.分析题意 三、01背包之---容量为N的…

C++和OpenGL实现3D游戏编程【连载23】——几何着色器和法线可视化

欢迎来到zhooyu的C++和OpenGL游戏专栏,专栏连载的所有精彩内容目录详见下边链接: 🔥C++和OpenGL实现3D游戏编程【总览】 1、本节实现的内容 上一节课,我们在Blend软件中导出经纬球模型时,遇到了经纬球法线导致我们在游戏中模型光照显示问题,我们在Blender软件中可以通过…