From 4365f2e2316835440ea0441b3a05506dcfacc5c5 Mon Sep 17 00:00:00 2001 From: "Yangkai.Shen" <237497819@qq.com> Date: Fri, 28 Dec 2018 11:10:03 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20spring-boot-demo-zookeeper=20?= =?UTF-8?q?=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring-boot-demo-zookeeper/README.md | 461 ++++++++++++++++++ .../xkcoding/zookeeper/util/ZkLockHelper.java | 64 --- 2 files changed, 461 insertions(+), 64 deletions(-) create mode 100644 spring-boot-demo-zookeeper/README.md delete mode 100644 spring-boot-demo-zookeeper/src/main/java/com/xkcoding/zookeeper/util/ZkLockHelper.java diff --git a/spring-boot-demo-zookeeper/README.md b/spring-boot-demo-zookeeper/README.md new file mode 100644 index 0000000..ebe21ed --- /dev/null +++ b/spring-boot-demo-zookeeper/README.md @@ -0,0 +1,461 @@ +# spring-boot-demo-zookeeper + +> 此 demo 主要演示了如何使用 Spring Boot 集成 Zookeeper 实现分布式锁。 + +## pom.xml + +```xml + + + 4.0.0 + + spring-boot-demo-zookeeper + 1.0.0-SNAPSHOT + jar + + spring-boot-demo-zookeeper + Demo project for Spring Boot + + + com.xkcoding + spring-boot-demo + 1.0.0-SNAPSHOT + + + + UTF-8 + UTF-8 + 1.8 + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + org.springframework.boot + spring-boot-starter-aop + + + + + + org.apache.curator + curator-recipes + 4.1.0 + + + + cn.hutool + hutool-all + + + + org.springframework.boot + spring-boot-starter-test + test + + + + org.projectlombok + lombok + true + + + + + spring-boot-demo-zookeeper + + + org.springframework.boot + spring-boot-maven-plugin + + + + + +``` + +## ZkProps.java + +```java +/** + *

+ * Zookeeper 配置项 + *

+ * + * @package: com.xkcoding.zookeeper.config.props + * @description: Zookeeper 配置项 + * @author: yangkai.shen + * @date: Created in 2018-12-27 14:47 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Data +@ConfigurationProperties(prefix = "zk") +public class ZkProps { + /** + * 连接地址 + */ + private String url; + + /** + * 超时时间(毫秒),默认1000 + */ + private int timeout = 1000; + + /** + * 重试次数,默认3 + */ + private int retry = 3; +} +``` + +## application.yml + +```yaml +server: + port: 8080 + servlet: + context-path: /demo + +zk: + url: 127.0.0.1:2181 + timeout: 1000 + retry: 3 +``` + +## ZkConfig.java + +```java +/** + *

+ * Zookeeper配置类 + *

+ * + * @package: com.xkcoding.zookeeper.config + * @description: Zookeeper配置类 + * @author: yangkai.shen + * @date: Created in 2018-12-27 14:45 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Configuration +@EnableConfigurationProperties(ZkProps.class) +public class ZkConfig { + private final ZkProps zkProps; + + @Autowired + public ZkConfig(ZkProps zkProps) { + this.zkProps = zkProps; + } + + @Bean + public CuratorFramework curatorFramework() { + RetryPolicy retryPolicy = new ExponentialBackoffRetry(zkProps.getTimeout(), zkProps.getRetry()); + CuratorFramework client = CuratorFrameworkFactory.newClient(zkProps.getUrl(), retryPolicy); + client.start(); + return client; + } +} +``` + +## ZooLock.java + +> 分布式锁的关键注解 + +```java +/** + *

+ * 基于Zookeeper的分布式锁注解 + * 在需要加锁的方法上打上该注解后,AOP会帮助你统一管理这个方法的锁 + *

+ * + * @package: com.xkcoding.zookeeper.annotation + * @description: 基于Zookeeper的分布式锁注解,在需要加锁的方法上打上该注解后,AOP会帮助你统一管理这个方法的锁 + * @author: yangkai.shen + * @date: Created in 2018-12-27 14:11 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Inherited +public @interface ZooLock { + /** + * 分布式锁的键 + */ + String key(); + + /** + * 锁释放时间,默认五秒 + */ + long timeout() default 5 * 1000; + + /** + * 时间格式,默认:毫秒 + */ + TimeUnit timeUnit() default TimeUnit.MILLISECONDS; +} +``` + +## LockKeyParam.java + +> 分布式锁动态key的关键注解 + +```java +/** + *

+ * 分布式锁动态key注解,配置之后key的值会动态获取参数内容 + *

+ * + * @package: com.xkcoding.zookeeper.annotation + * @description: 分布式锁动态key注解,配置之后key的值会动态获取参数内容 + * @author: yangkai.shen + * @date: Created in 2018-12-27 14:17 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Target({ElementType.PARAMETER}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Inherited +public @interface LockKeyParam { + /** + * 如果动态key在user对象中,那么就需要设置fields的值为user对象中的属性名可以为多个,基本类型则不需要设置该值 + *

例1:public void count(@LockKeyParam({"id"}) User user) + *

例2:public void count(@LockKeyParam({"id","userName"}) User user) + *

例3:public void count(@LockKeyParam String userId) + */ + String[] fields() default {}; +} +``` + +## ZooLockAspect.java + +> 分布式锁的关键部分 + +```java +/** + *

+ * 使用 aop 切面记录请求日志信息 + *

+ * + * @package: com.xkcoding.log.aop.aspectj + * @description: 使用 aop 切面记录请求日志信息 + * @author: yangkai.shen + * @date: Created in 2018/10/1 10:05 PM + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Aspect +@Component +@Slf4j +public class ZooLockAspect { + private final CuratorFramework zkClient; + + private static final String KEY_PREFIX = "DISTRIBUTED_LOCK_"; + + private static final String KEY_SEPARATOR = "/"; + + @Autowired + public ZooLockAspect(CuratorFramework zkClient) { + this.zkClient = zkClient; + } + + /** + * 切入点 + */ + @Pointcut("@annotation(com.xkcoding.zookeeper.annotation.ZooLock)") + public void doLock() { + + } + + /** + * 环绕操作 + * + * @param point 切入点 + * @return 原方法返回值 + * @throws Throwable 异常信息 + */ + @Around("doLock()") + public Object around(ProceedingJoinPoint point) throws Throwable { + MethodSignature signature = (MethodSignature) point.getSignature(); + Method method = signature.getMethod(); + Object[] args = point.getArgs(); + ZooLock zooLock = method.getAnnotation(ZooLock.class); + if (StrUtil.isBlank(zooLock.key())) { + throw new RuntimeException("分布式锁键不能为空"); + } + String lockKey = buildLockKey(zooLock, method, args); + InterProcessMutex lock = new InterProcessMutex(zkClient, lockKey); + try { + // 假设上锁成功,以后拿到的都是 false + if (lock.acquire(zooLock.timeout(), zooLock.timeUnit())) { + return point.proceed(); + } else { + throw new RuntimeException("请勿重复提交"); + } + } finally { + lock.release(); + } + } + + /** + * 构造分布式锁的键 + * + * @param lock 注解 + * @param method 注解标记的方法 + * @param args 方法上的参数 + * @return + * @throws NoSuchFieldException + * @throws IllegalAccessException + */ + private String buildLockKey(ZooLock lock, Method method, Object[] args) throws NoSuchFieldException, IllegalAccessException { + StringBuilder key = new StringBuilder(KEY_SEPARATOR + KEY_PREFIX + lock.key()); + + // 迭代全部参数的注解,根据使用LockKeyParam的注解的参数所在的下标,来获取args中对应下标的参数值拼接到前半部分key上 + Annotation[][] parameterAnnotations = method.getParameterAnnotations(); + + for (int i = 0; i < parameterAnnotations.length; i++) { + // 循环该参数全部注解 + for (Annotation annotation : parameterAnnotations[i]) { + // 注解不是 @LockKeyParam + if (!annotation.annotationType().isInstance(LockKeyParam.class)) { + continue; + } + + // 获取所有fields + String[] fields = ((LockKeyParam) annotation).fields(); + if (ArrayUtil.isEmpty(fields)) { + // 普通数据类型直接拼接 + if (ObjectUtil.isNull(args[i])) { + throw new RuntimeException("动态参数不能为null"); + } + key.append(KEY_SEPARATOR).append(args[i]); + } else { + // @LockKeyParam的fields值不为null,所以当前参数应该是对象类型 + for (String field : fields) { + Class clazz = args[i].getClass(); + Field declaredField = clazz.getDeclaredField(field); + declaredField.setAccessible(true); + Object value = declaredField.get(clazz); + key.append(KEY_SEPARATOR).append(value); + } + } + } + } + return key.toString(); + } + +} +``` + +## SpringBootDemoZookeeperApplicationTests.java + +> 测试分布式锁 + +```java +@RunWith(SpringRunner.class) +@SpringBootTest +@Slf4j +public class SpringBootDemoZookeeperApplicationTests { + + public Integer getCount() { + return count; + } + + private Integer count = 10000; + private ExecutorService executorService = Executors.newFixedThreadPool(1000); + + @Autowired + private CuratorFramework zkClient; + + /** + * 不使用分布式锁,程序结束查看count的值是否为0 + */ + @Test + public void test() throws InterruptedException { + IntStream.range(0, 10000).forEach(i -> executorService.execute(this::doBuy)); + TimeUnit.MINUTES.sleep(1); + log.error("count值为{}", count); + } + + /** + * 测试AOP分布式锁 + */ + @Test + public void testAopLock() throws InterruptedException { + // 测试类中使用AOP需要手动代理 + SpringBootDemoZookeeperApplicationTests target = new SpringBootDemoZookeeperApplicationTests(); + AspectJProxyFactory factory = new AspectJProxyFactory(target); + ZooLockAspect aspect = new ZooLockAspect(zkClient); + factory.addAspect(aspect); + SpringBootDemoZookeeperApplicationTests proxy = factory.getProxy(); + IntStream.range(0, 10000).forEach(i -> executorService.execute(() -> proxy.aopBuy(i))); + TimeUnit.MINUTES.sleep(1); + log.error("count值为{}", proxy.getCount()); + } + + /** + * 测试手动加锁 + */ + @Test + public void testManualLock() throws InterruptedException { + IntStream.range(0, 10000).forEach(i -> executorService.execute(this::manualBuy)); + TimeUnit.MINUTES.sleep(1); + log.error("count值为{}", count); + } + + @ZooLock(key = "buy", timeout = 1, timeUnit = TimeUnit.MINUTES) + public void aopBuy(int userId) { + log.info("{} 正在出库。。。", userId); + doBuy(); + log.info("{} 扣库存成功。。。", userId); + } + + public void manualBuy() { + String lockPath = "/buy"; + log.info("try to buy sth."); + try { + InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath); + try { + if (lock.acquire(1, TimeUnit.MINUTES)) { + doBuy(); + log.info("buy successfully!"); + } + } finally { + lock.release(); + } + } catch (Exception e) { + log.error("zk error"); + } + } + + public void doBuy() { + count--; + log.info("count值为{}", count); + } + +} +``` + +## 参考 + +1. [如何在测试类中使用 AOP](https://stackoverflow.com/questions/11436600/unit-testing-spring-around-aop-methods) +2. zookeeper 实现分布式锁:《Spring Boot 2精髓 从构建小系统到架构分布式大系统》李家智 - 第16章 - Spring Boot 和 Zoo Keeper - 16.3 实现分布式锁 \ No newline at end of file diff --git a/spring-boot-demo-zookeeper/src/main/java/com/xkcoding/zookeeper/util/ZkLockHelper.java b/spring-boot-demo-zookeeper/src/main/java/com/xkcoding/zookeeper/util/ZkLockHelper.java deleted file mode 100644 index c7cacda..0000000 --- a/spring-boot-demo-zookeeper/src/main/java/com/xkcoding/zookeeper/util/ZkLockHelper.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.xkcoding.zookeeper.util; - -import lombok.extern.slf4j.Slf4j; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.locks.InterProcessMutex; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.concurrent.TimeUnit; - -/** - *

- * Zookeeper分布式锁工具类 - *

- * - * @package: com.xkcoding.zookeeper.util - * @description: Zookeeper分布式锁工具类 - * @author: yangkai.shen - * @date: Created in 2018-12-27 14:55 - * @copyright: Copyright (c) 2018 - * @version: V1.0 - * @modified: yangkai.shen - */ -@Slf4j -@Component -public class ZkLockHelper { - private final CuratorFramework zkClient; - - @Autowired - public ZkLockHelper(CuratorFramework zkClient) { - this.zkClient = zkClient; - } - - /** - * 尝试获取锁 - * - * @param key 锁的键 - * @param timeout 超时时间 - * @param unit 单位 - * @return 是否获取锁 - */ - public boolean tryLock(String key, long timeout, TimeUnit unit) { - InterProcessMutex lock = new InterProcessMutex(zkClient, key); - try { - return lock.acquire(timeout, unit); - } catch (Exception e) { - throw new RuntimeException("系统异常"); - } - } - - /** - * 释放锁 - * - * @param key 锁的键 - */ - public void unLock(String key) { - InterProcessMutex lock = new InterProcessMutex(zkClient, key); - try { - lock.release(); - } catch (Exception e) { - log.error(e.getMessage(), e); - } - } -}