在使用redisson消息订阅时,我针对门店商品库存减扣进行订阅的操作(在这里一个商品一个监听队列),当正式投入生产时,发现一直再报Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.
的错误,索性根据提示翻了翻源码看看原因:
在redisson里先关注一个类:RedisPubSubConnection
该类继承自RedisConnection
,根据名字我们可知它是一个典型的发布与订阅的类。那么在redisson
使用时,会使用PubSubConnectionEntry
进行一次包装:
public class PubSubConnectionEntry { private final AtomicInteger subscribedChannelsAmount; private final RedisPubSubConnection conn; private final ConcurrentMapsubscribeChannelListeners = new ConcurrentHashMap (); private final ConcurrentMap >> channelListeners = new ConcurrentHashMap >>(); public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) { super(); this.conn = conn; this.subscribedChannelsAmount = new AtomicInteger(subscriptionsPerConnection); } //.....省略其他代码 }
在这里我们可以看到其有一个比较重要的属性 subscribedChannelsAmount
,而这个值就是通过PublishSubscribeService
进行调用的:
private void connect(final Codec codec, final ChannelName channelName, final RPromisepromise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener ... listeners) { //.... RedisPubSubConnection conn = future.getNow(); final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); entry.tryAcquire(); //.... }
那么此属性就是根据config的subscriptionsPerConnection
里设置的,那么此值就代表了每个连接的最大订阅数。当tryAcqcurie
的时候会减少这个数量:
public int tryAcquire() { while (true) { int value = subscribedChannelsAmount.get(); if (value == 0) { return -1; } if (subscribedChannelsAmount.compareAndSet(value, value - 1)) { return value - 1; } } }
如果当此值为0时,那么会重新获取一个可用的连接,代码如下:
int remainFreeAmount = freeEntry.tryAcquire(); if (remainFreeAmount == -1) { throw new IllegalStateException(); } final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry); if (oldEntry != null) { freeEntry.release(); freePubSubLock.release(); subscribe(channelName, promise, type, lock, oldEntry, listeners); return; } if (remainFreeAmount == 0) { freePubSubConnections.poll(); } freePubSubLock.release();
如果此时没有可用的连接的话,恐怕此次操作就会等待新的连接直至超时,超时了就报上述的错误了,不过根据提示。我们此时的解决办法是增大subscriptionsPerConnection或者subscriptionConnectionPoolSize的值。当我们使用springboot时可以通过设置spring.redis.redisson.config
(具体设置请参考)来指定redisson的配置文件或者重新创建RedissonClient:
@Bean(destroyMethod = "shutdown") public RedissonClient redisson(RedissonProperties redissonProperties, RedisProperties redisProperties) throws IOException { Config config = new Config(); String prefix = "redis://"; Method method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl"); if (method != null && (Boolean) ReflectionUtils.invokeMethod(method, redisProperties)) { prefix = "rediss://"; } config.useSingleServer() .setAddress(prefix + redisProperties.getHost() + ":" + redisProperties.getPort()) .setConnectTimeout(30000).setSubscriptionsPerConnection(5000) //在这里指定数目 .setDatabase(redisProperties.getDatabase()) .setPassword(redisProperties.getPassword()); return Redisson.create(config); }