Java中消息和缓存如何使用?


唐朝
唐朝 2024-04-18 23:08:44 61032
分类专栏: 问答

1.Java中有个需求是我要每秒中接收mqtt给我发来的消息,然后和数据库中的数据进行比对,如果一样就存入表中,我想这个如何使用redis缓存让他不要频繁访问数据库。

网站声明:如果转载,请联系本站管理员。否则一切后果自行承担。

本文链接:https://www.xckfsq.com/news/show.html?id=53377
赞同 0
评论 2 条
  • 1843880570 2024-04-19 09:16:46

    缓存是Java开发中经常用到的组件,我们会使用缓存来存储一些不经常改变的热点数据,提高系统处理效率,其根本原因在于内存和硬盘读写速度的巨大差异。
    Java 1.8中有多个本地缓存,主要是Guava和Caffine,当然我们也可以自己设置一个HashMap来当做缓存使用,接下来一一介绍这几种本地缓存。

    1、使用HashMap当做缓存
    这是个比较简单的方法,最好是使用ConcurrtHashMap,ConcurrtHashMap兼顾了效率和并发一致性。这里简单用ConcurrtHashMap实现了一个本地缓存,当我们根据code查询对应城市时,如果cache中没有或者缓存已过期,则会从模拟的数据库中查询,这里设置的超时时间为3秒。

    public class ConcurrentHashMapTest {
        private static ConcurrentHashMap<Integer, Tuple2<Long, String>> cache = new ConcurrentHashMap<>(3);
        private static Map<Integer, String> dbData = new HashMap<>(3);
        static {
            dbData.put(1, "shanghai");
            dbData.put(2, "beijing");
            dbData.put(3, "shenzhen");
        }
        private static int expirationTime = 3;
        private static int mill = 1000;

        @Test
        @SneakyThrows
        public void test() {
            System.out.println("the result is " + getCityByCode(1));
            Thread.sleep(1000);
            System.out.println("the result is " + getCityByCode(1));
            Thread.sleep(3000);
            System.out.println("the result is " + getCityByCode(1));
            Thread.sleep(1000);
            System.out.println("the result is " + getCityByCode(2));
        }

        private String getCityByCode(int code) {
            if (!cache.containsKey(code)) {
                return getCityFromDb(code);
            }
            Tuple2<Long, String> tuple2 = cache.get(code);
            if (isOverTime(tuple2._1)) {
                System.out.println("cache is over time");
                return getCityFromDb(code);
            } else {
                return tuple2._2;
            }
        }

        private String getCityFromDb(Integer code) {
            String city = dbData.get(code);
            System.out.println("query city " + city + " from db");
            cache.put(code, new Tuple2<>(System.currentTimeMillis(), city));
            return city;
        }

        private boolean isOverTime(Long time) {
            if ((System.currentTimeMillis() - time) / mill > expirationTime) {
                return true;
            }
            return false;
        }
    }

    输出结果:
    query city shanghai from db
    the result is shanghai
    the result is shanghai
    cache is over time
    query city shanghai from db
    the result is shanghai
    query city beijing from db
    the result is beijing
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    2、Guava Cache的使用
    Guava是Google提供的一套Java工具包,而Guava Cache是一套非常完善的本地缓存机制(JVM缓存)。Guava cache的设计来源于CurrentHashMap,可以按照多种策略来清理存储在其中的缓存值且保持很高的并发读写性能。接下来介绍一下Guava Cache的常用方法,首先还是创建一些公共方法用于测试,这里用静态方法模拟从数据库查询数据。

    private static final int THREE = 3;
    private static Map<Integer, String> dbData = new HashMap<>(3);
    static {
        dbData.put(1, "shanghai");
        dbData.put(2, "beijing");
        dbData.put(3, "shenzhen");
    }

    @SneakyThrows
    private static void sleep(long millis) {
        Thread.sleep(millis);
    }

    private static String getCityFromDb(Integer code) {
        sleep(1000);
        return dbData.get(code) + " " + System.currentTimeMillis();
    }

    private static ListeningExecutorService poolExecutor1 = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20));
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    主要包括两种构建缓存的方式,包括LoadingCache和CallableCache 。LoadingCache是指在构建缓存时就设置好加载方法,CallableCache则是在每次获取缓存时设置加载方法。
    这里将 Guava Cache 中的方法分为4类进行介绍,如下所示:

    构建缓存的方法,包括LoadingCache和CallableCache;
    缓存刷新策略(refreshAfterWrite)和缓存过期策略(expireAfterWrite和expireAfterAccess)
    缓存过期方法,包括主动使缓存过期(invalidate和invalidateAll)和基于引用的回收策略(weakKeys、weakValues和softValues)
    其他方法,例如recordStats(记录缓存命中状态)、maximumSize(设定缓存的最大个数)
    2.1 构建 Guava Cache 的方法
    可以分为两种,一种是创建缓存时就设置好,一种是每次获取缓存时设置。前者的好处是不用每次都设置,后者的好处是每次可以设置不同的数据源,按需设置即可。

    private static void createDemo() {
         LoadingCache<Integer, String> guavaLoadingCache = CacheBuilder
                .newBuilder()
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer code) {
                        return getCityFromDb(code);
                    }
                });

         Cache<Integer, String> guavaCallableCache = CacheBuilder
                .newBuilder()
                .build();
        System.out.println(guavaLoadingCache.get(1));
        System.out.println(guavaCallableCache.get(1, () -> getCityFromDb(1)));
    }

    输出结果:
    shanghai 1667722957611
    shanghai 1667722958651
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    2.2 缓存刷新和过期策略
    缓存过期有两种策略,一种是在写之后一段时候过期(expireAfterWrite),另一种是在读或者写之后一段时间过期(expireAfterAccess),二者的区别在于一个是写,一个是读或者写。缓存刷新则是只有在写之后一段时候刷新(refreshAfterWrite)的策略,这里也介绍下removalListener方法,这个方法可以在缓存过期时执行一些操作,缓存刷新和过期的示例如下:

    @SneakyThrows
    private static void expireAndRefreshDemo() {
        LoadingCache<Integer, String> expireCache = CacheBuilder.newBuilder()
                .expireAfterAccess(1000, TimeUnit.MILLISECONDS)
                .removalListener(removalNotification ->
                        System.out.println("the key " + removalNotification.getKey() + " is removed"))
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer integer) {
                        return getCityFromDb(integer);
                    }
                });
        expireCache.put(1, "init value");
        System.out.println(expireCache.get(1));
        Thread.sleep(1500);
        System.out.println(expireCache.get(1));
        System.out.println("————————————————");

        LoadingCache<Integer, String> refreshCache = CacheBuilder.newBuilder()
                .refreshAfterWrite(1000, TimeUnit.MILLISECONDS)
                .removalListener(removalNotification ->
                        System.out.println("the key " + removalNotification.getKey() + " is removed"))
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer integer) {
                        return getCityFromDb(integer);
                    }
                });
        System.out.println(refreshCache.get(1));
        Thread.sleep(1500);
        System.out.println(refreshCache.get(1));
    }

    输出结果:
    init value
    the key 1 is removed
    shanghai 1667722979092
    ————————————————
    shanghai 1667722980096
    the key 1 is removed
    shanghai 1667722982599
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    上述示例中分别新建了两个缓存,缓存刷新和失效的时间均为1000ms,可以看到1500ms后缓存都会失效。注意,如果多个缓存过期策略同时使用,则会同时生效,示例如下:

    @SneakyThrows
    private static void expireDemo() {
        LoadingCache<Integer, String> cache = CacheBuilder.newBuilder()
                .expireAfterWrite(1800, TimeUnit.MILLISECONDS)
                .expireAfterAccess(1300, TimeUnit.MILLISECONDS)
                .removalListener(removalNotification ->
                        System.out.println("the key " + removalNotification.getKey() + " is removed"))
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer integer) {
                        return getCityFromDb(integer);
                    }
                });
        cache.put(1, "init value");
        System.out.println(cache.get(1));
        Thread.sleep(1000);
        System.out.println(cache.get(1));
        Thread.sleep(1000);
        System.out.println(cache.get(1));
        Thread.sleep(1500);
        System.out.println(cache.get(1));
    }

    输出结果:
    init value
    init value
    the key 1 is removed
    shanghai 1667723200491
    the key 1 is removed
    shanghai 1667723202999
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    可以看到,这里设置了两个缓存过期策略,分别是写后1800ms过期和获取后1300ms过期。然后每隔1000ms读取一次,可以看到2000ms时触发了写后1800ms过期策略。之后等待1500ms后再次读取,触发了获取后1300ms过期策略。后面单独介绍缓存失效和缓存刷新的区别,以及在高并发的场景下查询缓存如何避免缓存穿透。

    2.2 缓存过期方法
    主动使缓存过期的方法包括invalidate和invalidateAll,invalidate是使单个缓存过期,invalidateAll是使全部缓存过期,示例如下。

    @SneakyThrows
    private static void invalidateDemo() {
        LoadingCache<Integer, String> cache = CacheBuilder.newBuilder()
                .removalListener(removalNotification ->
                        System.out.println("the key " + removalNotification.getKey() + " is removed"))
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer integer) {
                        return getCityFromDb(integer);
                    }
                });
        cache.put(1, "init value");
        System.out.println(cache.get(1));
        cache.invalidate(1);
        System.out.println(cache.get(1));
        cache.put(2, "init value");
        cache.invalidateAll();
    }

    输出结果:
    init value
    the key 1 is removed
    shanghai 1667918329448
    the key 2 is removed
    the key 1 is removed
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    Guava Cache 中还可以基于引用过期,包括weakKeys、weakValues和softValues。weakKeys的意思是清除key为,这几个引用需要慎用,示例如下。

    @SneakyThrows
    private static void invalidateDemo() {
        LoadingCache<Integer, String> cache = CacheBuilder.newBuilder()
                .weakKeys()
                .weakValues()
                .removalListener(removalNotification ->
                        System.out.println("the key " + removalNotification.getKey() + " is removed"))
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer integer) {
                        return getCityFromDb(integer);
                    }
                });
        cache.put(3, new String("init key"));
        System.gc();
        System.out.println(cache.get(1));
    }

    输出结果:
    the key 3 is removed
    shanghai 1667918601703
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    2.3 其他方法
    除了上述介绍的方法,Guava Cache 中还有很多其他有用的方法,接下来一一介绍。
    1、recordStats,该方法用于记录缓存命中的状态,便于我们更好的使用缓存,示例如下:

    @SneakyThrows
    private static void otherMethodDemo() {
        LoadingCache<Integer, String> cache = CacheBuilder.newBuilder()
                .recordStats()
                .removalListener(removalNotification ->
                        System.out.println("the key " + removalNotification.getKey() + " is removed"))
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer integer) {
                        return getCityFromDb(integer);
                    }
                });
        cache.put(1, "init value");
        System.out.println(cache.get(1));
        System.out.println(cache.get(2));
        System.out.println(cache.get(3));
        System.out.println(cache.stats());
    }

    输出结果:
    init value
    beijing 1667959350889
    shenzhen 1667959353893
    CacheStats{hitCount=1, missCount=2, loadSuccessCount=2, loadExceptionCount=0, totalLoadTime=6008055275, evictionCount=0}
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    可以看到这里缓存命中了一次(key=1),两次没有命中(key=2、3),加载成功两次(key=2、3)
    2、maximumSize,该方法用于设置缓存的最大容量,注意这里是个数,并不是内存大小。当缓存个数大于设置的最大容量时,则会使用LRU算法(通过concurrentHashMap和双向链表实现)来淘汰缓存。

    3、Caffine Cache 的使用
    Caffeine 是一款基于Java8开发的,高性能的的本地缓存库,出自于Benjamin Manes大神之手,大家熟悉的Spring4.3+和SpringBoot1.4+缓存的实现便是基于Caffeine。Caffeine 的缓存淘汰算法是一种对LRU和LFU进行了组合优化的算法,caffeine的用法与guava基本相同,有一些不同的方法。
    除了这两种初始化方式外,caffeine cache还提供了第三种初始化方式,异步加载方式

    赞同 0 反对 0
    回复

  • prtyaa 2024-04-25 02:21:09

    在Java中,消息可以通过java.util.concurrent包中的BlockingQueue来处理,而缓存可以使用Guava CacheCaffeine Cache

    赞同 0 反对 0
    回复

    丫哈哈哈哈哈哈哈各个环节
唐朝L1
粉丝 0 发表 11 + 关注 私信
上周热门
银河麒麟添加网络打印机时,出现“client-error-not-possible”错误提示  1323
银河麒麟打印带有图像的文档时出错  1236
银河麒麟添加打印机时,出现“server-error-internal-error”  1023
统信桌面专业版【如何查询系统安装时间】  951
统信操作系统各版本介绍  944
统信桌面专业版【全盘安装UOS系统】介绍  903
麒麟系统也能完整体验微信啦!  889
统信【启动盘制作工具】使用介绍  499
统信桌面专业版【一个U盘做多个系统启动盘】的方法  440
信刻全自动档案蓝光光盘检测一体机  386
本周热议
我的信创开放社区兼职赚钱历程 40
今天你签到了吗? 27
信创开放社区邀请他人注册的具体步骤如下 15
如何玩转信创开放社区—从小白进阶到专家 15
方德桌面操作系统 14
我有15积分有什么用? 13
用抖音玩法闯信创开放社区——用平台宣传企业产品服务 13
如何让你先人一步获得悬赏问题信息?(创作者必看) 12
2024中国信创产业发展大会暨中国信息科技创新与应用博览会 9
中央国家机关政府采购中心:应当将CPU、操作系统符合安全可靠测评要求纳入采购需求 8

添加我为好友,拉您入交流群!

请使用微信扫一扫!