Java中消息和缓存如何使用?75


唐朝
唐朝 2024-04-18 23:08:44 47343
分类专栏:问题 问题分类: 其它未定义问题
系统自动结题

1.Java中有个需求是我要每秒中接收mqtt给我发来的消息,然后和数据库中的数据进行比对,如果一样就存入表中,我想这个如何使用redis缓存让他不要频繁访问数据库。

2 个回答
  • 1843880570 进阶 2024-04-19 09:16:46

    缓存是Java开发中经常用到的组件,我们会使用缓存来存储一些不经常改变的热点数据,提高系统处理效率,其根本原因在于内存和硬盘读写速度的巨大差异。
    Java 1.8中有多个本地缓存,主要是Guava和Caffine,当然我们也可以自己设置一个HashMap来当做缓存使用,接下来一一介绍这几种本地缓存。

    1、使用HashMap当做缓存
    这是个比较简单的方法,最好是使用ConcurrtHashMap,ConcurrtHashMap兼顾了效率和并发一致性。这里简单用ConcurrtHashMap实现了一个本地缓存,当我们根据code查询对应城市时,如果cache中没有或者缓存已过期,则会从模拟的数据库中查询,这里设置的超时时间为3秒。

    public class ConcurrentHashMapTest {
        private static ConcurrentHashMap<Integer, Tuple2<Long, String>> cache = new ConcurrentHashMap<>(3);
        private static Map<Integer, String> dbData = new HashMap<>(3);
        static {
            dbData.put(1, "shanghai");
            dbData.put(2, "beijing");
            dbData.put(3, "shenzhen");
        }
        private static int expirationTime = 3;
        private static int mill = 1000;

        @Test
        @SneakyThrows
        public void test() {
            System.out.println("the result is " + getCityByCode(1));
            Thread.sleep(1000);
            System.out.println("the result is " + getCityByCode(1));
            Thread.sleep(3000);
            System.out.println("the result is " + getCityByCode(1));
            Thread.sleep(1000);
            System.out.println("the result is " + getCityByCode(2));
        }

        private String getCityByCode(int code) {
            if (!cache.containsKey(code)) {
                return getCityFromDb(code);
            }
            Tuple2<Long, String> tuple2 = cache.get(code);
            if (isOverTime(tuple2._1)) {
                System.out.println("cache is over time");
                return getCityFromDb(code);
            } else {
                return tuple2._2;
            }
        }

        private String getCityFromDb(Integer code) {
            String city = dbData.get(code);
            System.out.println("query city " + city + " from db");
            cache.put(code, new Tuple2<>(System.currentTimeMillis(), city));
            return city;
        }

        private boolean isOverTime(Long time) {
            if ((System.currentTimeMillis() - time) / mill > expirationTime) {
                return true;
            }
            return false;
        }
    }

    输出结果:
    query city shanghai from db
    the result is shanghai
    the result is shanghai
    cache is over time
    query city shanghai from db
    the result is shanghai
    query city beijing from db
    the result is beijing
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    2、Guava Cache的使用
    Guava是Google提供的一套Java工具包,而Guava Cache是一套非常完善的本地缓存机制(JVM缓存)。Guava cache的设计来源于CurrentHashMap,可以按照多种策略来清理存储在其中的缓存值且保持很高的并发读写性能。接下来介绍一下Guava Cache的常用方法,首先还是创建一些公共方法用于测试,这里用静态方法模拟从数据库查询数据。

    private static final int THREE = 3;
    private static Map<Integer, String> dbData = new HashMap<>(3);
    static {
        dbData.put(1, "shanghai");
        dbData.put(2, "beijing");
        dbData.put(3, "shenzhen");
    }

    @SneakyThrows
    private static void sleep(long millis) {
        Thread.sleep(millis);
    }

    private static String getCityFromDb(Integer code) {
        sleep(1000);
        return dbData.get(code) + " " + System.currentTimeMillis();
    }

    private static ListeningExecutorService poolExecutor1 = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20));
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    主要包括两种构建缓存的方式,包括LoadingCache和CallableCache 。LoadingCache是指在构建缓存时就设置好加载方法,CallableCache则是在每次获取缓存时设置加载方法。
    这里将 Guava Cache 中的方法分为4类进行介绍,如下所示:

    构建缓存的方法,包括LoadingCache和CallableCache;
    缓存刷新策略(refreshAfterWrite)和缓存过期策略(expireAfterWrite和expireAfterAccess)
    缓存过期方法,包括主动使缓存过期(invalidate和invalidateAll)和基于引用的回收策略(weakKeys、weakValues和softValues)
    其他方法,例如recordStats(记录缓存命中状态)、maximumSize(设定缓存的最大个数)
    2.1 构建 Guava Cache 的方法
    可以分为两种,一种是创建缓存时就设置好,一种是每次获取缓存时设置。前者的好处是不用每次都设置,后者的好处是每次可以设置不同的数据源,按需设置即可。

    private static void createDemo() {
         LoadingCache<Integer, String> guavaLoadingCache = CacheBuilder
                .newBuilder()
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer code) {
                        return getCityFromDb(code);
                    }
                });

         Cache<Integer, String> guavaCallableCache = CacheBuilder
                .newBuilder()
                .build();
        System.out.println(guavaLoadingCache.get(1));
        System.out.println(guavaCallableCache.get(1, () -> getCityFromDb(1)));
    }

    输出结果:
    shanghai 1667722957611
    shanghai 1667722958651
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    2.2 缓存刷新和过期策略
    缓存过期有两种策略,一种是在写之后一段时候过期(expireAfterWrite),另一种是在读或者写之后一段时间过期(expireAfterAccess),二者的区别在于一个是写,一个是读或者写。缓存刷新则是只有在写之后一段时候刷新(refreshAfterWrite)的策略,这里也介绍下removalListener方法,这个方法可以在缓存过期时执行一些操作,缓存刷新和过期的示例如下:

    @SneakyThrows
    private static void expireAndRefreshDemo() {
        LoadingCache<Integer, String> expireCache = CacheBuilder.newBuilder()
                .expireAfterAccess(1000, TimeUnit.MILLISECONDS)
                .removalListener(removalNotification ->
                        System.out.println("the key " + removalNotification.getKey() + " is removed"))
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer integer) {
                        return getCityFromDb(integer);
                    }
                });
        expireCache.put(1, "init value");
        System.out.println(expireCache.get(1));
        Thread.sleep(1500);
        System.out.println(expireCache.get(1));
        System.out.println("————————————————");

        LoadingCache<Integer, String> refreshCache = CacheBuilder.newBuilder()
                .refreshAfterWrite(1000, TimeUnit.MILLISECONDS)
                .removalListener(removalNotification ->
                        System.out.println("the key " + removalNotification.getKey() + " is removed"))
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer integer) {
                        return getCityFromDb(integer);
                    }
                });
        System.out.println(refreshCache.get(1));
        Thread.sleep(1500);
        System.out.println(refreshCache.get(1));
    }

    输出结果:
    init value
    the key 1 is removed
    shanghai 1667722979092
    ————————————————
    shanghai 1667722980096
    the key 1 is removed
    shanghai 1667722982599
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    上述示例中分别新建了两个缓存,缓存刷新和失效的时间均为1000ms,可以看到1500ms后缓存都会失效。注意,如果多个缓存过期策略同时使用,则会同时生效,示例如下:

    @SneakyThrows
    private static void expireDemo() {
        LoadingCache<Integer, String> cache = CacheBuilder.newBuilder()
                .expireAfterWrite(1800, TimeUnit.MILLISECONDS)
                .expireAfterAccess(1300, TimeUnit.MILLISECONDS)
                .removalListener(removalNotification ->
                        System.out.println("the key " + removalNotification.getKey() + " is removed"))
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer integer) {
                        return getCityFromDb(integer);
                    }
                });
        cache.put(1, "init value");
        System.out.println(cache.get(1));
        Thread.sleep(1000);
        System.out.println(cache.get(1));
        Thread.sleep(1000);
        System.out.println(cache.get(1));
        Thread.sleep(1500);
        System.out.println(cache.get(1));
    }

    输出结果:
    init value
    init value
    the key 1 is removed
    shanghai 1667723200491
    the key 1 is removed
    shanghai 1667723202999
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    可以看到,这里设置了两个缓存过期策略,分别是写后1800ms过期和获取后1300ms过期。然后每隔1000ms读取一次,可以看到2000ms时触发了写后1800ms过期策略。之后等待1500ms后再次读取,触发了获取后1300ms过期策略。后面单独介绍缓存失效和缓存刷新的区别,以及在高并发的场景下查询缓存如何避免缓存穿透。

    2.2 缓存过期方法
    主动使缓存过期的方法包括invalidate和invalidateAll,invalidate是使单个缓存过期,invalidateAll是使全部缓存过期,示例如下。

    @SneakyThrows
    private static void invalidateDemo() {
        LoadingCache<Integer, String> cache = CacheBuilder.newBuilder()
                .removalListener(removalNotification ->
                        System.out.println("the key " + removalNotification.getKey() + " is removed"))
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer integer) {
                        return getCityFromDb(integer);
                    }
                });
        cache.put(1, "init value");
        System.out.println(cache.get(1));
        cache.invalidate(1);
        System.out.println(cache.get(1));
        cache.put(2, "init value");
        cache.invalidateAll();
    }

    输出结果:
    init value
    the key 1 is removed
    shanghai 1667918329448
    the key 2 is removed
    the key 1 is removed
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    Guava Cache 中还可以基于引用过期,包括weakKeys、weakValues和softValues。weakKeys的意思是清除key为,这几个引用需要慎用,示例如下。

    @SneakyThrows
    private static void invalidateDemo() {
        LoadingCache<Integer, String> cache = CacheBuilder.newBuilder()
                .weakKeys()
                .weakValues()
                .removalListener(removalNotification ->
                        System.out.println("the key " + removalNotification.getKey() + " is removed"))
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer integer) {
                        return getCityFromDb(integer);
                    }
                });
        cache.put(3, new String("init key"));
        System.gc();
        System.out.println(cache.get(1));
    }

    输出结果:
    the key 3 is removed
    shanghai 1667918601703
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    2.3 其他方法
    除了上述介绍的方法,Guava Cache 中还有很多其他有用的方法,接下来一一介绍。
    1、recordStats,该方法用于记录缓存命中的状态,便于我们更好的使用缓存,示例如下:

    @SneakyThrows
    private static void otherMethodDemo() {
        LoadingCache<Integer, String> cache = CacheBuilder.newBuilder()
                .recordStats()
                .removalListener(removalNotification ->
                        System.out.println("the key " + removalNotification.getKey() + " is removed"))
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer integer) {
                        return getCityFromDb(integer);
                    }
                });
        cache.put(1, "init value");
        System.out.println(cache.get(1));
        System.out.println(cache.get(2));
        System.out.println(cache.get(3));
        System.out.println(cache.stats());
    }

    输出结果:
    init value
    beijing 1667959350889
    shenzhen 1667959353893
    CacheStats{hitCount=1, missCount=2, loadSuccessCount=2, loadExceptionCount=0, totalLoadTime=6008055275, evictionCount=0}
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    可以看到这里缓存命中了一次(key=1),两次没有命中(key=2、3),加载成功两次(key=2、3)
    2、maximumSize,该方法用于设置缓存的最大容量,注意这里是个数,并不是内存大小。当缓存个数大于设置的最大容量时,则会使用LRU算法(通过concurrentHashMap和双向链表实现)来淘汰缓存。

    3、Caffine Cache 的使用
    Caffeine 是一款基于Java8开发的,高性能的的本地缓存库,出自于Benjamin Manes大神之手,大家熟悉的Spring4.3+和SpringBoot1.4+缓存的实现便是基于Caffeine。Caffeine 的缓存淘汰算法是一种对LRU和LFU进行了组合优化的算法,caffeine的用法与guava基本相同,有一些不同的方法。
    除了这两种初始化方式外,caffeine cache还提供了第三种初始化方式,异步加载方式

    赞同 0 反对 0

  • prtyaa 新手 2024-04-25 02:21:09

    在Java中,消息可以通过java.util.concurrent包中的BlockingQueue来处理,而缓存可以使用Guava CacheCaffeine Cache

    赞同 0 反对 0

    丫哈哈哈哈哈哈哈各个环节
唐朝新手
粉丝 0 发表 11 + 关注 私信
上周热门
统信系统能生成某一指定文件夹下的所有文件列表吗  2773
统信系统有自己的字幕屏幕保护程序吗  2533
统信软件能支持pandas吗  2456
统信系统如何在保证自己数据安全的基础上,进行数据分析,比如使用pandas进行大数据分析  2445
安全与发展,统信系统是如何在两者之间权衡,满足用户高效办公的要求  2104
统信系统的数据安全机制是什么  1770
统信软件支持python为什么不支持安装pandas  1115
您好,我想问一下,就是这银河麒麟系统,背景黑屏怎么办啊,其他的都正常,就是没有背景,设置里面的背景一点击系统就不响应了怎么解决?  178
我想连接共享打印机可是,搜索驱动时候没有,怎么办  162
uos有支持活体检测的软件吗  149
本周热议
麒麟系统登录输入密码后又需要重新输入密码,确定密码正确。如何处理? 12
求麒麟系统下的Broadcom 802.11n 无线网卡驱动 10
银河麒麟桌面操作系统V10 SP1安装应用时会反复提示安全授权认证,如何才能取消呢? 10
统信UOS系统下安装HP打印机驱动问题 10
银河麒麟系统登录时用户名是中文,如何将输入法切换成中文进行登录? 9
如何在统信系统使用VFP? 9
使用正版软件承诺书每年一签有相关的政策文件吗? 8
银河麒麟系统安装软件需要密码授权,单用户模式修改密码不行,如何解决 8
uos系统怎么装了向日葵,向日葵打不开啊? 7
有偿使用中国长城信创运维工程师(初级)证书 7

加入交流群

请使用微信扫一扫!