博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用redisson时关于订阅数的问题
阅读量:6868 次
发布时间:2019-06-26

本文共 4125 字,大约阅读时间需要 13 分钟。

在使用redisson消息订阅时,我针对门店商品库存减扣进行订阅的操作(在这里一个商品一个监听队列),当正式投入生产时,发现一直再报Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.的错误,索性根据提示翻了翻源码看看原因:

在redisson里先关注一个类:RedisPubSubConnection该类继承自RedisConnection,根据名字我们可知它是一个典型的发布与订阅的类。那么在redisson使用时,会使用PubSubConnectionEntry进行一次包装:

public class PubSubConnectionEntry {            private final AtomicInteger subscribedChannelsAmount;        private final RedisPubSubConnection conn;            private final ConcurrentMap
subscribeChannelListeners = new ConcurrentHashMap
(); private final ConcurrentMap
>> channelListeners = new ConcurrentHashMap
>>(); public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) { super(); this.conn = conn; this.subscribedChannelsAmount = new AtomicInteger(subscriptionsPerConnection); } //.....省略其他代码 }

在这里我们可以看到其有一个比较重要的属性 subscribedChannelsAmount,而这个值就是通过PublishSubscribeService进行调用的:

private void connect(final Codec codec, final ChannelName channelName,                final RPromise
promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener
... listeners) { //.... RedisPubSubConnection conn = future.getNow(); final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); entry.tryAcquire(); //.... }

那么此属性就是根据config的subscriptionsPerConnection里设置的,那么此值就代表了每个连接的最大订阅数。当tryAcqcurie的时候会减少这个数量:

public int tryAcquire() {            while (true) {                int value = subscribedChannelsAmount.get();                if (value == 0) {                    return -1;                }                                if (subscribedChannelsAmount.compareAndSet(value, value - 1)) {                    return value - 1;                }            }        }

如果当此值为0时,那么会重新获取一个可用的连接,代码如下:

int remainFreeAmount = freeEntry.tryAcquire();                    if (remainFreeAmount == -1) {                        throw new IllegalStateException();                    }                                        final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);                    if (oldEntry != null) {                        freeEntry.release();                        freePubSubLock.release();                        subscribe(channelName, promise, type, lock, oldEntry, listeners);                        return;                    }                                        if (remainFreeAmount == 0) {                        freePubSubConnections.poll();                    }                    freePubSubLock.release();

如果此时没有可用的连接的话,恐怕此次操作就会等待新的连接直至超时,超时了就报上述的错误了,不过根据提示。我们此时的解决办法是增大subscriptionsPerConnection或者subscriptionConnectionPoolSize的值。当我们使用springboot时可以通过设置spring.redis.redisson.config(具体设置请参考)来指定redisson的配置文件或者重新创建RedissonClient:

@Bean(destroyMethod = "shutdown")            public RedissonClient redisson(RedissonProperties redissonProperties, RedisProperties redisProperties) throws IOException {                    Config config = new Config();                String prefix = "redis://";                Method method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl");                if (method != null && (Boolean) ReflectionUtils.invokeMethod(method, redisProperties)) {                    prefix = "rediss://";                }                    config.useSingleServer()                        .setAddress(prefix + redisProperties.getHost() + ":" + redisProperties.getPort())                        .setConnectTimeout(30000).setSubscriptionsPerConnection(5000) //在这里指定数目                        .setDatabase(redisProperties.getDatabase())                        .setPassword(redisProperties.getPassword());                    return Redisson.create(config);            }

转载于:https://www.cnblogs.com/niechen/p/10845459.html

你可能感兴趣的文章
oracle 表空间总结
查看>>
NYOJ260数数小木块
查看>>
Android 数据存储
查看>>
CTreeCtl的使用
查看>>
不错的网站链接
查看>>
POJ 1742
查看>>
post方法
查看>>
21、ActionBar & Notification
查看>>
步步为营:Asp.net 通用数据容器的缺陷
查看>>
判断整除(动态规划,递推)
查看>>
题解 P1004 【方格取数】
查看>>
【OCP-052】052最新考试题库分析整理-第7题
查看>>
vuex相关(actions和mutation的异曲同工)
查看>>
Linux常用命令总结
查看>>
即时通讯软件的发展演变
查看>>
java基础总结
查看>>
算法复杂度
查看>>
Jsonlib 属性过滤器
查看>>
List 去重
查看>>
Android性能优化之内存优化练习
查看>>