Redis大Key问题

2022年618大促期间,某电商平台的Redis集群突然出现大量慢查询,延迟从1ms飙升到500ms。

技术团队排查后发现:商品详情的缓存中有一个Hash类型的大Key,存储了100万个商品的详细信息,单个Key占用内存超过10GB。

大Key的过期删除触发了主从同步风暴,所有从节点被阻塞。

这次故障导致商品详情页加载超时30分钟,影响了约100万次用户访问。

【面试官手记】

Redis大Key是生产环境最常见的问题之一。我面试过的候选人里,能说清楚"大Key识别方法"的有50%,能说清楚"大Key拆分方案"的有30%,能说清楚"内存优化"的有20%。大Key的关键词是早发现 + 早拆分

一、大Key的识别标准 🔴

1.1 大Key定义

大Key定义:

Redis官方建议单个Key的大小不超过:
- String类型:< 10KB
- Hash/Set/List/ZSet:< 10,000个元素

生产环境常见的大Key类型:

1. String类型
   - 存储大JSON:商品详情、用户画像
   - 存储文件二进制:图片、音频
   - 单个Value可能达到几MB甚至几十MB

2. Hash类型
   - 字段数量过多:10万个字段
   - 单个Value过大:每个字段几KB
   - 总大小超过10GB

3. List类型
   - 列表长度过长:1000万个元素
   - 存储聊天记录、行为日志

4. Set/ZSet类型
   - 集合成员过多:500万个成员
   - 存储用户标签、好友关系

内存占用:
- 一个Key的内存占用超过1GB即为大Key
- 一个Key的成员数超过10万即为大Key

1.2 大Key识别方法

# 方法1:redis-cli --bigkeys(扫描大Key)
redis-cli -h 127.0.0.1 -p 6379 --bigkeys

# 输出:
# Scanning 50 million keys...
# -------- summary -------
# biggest string found 123456 keys | avg len 102400
# biggest   list found 789 keys | avg len 1024000
# biggest    set found 123 keys | avg len 10000
# biggest   hash found 456 keys | avg len 50000
# biggest   zset found 99 keys | avg len 200000

# 方法2:MEMORY USAGE(查看单个Key内存)
redis-cli
> MEMORY USAGE user:profile:123456
# (integer) 52428800  # 50MB

> MEMORY USAGE product:detail:10000
# (integer) 104857600  # 100MB
// 方法3:代码扫描大Key
@Component
public class BigKeyScanner {

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 扫描所有大Key
     */
    public List<BigKeyInfo> scanBigKeys(long cursor, int count) {
        List<BigKeyInfo> result = new ArrayList<>();

        // 使用SCAN命令遍历
        ScanOptions options = ScanOptions.scanOptions()
            .count(count)
            .match("*")
            .build();

        try (Cursor<byte[]> cursor_obj = redisTemplate.getConnectionFactory()
            .getConnection().scan(options)) {

            while (cursor_obj.hasNext()) {
                byte[] key = cursor_obj.next();
                String keyStr = new String(key);

                // 分析Key类型
                DataType type = redisTemplate.type(keyStr);
                Long size = getKeySize(keyStr, type);

                if (size > 1024 * 1024) {  // > 1MB
                    result.add(new BigKeyInfo(keyStr, type, size));
                }
            }
        }

        return result;
    }

    private Long getKeySize(String key, DataType type) {
        switch (type) {
            case STRING:
                return redisTemplate.opsForValue().getOperations()
                    .getConnection().stringCommands().strlen(key.getBytes());
            case HASH:
                return redisTemplate.opsForHash().size(key);
            case LIST:
                return redisTemplate.opsForList().size(key);
            case SET:
                return redisTemplate.opsForSet().size(key);
            case ZSET:
                return redisTemplate.opsForZSet().zCard(key);
            default:
                return 0L;
        }
    }
}

二、大Key拆分方案 🔴

2.1 Hash大Key拆分

// Hash大Key拆分:按固定规则分桶
@Service
public class ProductCacheService {

    private static final int BUCKET_COUNT = 100;  // 分成100个桶

    /**
     * 拆分前:product:detail:10000 → 存储所有商品信息
     * 拆分后:product:detail:10000:0 ~ product:detail:10000:99
     */
    public void setProduct(Long productId, Map<String, String> productInfo) {
        int bucketCount = getBucketCount(productId);
        String bucketKey = getBucketKey(productId, bucketCount);
        redisTemplate.opsForHash().putAll(bucketKey, productInfo);
    }

    public Map<Object, Object> getProduct(Long productId) {
        Map<Object, Object> result = new HashMap<>();
        for (int i = 0; i < BUCKET_COUNT; i++) {
            String bucketKey = getBucketKey(productId, i);
            Map<Object, Object> bucket = redisTemplate.opsForHash().entries(bucketKey);
            result.putAll(bucket);
        }
        return result;
    }

    /**
     * 分桶规则:hash(productId) % 100
     */
    private int getBucketCount(Long productId) {
        return Math.abs(productId.hashCode() % BUCKET_COUNT);
    }

    private String getBucketKey(Long productId, int bucket) {
        return String.format("product:detail:%d:%d", productId, bucket);
    }
}

2.2 List大Key拆分

// List大Key拆分:按时间分桶
@Service
public class UserBehaviorService {

    private static final int PAGE_SIZE = 1000;

    /**
     * 拆分前:user:behavior:123456 → 存储所有行为
     * 拆分后:user:behavior:123456:2024-01 → 按月份分桶
     */
    public void addBehavior(Long userId, String behavior) {
        String key = getBehaviorKey(userId);
        redisTemplate.opsForList().rightPush(key, behavior);

        // 控制每个桶的大小
        Long size = redisTemplate.opsForList().size(key);
        if (size != null && size > PAGE_SIZE * 10) {
            // 触发归档或清理
            archiveOldBehaviors(key);
        }
    }

    public List<String> getBehaviors(Long userId, int page, int size) {
        String key = getBehaviorKey(userId);
        int start = page * size;
        int end = start + size - 1;
        return redisTemplate.opsForList().range(key, start, end);
    }

    private String getBehaviorKey(Long userId) {
        String month = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM"));
        return String.format("user:behavior:%d:%s", userId, month);
    }
}

2.3 String大Key拆分

// String大Key拆分:分片存储
@Service
public class FileCacheService {

    private static final int SHARD_SIZE = 1024 * 1024;  // 每个分片1MB

    /**
     * 拆分前:file:content:12345 → 存储整个文件
     * 拆分后:file:content:12345:0 ~ file:content:12345:N
     */
    public void setFile(String fileId, byte[] content) {
        int shardCount = (content.length + SHARD_SIZE - 1) / SHARD_SIZE;

        // 存储分片数量
        redisTemplate.opsForValue().set(
            getMetaKey(fileId),
            String.valueOf(shardCount)
        );

        // 存储分片数据
        for (int i = 0; i < shardCount; i++) {
            int start = i * SHARD_SIZE;
            int end = Math.min(start + SHARD_SIZE, content.length);
            byte[] shard = Arrays.copyOfRange(content, start, end);

            String shardKey = getShardKey(fileId, i);
            redisTemplate.opsForValue().set(shardKey, shard);
        }
    }

    public byte[] getFile(String fileId) {
        // 获取分片数量
        String countStr = (String) redisTemplate.opsForValue().get(getMetaKey(fileId));
        if (countStr == null) return null;

        int shardCount = Integer.parseInt(countStr);
        byte[][] shards = new byte[shardCount][];

        for (int i = 0; i < shardCount; i++) {
            String shardKey = getShardKey(fileId, i);
            shards[i] = (byte[]) redisTemplate.opsForValue().get(shardKey);
        }

        // 合并分片
        int totalLength = Arrays.stream(shards).mapToInt(s -> s.length).sum();
        byte[] result = new byte[totalLength];
        int offset = 0;
        for (byte[] shard : shards) {
            System.arraycopy(shard, 0, result, offset, shard.length);
            offset += shard.length;
        }

        return result;
    }

    private String getMetaKey(String fileId) { return "file:meta:" + fileId; }
    private String getShardKey(String fileId, int shard) {
        return String.format("file:shard:%s:%d", fileId, shard);
    }
}

三、大Key内存优化 🟡

3.1 数据结构选择

数据结构选择优化:

1. String vs Hash
   - 存储少量字段用Hash:HSET/HGET按字段操作
   - 存储大量字段用String + JSON:避免大Hash
   - 避免使用HGETALL获取大Hash

2. 压缩存储
   - 大Value使用压缩:redisTemplate.set(GZIP压缩后的数据)
   - 节省50%-80%内存
   - 缺点:CPU开销增加

3. 选择合适的数据类型
   - 存布尔值用SET:占用1bit
   - 存计数用String:Redis优化了计数操作
   - 存集合用HyperLogLog:去重统计

3.2 内存压缩配置

// 启用内存压缩(Redis 4.0+)
// 在Redis配置文件中:
// activedefrag yes
// active-defrag-ignore-bytes 100mb
// active-defrag-threshold-lower 10

// Java中使用压缩
@Component
public class CompressedRedisService {

    @Autowired
    private RedisTemplate redisTemplate;

    private static final String COMPRESS_PREFIX = "zlib:";

    public void set(String key, String value) {
        try {
            // 超过1KB才压缩
            if (value.length() > 1024) {
                byte[] compressed = compress(value);
                redisTemplate.opsForValue().set(COMPRESS_PREFIX + key, compressed);
            } else {
                redisTemplate.opsForValue().set(key, value);
            }
        } catch (IOException e) {
            // 降级:不压缩
            redisTemplate.opsForValue().set(key, value);
        }
    }

    public String get(String key) {
        Object value = redisTemplate.opsForValue().get(COMPRESS_PREFIX + key);
        if (value instanceof byte[]) {
            try {
                return decompress((byte[]) value);
            } catch (IOException e) {
                return null;
            }
        }
        return (String) value;
    }

    private byte[] compress(String str) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(str.getBytes(StandardCharsets.UTF_8));
        }
        return bos.toByteArray();
    }

    private String decompress(byte[] data) throws IOException {
        ByteArrayInputStream bis = new ByteArrayInputStream(data);
        try (GZIPInputStream gzip = new GZIPInputStream(bis);
             ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[1024];
            int len;
            while ((len = gzip.read(buffer)) != -1) {
                bos.write(buffer, 0, len);
            }
            return bos.toString(StandardCharsets.UTF_8);
        }
    }
}

四、大Key删除策略 🟡

4.1 渐进式删除

// 大Key删除:避免阻塞
@Component
public class SafeDeleteService {

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 安全删除Hash大Key
     */
    public void deleteHash(String key) {
        Long size = redisTemplate.opsForHash().size(key);
        if (size == null || size < 10000) {
            redisTemplate.delete(key);
            return;
        }

        // 分批删除,每批1000个字段
        int batchSize = 1000;
        while (true) {
            // 删除1000个字段
            redisTemplate.opsForHash().delete(key,
                redisTemplate.opsForHash().keys(key).stream()
                    .limit(batchSize)
                    .toArray());

            // 检查是否删除完毕
            Long remaining = redisTemplate.opsForHash().size(key);
            if (remaining == null || remaining == 0) {
                redisTemplate.delete(key);
                break;
            }

            // 暂停一下,避免阻塞
            try { Thread.sleep(10); } catch (InterruptedException ignored) {}
        }
    }

    /**
     * 使用UNLINK代替DEL
     * UNLINK是异步删除,不会阻塞主线程
     */
    public void asyncDelete(String key) {
        redisTemplate.unlink(key);
    }
}

4.2 惰性删除

// 惰性删除:异步清理
@Component
public class LazyDeleteService {

    @Autowired
    private RedisTemplate redisTemplate;

    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    /**
     * 惰性删除:访问时检查并删除过期Key
     */
    public String getAndCheck(String key) {
        String value = (String) redisTemplate.opsForValue().get(key);

        // 检查是否需要删除(比如Value过大)
        if (value != null && value.length() > 10 * 1024 * 1024) {
            // 异步删除
            executor.execute(() -> redisTemplate.delete(key));
        }

        return value;
    }

    /**
     * 后台定时扫描并删除大Key
     */
    @Scheduled(fixedRate = 3600000)  // 每小时
    public void cleanupBigKeys() {
        // 扫描超过1GB的Key
        ScanOptions options = ScanOptions.scanOptions()
            .count(1000)
            .match("*")
            .build();

        try (Cursor<byte[]> cursor = redisTemplate.getConnectionFactory()
            .getConnection().scan(options)) {

            while (cursor.hasNext()) {
                byte[] keyBytes = cursor.next();
                String key = new String(keyBytes);

                Long memUsage = redisTemplate.execute(
                    (RedisCallback<Long>) conn ->
                        conn.stringCommands().memoryUsage(keyBytes)
                );

                if (memUsage != null && memUsage > 1024 * 1024 * 1024) {
                    log.warn("发现大Key:{} 占用 {}MB", key, memUsage / 1024 / 1024);
                    // 异步删除
                    executor.execute(() -> redisTemplate.delete(key));
                }
            }
        }
    }
}

五、生产避坑 🟡

5.1 大Key的五大坑

坑1:一次获取整个大Hash

问题:HGETALL获取百万字段的Hash
场景:缓存商品详情用单个Hash存储
解决方案:
- 使用HSCAN分批获取
- 拆分大Hash为多个小Hash

坑2:大Key过期触发阻塞

问题:大Key过期时主从同步阻塞
场景:10GB的Hash同时过期
解决方案:
- 使用EXPIREAT设置过期时间
- 过期时间加随机偏移

坑3:DEL删除大Key阻塞

问题:DEL删除大Key导致Redis阻塞
场景:删除1000万个元素的List
解决方案:
- 使用UNLINK异步删除
- 分批渐进式删除

坑4:使用字符串存储大JSON

问题:String类型存储大JSON,更新需要整个读写
场景:用户画像存储
解决方案:
- 使用Hash按字段存储
- 或使用Memcached

坑5:没有监控大Key

问题:大Key慢慢长大,直到撑爆内存
场景:没有定期检查Key大小
解决方案:
- 定期扫描大Key
- 设置内存告警

5.2 大Key检查清单

开发规范:
- [ ] String类型不超过10KB
- [ ] Hash类型不超过1万个字段
- [ ] List类型不超过1万个元素
- [ ] 避免使用HGETALL

监控规范:
- [ ] 定期扫描大Key
- [ ] 设置内存使用率告警
- [ ] 监控BigKey数量变化

运维规范:
- [ ] 使用UNLINK删除
- [ ] 过期时间加随机偏移
- [ ] 定期整理内存碎片

六、真实面试回放 🟡

面试官:Redis大Key怎么识别和处理?

候选人(小张):识别方法:

一是redis-cli --bigkeys扫描。

二是MEMORY USAGE查看单个Key。

三是SCAN遍历+统计。

处理方案:

一是拆分。Hash按字段分桶,List按时间分桶。

二是压缩。大Value用GZIP压缩。

三是删除用UNLINK,不用DEL。

面试官:大Key过期会引发什么问题?

小张:主从同步风暴。

如果有主从架构,大Key过期时主节点会发送del命令给从节点,从节点如果也在同时读取这个Key,可能阻塞。

解决方案是过期时间加随机偏移,避免同时过期。

【面试官手记】

小张这场面试的亮点:

  1. 知道三种大Key识别方法

  2. 知道拆分和压缩方案

  3. 知道UNLINK异步删除

大Key是P6工程师必备知识点,能完整回答的候选人,说明有Redis实战经验。

Redis大Key的核心是早发现 + 早拆分。记住三个要点:

  1. 识别方法:--bigkeys扫描、MEMORY USAGE
  2. 拆分策略:Hash分桶、List分时间、String分片
  3. 删除策略:UNLINK异步删除、渐进式删除

大Key是Redis性能杀手,宁可提前拆分,不要事后救火。