负载均衡 "频繁提交" 和 "全局锁"

Posted by zhida on February 2, 2017

负载均衡架构需要解决两个问题: 频繁提交全局锁

频繁提交,接口幂等性问题

独立主机: 内存锁

以往的处理方式: 内存锁

private static Map<String, Integer> order_sync = new HashMap<String, Integer>();

/**
 * 订单的同步处理
 * 
 * @param order_no
 * @param remove
 *            是否在内存队列中删除
 * @return 流程是否正常
 */
private synchronized boolean orderSync(String order_no, Boolean remove) {

    if (remove) {
        order_sync.remove(order_no);
        return true;
    }
    Integer s = order_sync.get(order_no);

    if (s == null) {
        order_sync.put(order_no, 99); // 处理中
        return true;
    } else {
        return false;
    }
}

使用方法:调用方法体:
{
    if (!this.orderSync(orderNo, false)) {
        return new Message<String>(OrderError.REPEAT.value, OrderError.REPEAT.alias);
    }   
    
    //TODO  业务处理

    this.orderSync(orderNo, true);     
}

分布式主机:redis锁

分布式集群的情况下,每个接口都需要做频繁提交的处理,做接口幂等性设计。

解决方案:

Node中间件

1、有状态 : user_id + ip + url
2、无状态 : ip + url

这种方式有一个小问题,没有对参数进行判断,幂等性的原则是:函数/接口可以使用相同的参数重复执行, 不应该影响系统状态, 也不会对系统造成改变。

实际的场景: 一个用户提现,一次提现100,一次提现500, 如果100的接口还没有返回,用户提现500被拒绝,在实际的场景中也是被允许的。

后台服务

对内存锁做修改,将内存锁修改为 redis锁,对关键的函数进行处理。细节:set NX (redis自带的锁)

@Resource(name = "redisTemplate")
private ValueOperations<String, String> redisString;

/**
 * 方法是否可以调用
 * 
 * @author zhidaliao
 * @param key
 * @return remove false代表第一次加锁 true代表释放锁
 */
public boolean atomProcessor(String key, boolean remove) {

    if (!remove) {
        boolean flag = true;
        try {

            flag = redisString.setIfAbsent(key, "true");
            redisString.getOperations().expire(key, 90, TimeUnit.SECONDS);   //要确保你的程序有事务支持,或者中间状态

        } catch (Exception e) {
            e.printStackTrace();
            logger.error(e.getMessage());
            flag = false;
        }

        if (!flag) {
            logger.debug("atomProcessor false key:" + key);
        }

        return flag;
    } else {
        redisString.getOperations().delete(key);
        return true;
    }

}

全局锁

全局锁是为了让 synchronized 能够分布式化。 和内存锁这样的设计不同的是,全局锁需要阻塞(可以使用同步工具类实现阻塞),等待锁释放

思考一:

redis锁 + while循环

T1 -> getLock -> bussiness Code -> release Lock

T2 -> getLock - > while(判断上一个Key是否被释放,判断是否超时) -> delete Lock -> get Lock -> bussiness Code -> release Lock

T3 -> getLock - > while(判断上一个Key是否被释放,判断是否超时) -> delete Lock -> get Lock -> bussiness Code -> release Lock

弊端: T2 T3同时获取锁的时候,会造成 T3把T2的锁删除掉,然后获取锁。

思考二:

修改一下数据结构,改成Hash的格式。

T1 Set key ("createOrder","2017-01-01  10:10")

T2 while ("存在")
    检测时间 当前 "2017-01-01  10:30"
    Set key ("createOrder","2017-01-01  10:40")
    delete T1

T3 while ("存在")
    检测时间 当前 "2017-01-01  10:30"
    Get key ("createOrder") = "2017-01-01  10:40"
    wait();

思考三:

避免了线程冲突,但是没有实现线程优先请求,优先分配锁。可以继承Java的公平锁的实现方案处理。

在网上找了一个轮子:redisson 注意细节:有Java 1.7 和 Java 1.8 的版本

Redisson简单使用方法

pom.xml

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.4.4</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-autoconfigure</artifactId>
    <version>1.5.2.RELEASE</version>
</dependency> 

<!-- 会导致需要过高JDK版本编译  消除依赖错误之后可删除 -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-parent</artifactId>
    <version>1.5.2.RELEASE</version>
    <relativePath>../spring-boot-parent</relativePath>
</parent>

使用RedissonAutoConfiguration初始化

@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class RedissonAutoConfiguration {
    
    @Autowired
    private RedisProperties redisProperties;

    @Bean
    public RedissonClient redissonClient() {
        
        Config config = new Config();
                
        //sentinel
        if (redisProperties.getSentinel() != null) {
            
            SentinelServersConfig sentinelServersConfig = config.useSentinelServers();
            sentinelServersConfig.setMasterName(redisProperties.getSentinel().getMaster());
            redisProperties.getSentinel().getNodes();
            sentinelServersConfig.addSentinelAddress(redisProperties.getSentinel().getNodes().split(","));
            sentinelServersConfig.setDatabase(redisProperties.getDatabase());
            if (redisProperties.getPassword() != null) {
                sentinelServersConfig.setPassword(redisProperties.getPassword());
            }
            
        } else { 
            
            
            
            //single server
            SingleServerConfig singleServerConfig = config.useSingleServer();
            // format as redis://127.0.0.1:7181 or rediss://127.0.0.1:7181 for SSL
            String schema = redisProperties.isSsl() ? "rediss://" : "redis://";
            singleServerConfig.setAddress(schema + redisProperties.getHost() + ":" + redisProperties.getPort());
            singleServerConfig.setDatabase(redisProperties.getDatabase());
            if (redisProperties.getPassword() != null) {
                singleServerConfig.setPassword(redisProperties.getPassword());
            }
        }
        
        return Redisson.create(config);
    }
}

调用全局锁

@Service
public class GlobalLock {

    @Autowired
    private RedissonClient redissonClient;

    private static final Logger logger = LoggerFactory.getLogger(GlobalLock.class);

    private final int tryTime = 60;
    private final int unLockTime = 40;

    /**
     * 是否释放锁 ,否则就加锁
     * 
     * @param key
     * @param flag
     * @return
     */
    public boolean unlockOrNot(String key, boolean flag) {

        RLock fairLock = redissonClient.getFairLock(key);

        boolean result = false;

        // 加锁
        if (!flag) {

            logger.debug("start Get lock ~");

            try {
                fairLock.lock(unLockTime,TimeUnit.SECONDS);
                result = fairLock.tryLock(tryTime, unLockTime, TimeUnit.SECONDS);
                if (result) {
                    logger.debug("get " + key + " succ");
                } else {
                    logger.debug("get " + key + " fail");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                logger.error(e.getMessage());
            }

        }

        // 释放锁
        else {

            fairLock.unlock();
            logger.debug("release " + key + " succ");
            result = true;
        }

        return result;
    }
}

延伸

在做分布式架构选型的时候,优先考虑的是Spring 社区的Cloud项目,netflix在分布式开源对Cloud项目有很大的贡献。 在做第一步发现服务的选型的时候,基于以上的考虑,在和Zookeeper做了对比,将Eureka作为发现服务的框架。

在做负载均衡的时候,业界比较主流的方案,是通过Zookeeper的内存文件去实现全局锁,但是如果引入了Zookeeper之后,既然自带了发现服务,那Eureka的发现服务就显得很鸡肋。

先介绍一下Zookeeper的一些大概设计,Zookeeper主要是基于内存目录结构,监听者的设计模式。

主要的数据结构,以树形的结构建立节点

image

我们看一下Zookeeper主要提供的服务:

名称服务 名称服务是将一个名称映射到与该名称有关联的一些信息的服务。电话目录是将人的名字映射到其电话号码的一个名称服务。同样,DNS 服务也是一个名称服务,它将一个域名映射到一个 IP 地址。在分布式系统中,您可能想跟踪哪些服务器或服务在运行,并通过名称查看其状态。ZooKeeper 暴露了一个简单的接口来完成此工作。也可以将名称服务扩展到组成员服务,这样就可以获得与正在查找其名称的实体有关联的组的信息。

锁定 为了允许在分布式系统中对共享资源进行有序的访问,可能需要实现分布式互斥(distributed mutexes)。ZooKeeper 提供一种简单的方式来实现它们。

同步 与互斥同时出现的是同步访问共享资源的需求。无论是实现一个生产者-消费者队列,还是实现一个障碍,ZooKeeper 都提供一个简单的接口来实现该操作。您可以在 Apache ZooKeeper 维基上查看示例,了解如何做到这一点(参阅 参考资料)。

配置管理 您可以使用 ZooKeeper 集中存储和管理分布式系统的配置。这意味着,所有新加入的节点都将在加入系统后就可以立即使用来自 ZooKeeper 的最新集中式配置。这还允许您通过其中一个 ZooKeeper 客户端更改集中式配置,集中地更改分布式系统的状态。

集群管理 分布式系统可能必须处理节点停机的问题,您可能想实现一个自动故障转移策略。ZooKeeper 通过领导者选举对此提供现成的支持。 如有多台 Server 组成一个服务集群,那么必须要一个“总管(Zookeeper)”知道当前集群中每台机器的服务状态,一旦有机器不能提供服务,集群中其它集群必须知道,从而做出调整重新分配服务策略。同样当增加集群的服务能力时,就会增加一台或多台 Server,同样也必须让“总管”知道。

全局锁

共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。

配置管理:

将配置信息保存在 Zookeeper 的某个目录节点中,然后将所有需要修改的应用机器监控配置信息的状态,一旦配置信息发生变化,每台应用机器就会收到 Zookeeper 的通知,然后从 Zookeeper 获取新的配置信息应用到系统中。

使用spring Cloud 的方案:

  • 名称服务: Eureka
  • 锁定: redisson
  • 同步: mq
  • 配置管理: spring config Or Ctrip config
  • 集群管理: 未知

因为集群管理暂时没有用到,所以最后抛弃了使用zookeeper。

参考网站

redisson

redisson教程