Flink 维表关联

news/2024/6/19 2:34:08 标签: 1024程序员节, flink, 大数据

1、实时查询维表

实时查询维表是指用户在 Flink 算子中直接访问外部数据库,比如用 MySQL 来进行关联,这种方式是同步方式,数据保证是最新的。但是,当我们的流计算数据过大,会对外 部系统带来巨大的访问压力,一旦出现比如连接失败、线程池满等情况,由于我们是同步调用,所以一般会导致线程阻塞、Task 等待数据返回,影响整体任务的吞吐量。而且这种方案对外部系统的 QPS 要求较高,在大数据实时计算场景下,QPS 远远高于普通的后台系统,峰值高达十万到几十万,整体作业瓶颈转移到外部系统


public class DimSync  extends RichMapFunction<fplOverview, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);
    private Connection conn = null;
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = DriverManager.getConnection("jdbc:test:3306/mysqldb?characterEncoding=UTF-8", "root", "qyllt1314#");
    }
    @Override
    public String map(fplOverview fplOverview) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(fplOverview.toJson());

        String dp_id = jsonObject.getString("dp_id");

        //根据 dp_id 查询  上周的 fpl_amount,yw
        PreparedStatement pst = conn.prepareStatement("select  max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +
                                                      "from fpl_overview \n" +
                                                      "where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +
                                                      "and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +
                                                      "and dp_id = ? \n" +
                                                      "group by dp_id");
        pst.setString(1,dp_id);
        ResultSet resultSet = pst.executeQuery();
        String fpl_amount = null;
        String yw = null ;
        while (resultSet.next()){
            fpl_amount = resultSet.getString(1);
            yw = resultSet.getString(2);
        }
        pst.close();

        jsonObject.put("lastweek_fpl_amount",fpl_amount);
        jsonObject.put("lastweek_yw",yw)
        return jsonObject.toString();

    }
    public void close() throws Exception {
        super.close();
        conn.close();
    }

flink_Id_48">2、LRU 缓存 (flink 异步Id)

利用 Flink 的 RichAsyncFunction 读取 mysql 的数据到缓存中,我们在关联维度表时先去查询缓存,如果缓存中不存在这条数据,就利用客户端去查询 mysql,然后插入到缓存中。


public class JDBCAsyncFunction  extends RichAsyncFunction<fplOverview, JsonObject> {
    private SQLClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        Vertx vertx = Vertx.vertx(new VertxOptions()
                .setWorkerPoolSize(10)
                .setEventLoopPoolSize(10));

        JsonObject config = new JsonObject()
                .put("url", "jdbc:mysql://rm-bp161be65d56kbt4nzo.mysql.rds.aliyuncs.com:3306/mysqldb?characterEncoding=UTF-8;useSSL=false")
                .put("driver_class", "com.mysql.cj.jdbc.Driver")
                .put("max_pool_size", 10)
                .put("user", "root")
                .put("password", "");

        client = JDBCClient.createShared(vertx, config);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(fplOverview fplOverview, ResultFuture<JsonObject> resultFuture) throws Exception {
        client.getConnection(
                conn -> {
            if (conn.failed()) {
                return;
            }

            final SQLConnection connection = conn.result();
            // 执行sql
            connection.query("select  max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +
                             "from fpl_overview \n" +
                            "where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +
                            "and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +
                             "and dp_id = '" + fplOverview.getDp_id() + " ' " +
                            "group by dp_id ", res2 -> {
                ResultSet rs = new ResultSet();
                if (res2.succeeded()) {
                    rs = res2.result();
                }else{
                    System.out.println("查询数据库出错");
                }
                List<JsonObject> stores = new ArrayList<>();
                for (JsonObject json : rs.getRows()) {
                    stores.add(json);
                }
                connection.close();
                resultFuture.complete(stores);
            });
        });

    }

}

3、预加载全量mysql数据

预加载全量mysql数据 使用 ScheduledExecutorService 每隔 5 分钟拉取一次维表数据,这种方式适用于那些实时场景不是很高,维表数据较小的场景

public class WholeLoad extends RichMapFunction<fplOverview,String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);
    // 定义map的结果,key为关联字段
    private static Map<String,String> cache ;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        cache = new HashMap<>();
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    load();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },0,5, TimeUnit.MINUTES); //从现在开始每隔5分钟查询数据
    }

    @Override
    public String map(fplOverview fplOverview) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(fplOverview.toJson());
        String dp_id = jsonObject.getString("dp_id");
        // 获取对应id的结果
        String rs = cache.get(dp_id);
        JSONObject rsObject = JSONObject.parseObject(rs);
        jsonObject.putAll(rsObject);
        return jsonObject.toString();
    }

    public   void  load() throws Exception {

        Class.forName("com.mysql.jdbc.Driver");
        Connection con = DriverManager.getConnection("jdbc:mysql://test:3306/mysqldb?characterEncoding=UTF-8", "root", "qyllt1314#");
        // 执行查询的SQL
        PreparedStatement statement = con.prepareStatement("select  dp_id,max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +
                "from fpl_overview \n" +
                "where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +
                "and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +
                "group by dp_id");
        ResultSet rs = statement.executeQuery();
        while (rs.next()) {
            // 查询结果放入缓存
            String dp_id = rs.getString("dp_id");
            String fpl_amount = rs.getString("fpl_amount");
            String yw = rs.getString("yw");
            JSONObject jsonObject = JSONObject.parseObject("{}");
            jsonObject.put("lastweek_fpl_amount",fpl_amount);
            jsonObject.put("lastweek_yw",yw);
            cache.put(dp_id,jsonObject.toString());
        }
        System.out.println("数据输出测试:"+cache.toString());
        con.close();

    }
}


http://www.niftyadmin.cn/n/5121748.html

相关文章

Web APIs——环境对象this以及回调函数

一、环境对象 环境对象&#xff1a;指的是函数内部特殊的变量this&#xff0c;它代表着当前函数运行时所处的环境 作用&#xff1a;弄清楚this的指向&#xff0c;可以使代码更简洁 函数的调用方式不同&#xff0c;this指代的对象也不同[谁调用&#xff0c;this就是谁] 是判断th…

负载均衡--Haproxy

haproxy 他也是常用的负载均衡软件 nginx 支持四层转发&#xff0c;七层转发 haproxy也可以四层和七层转发 haproxy&#xff1a;法国人开发的威利塔罗在2000年基于C语言开发的一个开源软件 可以支持一万以上的并发请求 高性能的tcp和http负载均衡2.4 1.5.9 haproxy&#…

YOLOv7改进:全网原创首发 | 新颖的多尺度卷积注意力(MSCA),即插即用,助力小目标检测 | NeurIPS2022

💡💡💡本文全网首发独家改进:多尺度卷积注意力(MSCA),有效地提取上下文信息,新颖度高,创新十足。 1)作为注意力MSCA使用; 推荐指数:五星 MSCA | 亲测在多个数据集能够实现涨点,多尺度特性在小目标检测表现也十分出色。 收录: YOLOv7高阶自研专栏介绍:…

nginx+websphere sendRedirect 端口错误

nginxwebsphere sendRedirect 端口错误 问题现象&#xff1a; nginxwebsphere 当在websphere中使用sendRedirect时&#xff0c;会将websphere的端口传递到浏览器 eg: 正确的访问地址为&#xff1a;127.0.0.1 使用sendRedirect后&#xff0c;变为127.0.0.1:9080 解决办法&…

户外用品经营小程序商城的作用是什么

线下门店经营不易&#xff0c;自然流量降低&#xff0c;发广告传单、口口相传的形式也很难获取到客户&#xff0c;或推广成本难以统计&#xff0c;往往会出现本大于利的情况。如今各行各业都通过线上数字化转型以提升自己的销量及营业额、扩展品牌等。 同时在日常经营中&#…

Python参数种类介绍

Python参数种类介绍 相比于一些其他编程语言&#xff0c;Python提供了更多的参数种类选项。这是Python的一大特点&#xff0c;使用不同的参数类型&#xff0c;可以提高函数的可读性和可维护性。例如&#xff0c;使用关键字参数可以使函数调用更加清晰&#xff0c;不需要记住参数…

NOIP2023模拟2联测23 负责

题目大意 有 n n n个区间 [ l i , r i ] [l_i,r_i] [li​,ri​]&#xff0c;每个区间有一个权值 w i w_i wi​。把这 n n n个区间当成 n n n个点&#xff0c;如果两个区间之间有交&#xff08;包括端点&#xff09;&#xff0c;那么就在这两个区间之间连边。于是&#xff0c;这…

Oracle 数据库的锁排查方法

关键字 oracle lock 问题描述 Oracle 数据库上锁问题如何排查 解决问题思路 准备数据 create table lock_test(name varchar(10),age varchar(10));insert into lock_test values(ff,10); insert into lock_test values(yy,20); insert into lock_test values(ll,30);Orac…