[译]Hystrix的使用和分析

Hystrix入门手册和特性讲解

Posted by zhidaliao on February 2, 2017

根据 这篇文章 进行翻译,并做了一些改动方便理解,同时基于文章的代码块创建了测试项目 hystrix_demo ,推送到 github 方便大家学习.

转载请注明出处 来源:paraller’s blog

为什么要使用 Hystrix ?

在中大型的分布式系统中,一般都会有很多依赖系统(HTTP, Hession, Netty, Dubbo 等其他进行通信)

在高并发访问的情况下,依赖系统的稳定性问题不是很严重,但是会存在很多无法预测的问题: 比如网络连接缓慢、繁忙,服务临时不可用,离线等。

下图展示的是 50 QPS 的场景图,依赖系统I在这样的流量下已经不可用了,但其他的依赖服务仍然可用

当I服务被阻塞,服务的大部分线程都会被阻塞,这样会造成整个线上的服务没有线程资源可用,如下图所示:

一个复杂的分布式程序会存在很多依赖系统,上面的情况一直持续必然会在某个时刻导致程序崩溃,在高并发的情况下如果依赖系统没有相应的隔离措施,应用程序就一定会存在崩溃的风险

举个例子:一个依赖30个SOA服务的程序,每个服务的可用率在 99.99%, 30个服务的可用性就是 99.7%, 0.3%的故障率意味着1000次请求就有3次是失败的,依赖的服务越多,程序的不稳定因素就会呈几何级的增长。

这个问题的解决方案如下:对依赖系统进行资源隔离,Hystrix是一个 依赖服务资源隔离框架, 但它也能帮助我们进行服务管理和监控

Netflix 作为开发者已经有成功的使用案例,规模如下:

Netflix API 使用线程隔离方案每天处理 10+ billion HystrixCommand

每一个 API 实例拥有 40+ 的线程池 ,每个线程池有5-20条线程

Hystrix怎样使用隔离解决依赖系统可能产生的问题

  1. Hystrix 使用 HystrixCommand 命令模式来封装 依赖的逻辑调用,在强制授权下,每一个命令分别使用单独的线程/信号执行
  2. 可以设置调用第三方接口的超时时间,出现超时情况,直接返回开发者自己实现的 fallback 逻辑
  3. 设置了独立线程池的依赖系统,当线程池没有空闲线程的情况下,对请求直接拒绝处理,在设置的时间内快速失败
  4. 对依赖系统的调用,有如下几种结果:成功、失败(包括抛出异常)、超时、线程池拒绝、熔断。 请求失败的情况下(异常、拒绝、超时、熔断),会出现降级操作(fallback 逻辑)
  5. 提供了熔断组件,可以手动和自动调用,在一段时间内(10 seconds)停止当前的依赖系统调用,当超过了设置的错误次数 threshold 的 50%,会自动触发。
  6. 提供几乎实时的依赖调用数据和监控

Hystrix依赖的隔离架构如下图所示:

如何使用 Hystrix

1、Hystrix依赖包的介绍: maven 模式
<!-- Rely on the version:>
<hystrix.version>1.3.16</hystrix.version>
<hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version> 
 
<dependency>
     <groupId>com.netflix.hystrix</groupId>
     <artifactId>hystrix-core</artifactId>
     <version>${hystrix.version}</version>
 </dependency>
     <dependency>
     <groupId>com.netflix.hystrix</groupId>
     <artifactId>hystrix-metrics-event-stream</artifactId>
     <version>${hystrix-metrics-event-stream.version}</version>
 </dependency>
<!-- Warehouse address -->
<repository>
     <id>nexus</id>
     <name>local private nexus</name>
     <url> </url>
     <releases>
          <enabled>true</enabled>
     </releases>
     <snapshots>
          <enabled>false</enabled>
     </snapshots>
</repository>
2、使用 命令模式 包装依赖逻辑

默认的command 超时时间为 1000毫秒

public class HelloWorldCommand extends HystrixCommand<String> {
    
    private final String name;
    public HelloWorldCommand(String name) {
        //至少要指定一个 command group name 的值(CommandGroup)
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected String run() {
        // 在run () 方法中包括实际的业务逻辑
        return "Hello " + name +" thread:" + Thread.currentThread().getName();
    }

    public static void main(String[] args) throws Exception{

        //每一个 Command object 只能被调用一次
        //重复调用会抛出这样的异常信息: This instance can only be executed once. Please instantiate a new instance.
        HelloWorldCommand helloWorldCommand = new HelloWorldCommand("Synchronous-hystrix");
        
        // 同步的调用代码,等价于: helloWorldCommand.queue().get(); 
        String result = helloWorldCommand.execute();
        System.out.println("result=" + result);
 
		/* 异步的代码调用 */
        helloWorldCommand = new HelloWorldCommand("Asynchronous-hystrix");
        Future<String> future = helloWorldCommand.queue();
        //The get 方法不能超过command 定义的 timeout时间, 默认是: 1
        result = future.get(100, TimeUnit.MILLISECONDS);
        System.out.println("result=" + result);
        System.out.println("mainThread=" + Thread.currentThread().getName());
    }
     
}

output

// result=Hello Synchronous-hystrix thread:hystrix-HelloWorldGroup-1
// result=Hello Asynchronous-hystrix thread:hystrix-HelloWorldGroup-2
// mainThread=main

异步的调用使用的是 command.queueget (timeout, TimeUnit.MILLISECONDS)

同步的调用使用的是 command.execute (command.queue) 等价于 the queue().get();

3: registered asynchronous event callback execution

observe() 方法会开启一个线程执行 run() 函数,执行结束之后会调用 call 方法或者完整的生命周期方法

// 注册 观察者事件拦截器 
Observable<String> fs = new HelloWorldCommand("World").observe();
// 观察者订阅事件
fs.subscribe(new Action1<String>() {
    @Override
    public void call(String result) {
         //The execution results of treatment, result HelloWorldCommand returns a result
        //The user to do two times the processing of results.
    }
});

// 注册完整的生命周期事件
fs.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                System.out.println("execute onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("onError " + e.getMessage());
                e.printStackTrace();
            }
            @Override
            public void onNext(String v) {
                System.out.println("onNext: " + v);
            }
        });

/* The operation results
call execute result = Hello observe-hystrix thread:hystrix-HelloWorldGroup-3
onNext: Hello observe-hystrix thread:hystrix-HelloWorldGroup-3
execute onCompleted
*/
使用Fallback (4:) 提供降级策略

示例图:

public class FallbackDemo extends HystrixCommand<String> {

	private final String name;

	public FallbackDemo(String name) {
		super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
		.andCommandPropertiesDefaults(
				HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(500)));
		this.name = name;
	}
	
	@Override
	protected String getFallback() {
	    return "exeucute Falled";
	}
	
	@Override
	protected String run()  {
	    try {
			TimeUnit.MILLISECONDS.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	    System.out.println("被打断调用getFallback() 后依然会执行");
	    return "Hello " + name +" thread:" + Thread.currentThread().getName();
	}

	
	public static void main(String[] args) throws Exception {
		FallbackDemo command = new FallbackDemo("test-Fallback");
		String result = command.execute();
		System.out.println(result);
	}
}

output:

// exeucute Falled
// java.lang.InterruptedException: sleep interrupted
// 被打断调用getFallback() 后依然会执行

NOTE: 除了 HystrixBadRequestException 异常情况, run方法抛出的所有异常都被视为 failures, 会触发 降级逻辑 和 熔断器逻辑

HystrixBadRequestException 被用在错误的入参或者 or 非系统级的异常,不会触发降级操作

5: relies on named: CommandKey
public HelloWorldCommand(String name) {
    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
            /* HystrixCommandKey factory defined dependent name */
            .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")));
    this.name = name;
}

NOTE: 每一个 CommandKey 代表一个抽象依赖,相同的依赖系统必须使用一致的 CommandKey ,基本的依赖隔离取决于一致的 CommandKey

6: dependent group: CommandGroup
//The use of HystrixCommandGroupKey factory defined
public HelloWorldCommand(String name) {
    Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
}

NOTE: CommandGroup 是必须的参数; 在没有指定ThreadPoolKey 参数的情况下,它主要是用来区分不同的thread pool/signal

7: thread pool / signal: ThreadPoolKey
public HelloWorldCommand(String name) {
    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
            .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))
            /* The use of HystrixThreadPoolKey factory defined thread pool name*/
            .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
    this.name = name;
}

NOTE:使用CommandGroup,在一样的业务依赖系统的隔离环境下,作了差异化处理; 比如依赖了相同的第三方服务,但是你们的调用方式不一致,你可以使用 HystrixThreadPoolKey 来区分两种不同的调用方式环境。

在相同的 CommandGroup 中还想进一步的对资源隔离作区分,可以使用 HystrixThreadPoolKey

8: request cache Request-Cache

需要自定义 getCacheKey() 方法,默认的方法返回null,没有办法使用缓存 ; 条件符合的情况下 run() 业务逻辑只会被调用一次

public class RequestCacheCommand extends HystrixCommand<String> {
	private final int id;

	public RequestCacheCommand(int id) {
		super(HystrixCommandGroupKey.Factory.asKey("RequestCacheCommand"));
		this.id = id;
	}

	@Override
	protected String run() throws Exception {
		System.out.println(Thread.currentThread().getName() + " execute id=" + id);
		return "executed=" + id;
	}

	@Override
	protected String getCacheKey() {
		return String.valueOf(id);
	}

	public static void main(String[] args) {
		HystrixRequestContext context = HystrixRequestContext.initializeContext();
		try {
			RequestCacheCommand command2a = new RequestCacheCommand(2);
			RequestCacheCommand command2b = new RequestCacheCommand(2);

			System.out.println(command2a.execute());
			System.out.println(command2a.isResponseFromCache());
			
			System.out.println(command2b.execute() );
			System.out.println(command2b.isResponseFromCache());
		} finally {
			context.shutdown();
		}
		context = HystrixRequestContext.initializeContext();
		try {
			RequestCacheCommand command3b = new RequestCacheCommand(2);
			System.out.println(command3b.execute());
			System.out.println(command3b.isResponseFromCache());
		} finally {
			context.shutdown();
		}
	}
}

output:

hystrix-RequestCacheCommand-1 execute id=2
executed=2
false
executed=2
true
hystrix-RequestCacheCommand-2 execute id=2
executed=2
false

NOTE: 请求缓存可以让 (CommandKey/CommandGroup) 一致的情况下,共享调用结果,减少调用的次数,在高并发的场景中能够提高性能

转载请注明出处 来源:paraller’s blog

9: semaphore isolation: SEMAPHORE

独立的本地代码或者能够快速返回的远程调用(such as memcached, redis),可以直接使用信号量。以减少过度的使用 thread isolation;(没有必要耗费时间和性能创建线程,执行简单的业务)

public class SemaphoreDemo extends HystrixCommand<String> {

	private final String name;

	public SemaphoreDemo(String name) {
		super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
				.andCommandPropertiesDefaults(
						HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
								HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)));
		this.name = name;
	}

	@Override
	protected String run() throws Exception {
		return "HystrixThread:" + Thread.currentThread().getName();
	}

	public static void main(String[] args) throws Exception {
		SemaphoreDemo command = new SemaphoreDemo("semaphore");
		String result = command.execute();
		System.out.println(result);
		System.out.println("MainThread:" + Thread.currentThread().getName());
	}
}

output:

HystrixThread:main
MainThread:main
降级方法中关联 logical command

10:fallback degradation of nested logical command

示意图如下:

应用场景:调用网络访问的时候的降级操作,比如访问第三方服务,失败了之后在降级服务中获取缓存

public class CommandWithFallbackViaNetworkDemo {
	public static void main(String[] args) {
		CommandWithFallbackViaNetwork commandWithFallbackViaNetwork = new CommandWithFallbackViaNetwork(1);
		String result = commandWithFallbackViaNetwork.execute();
		System.out.println("result is " + result);

	}
}

class CommandWithFallbackViaNetwork extends HystrixCommand<String> {
	private final int id;

	protected CommandWithFallbackViaNetwork(int id) {
		super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX")).andCommandKey(
				HystrixCommandKey.Factory.asKey("GetValueCommand")));
		this.id = id;
	}

	@Override
	protected String run() {
		System.out.println(" -----run------ ");
		throw new RuntimeException("force failure for example");
	}

	@Override
	protected String getFallback() {
		return new FallbackViaNetwork(id).execute();
	}

	private static class FallbackViaNetwork extends HystrixCommand<String> {
		private final int id;

		public FallbackViaNetwork(int id) {
			super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
					.andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand"))
					.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")));
			this.id = id;
		}

		@Override
		protected String run() {
			System.out.println("run fallback id:" + id);
			return "fallback result:" + id;
		}

		@Override
		protected String getFallback() {
			return null;
		}
	}
}

output

run fallback id:1
result is fallback result:1

NOTE:依赖调用,降级调用要使用隔离的 线程池,避免上一级的线程池被消耗完,影响到级联的调用

调用不同的逻辑操作,展示如何降级服务

11: show called fallback logic, for special service processing

如下图:

public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> {
	
	private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty(
			"primarySecondary.usePrimary", true);
	
	private final int id;

	public CommandFacadeWithPrimarySecondary(int id) {
		super(Setter
				.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
				.andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand"))
				.andCommandPropertiesDefaults(
						HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
								ExecutionIsolationStrategy.SEMAPHORE)));
		this.id = id;
	}

	@Override
	protected String run() {
		if (usePrimary.get()) {
			return new PrimaryCommand(id).execute();
		} else {
			return new SecondaryCommand(id).execute();
		}
	}

	@Override
	protected String getFallback() {
		return "static-fallback-" + id;
	}

	@Override
	protected String getCacheKey() {
		return String.valueOf(id);
	}

	private static class PrimaryCommand extends HystrixCommand<String> {
		private final int id;

		private PrimaryCommand(int id) {
			super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
					.andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand"))
					.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand"))
					.andCommandPropertiesDefaults(
							HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(600)));
			this.id = id;
		}

		@Override
		protected String run() {
			return "responseFromPrimary-" + id;
		}
	}

	private static class SecondaryCommand extends HystrixCommand<String> {
		private final int id;

		private SecondaryCommand(int id) {
			super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
					.andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand"))
					.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand"))
					.andCommandPropertiesDefaults(
							HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(100)));
			this.id = id;
		}

		@Override
		protected String run() {
			// perform fast 'secondary' service call
			return "responseFromSecondary-" + id;
		}
	}

	public static class UnitTest {
		@Test
		public void testPrimary() {
			HystrixRequestContext context = HystrixRequestContext.initializeContext();
			try {
				ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true);
				System.out.println(new CommandFacadeWithPrimarySecondary(20).execute());
			} finally {
				context.shutdown();
				ConfigurationManager.getConfigInstance().clear();
			}
		}

		@Test
		public void testSecondary() {
			HystrixRequestContext context = HystrixRequestContext.initializeContext();
			try {
				ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false);
				System.out.println(  new CommandFacadeWithPrimarySecondary(20).execute());
			} finally {
				context.shutdown();
				ConfigurationManager.getConfigInstance().clear();
			}
		}
	}
}

output

responseFromPrimary-20
responseFromSecondary-20

NOTE:在一些场景中特定的需求,展示降级调用是合适的,对于业务进程的回退,回退操作是没有降级义务的,在使用中要谨慎,会导致监控错误或数据混乱。(翻译的不好,请参照英文)

NOTE: display call relegation is applicable to the special needs of the scene, fallback for business processing, fallback assumes no relegation duties, recommend caution in use, will cause the problem of monitoring and statistical chaos.

The 12:合并命令模式的调用:HystrixCollapser; command call merged: HystrixCollapser

命令调用的合并允许多个请求,在线程/信号中批量的被处理。如图所示:

public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {

	private final Integer key;

	public CommandCollapserGetValueForKey(Integer key) {
		this.key = key;
	}

	@Override
	public Integer getRequestArgument() {
		return key;
	}

	@Override
	protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
		return new BatchCommand(requests);
	}

	@Override
	protected void mapResponseToRequests(List<String> batchResponse,
			Collection<CollapsedRequest<String, Integer>> requests) {
		int count = 0;
		for (CollapsedRequest<String, Integer> request : requests) {
			request.setResponse(batchResponse.get(count++));
		}
	}

	private static final class BatchCommand extends HystrixCommand<List<String>> {
		private final Collection<CollapsedRequest<String, Integer>> requests;

		private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
			super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")).andCommandKey(
					HystrixCommandKey.Factory.asKey("GetValueForKey")));
			this.requests = requests;
		}

		@Override
		protected List<String> run() {
			ArrayList<String> response = new ArrayList<String>();
			for (CollapsedRequest<String, Integer> request : requests) {
				response.add("ValueForKey: " + request.getArgument());
			}
			return response;
		}
	}

	static class UnitTest1 {

		public static void main(String[] args) {

			HystrixRequestContext context = HystrixRequestContext.initializeContext();

			Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();
			Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();
			Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();
			Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();

			try {
				System.out.println(f1.get());
				System.out.println(f2.get());
				System.out.println(f3.get());
				System.out.println(f4.get());
				System.out.println(HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
				
				HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands()		
						.toArray(new HystrixCommand<?>[1])[0];
				System.out.println(command.getCommandKey().name());
				System.out.println(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
				System.out.println(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));

			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			} finally {
				context.shutdown();
			}

		}

	}

}

output:

ValueForKey: 1
ValueForKey: 2
ValueForKey: 3
ValueForKey: 4
2
GetValueForKey
true
true

NOTE:适用场景:HystrixCollapser 适用于将大量的调用相同服务的请求合并在一个线程中,使用一个连接减少线程上下文交互的时间和IO次数,但是要确保调用的都是同一个服务

Four:使用 Hystrix-dashboard 构建 监控平台

1: monitoring dashboard introduction 监控平台介绍

监控界面可以提供依赖系统服务的实时的监控(通过Key区分):如下图所示:

An instance of 2: 开放 命令 数据

Hystrix 在当前的JVM中,所有的命令调用都使用 Servlet 输出数据流 配置如下:

<servlet>
    <display-name>HystrixMetricsStreamServlet</display-name>
    <servlet-name>HystrixMetricsStreamServlet</servlet-name>
    <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>
</servlet>
<servlet-mapping>
    <servlet-name>HystrixMetricsStreamServlet</servlet-name>
    <url-pattern>/hystrix.stream</url-pattern>
</servlet-mapping>
<!-- 
    The corresponding URL format: http://hostname:port/application/hystrix.stream
-->

To build the 3: cluster model monitoring statistics 监控数据的集群模式

1) 使用 Turbine 组件 聚合集群数据

架构图如下:

2) 内嵌的 jetty 提供 Servlet 容器, 展示 HystrixMetrics 信息
public class JettyServer {
	
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private int port;
    private ExecutorService executorService = Executors.newFixedThreadPool(1);
    private Server server = null;
    public void init() {
        try {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        server = new Server(8080);
                        WebAppContext context = new WebAppContext();
                        context.setContextPath("/");
                        context.addServlet(HystrixMetricsStreamServlet.class, "/hystrix.stream");
                        context.setResourceBase(".");
                        server.setHandler(context);
                        server.start();
                        server.join();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            });
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
    public void destory() {
        if (server != null) {
            try {
                server.stop();
                server.destroy();
                logger.warn("jettyServer stop and destroy!");
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
    
    public static void main(String[] args) {
    	JettyServer js = new JettyServer();
    	js.init();
	}
} 
3) Turbine 步骤和配置信息

A: configuration Turbine Servlet collector

<servlet>
   <description></description>
   <display-name>TurbineStreamServlet</display-name>
   <servlet-name>TurbineStreamServlet</servlet-name>
   <servlet-class>com.netflix.turbine.streaming.servlet.TurbineStreamServlet</servlet-class>
 </servlet>
 <servlet-mapping>
   <servlet-name>TurbineStreamServlet</servlet-name>
   <url-pattern>/turbine.stream</url-pattern>
 </servlet-mapping>

B: 在config.properties 文件中配置集群实例

#Configuration two clusters: mobil-online,ugc-online
turbine.aggregator.clusterConfig=mobil-online,ugc-online
#The configuration of mobil-online cluster instance
turbine.ConfigPropertyBasedDiscovery.mobil-online.instances=10.10.34.11,10.10.34.12,10.10.34.13,10.10.34.14,10.10.34.15,10.10.34.16,10.16.14.52,10.16.14.53,10.16.14.54,10.16.14.55
#The configuration of mobil-online data stream Servlet
turbine.instanceUrlSuffix.mobil-online=:8080/hystrix.stream
#The configuration of ugc-online cluster instance
turbine.ConfigPropertyBasedDiscovery.ugc-online.instances=10.10.34.198,10.10.52.231,10.10.52.245,10.10.52.246
#The configuration of ugc-online data stream Servlet
turbine.instanceUrlSuffix.ugc-online=:8080/hystrix.stream

C: using the Dashboard configuration Turbine connection

如图所示:

Hystrix的配置和分析

The 1:Hystrix 配置

1):命令配置

当命令被使用 setter 方式创建的时候 ,命令配置的源代码在 HystrixCommandProperties

配置的具体说明和默认的值如下:

//命令的资源隔离模式, 默认为: thread isolation,ExecutionIsolationStrategy.THREAD
private final HystrixProperty<ExecutionIsolationStrategy> executionIsolationStrategy; 

//使用线程隔离, 超时调用, 默认值: 1
private final HystrixProperty<Integer> executionIsolationThreadTimeoutInMilliseconds; 

//The thread pool key, 用来决定 命令在哪个线程池中执行  
private final HystrixProperty<String> executionIsolationThreadPoolKeyOverride; 

//使用信号量隔离模式, 次数调用,并发最大值默认为10
private final HystrixProperty<Integer> executionIsolationSemaphoreMaxConcurrentRequests;

/使用信号量隔离模式,命令的降级调用的最大并发值,默认为10 
private final HystrixProperty<Integer> fallbackIsolationSemaphoreMaxConcurrentRequests; 

//是否启用降级策略 默认为true
private final HystrixProperty<Boolean> fallbackEnabled; 

// 使用线程隔离模式, 是否开始命令调用的线程超时中断操作,默认是true
private final HystrixProperty<Boolean> executionIsolationThreadInterruptOnTimeout; 

// Statistical rolling time window, default: 5000 ms circuitBreakerSleepWindowInMilliseconds
private final HystrixProperty<Integer> metricsRollingStatisticalWindowInMilliseconds;

// 统计窗口Buckets的数量  default: 10 per second, a Buckets statistics
private final HystrixProperty<Integer> metricsRollingStatisticalWindowBuckets; // number of buckets in the statisticalWindow

//是否开启数据监控功能, the default: true
private final HystrixProperty<Boolean> metricsRollingPercentileEnabled; 

// 是否开启请求日志, default: true
private final HystrixProperty<Boolean> requestLogEnabled; 

//是否开始请求缓存, default: true
private final HystrixProperty<Boolean> requestCacheEnabled;  

2):Fuse (Circuit Breaker) configuration 熔断器配置

当使用 setter方式设值的时候,熔断器的配置信息源码类是 HystrixCommandProperties

//是否会在整个统计时间值中打开熔断器, 默认是 20 seconds. 10秒至少20次请求失败,就会启动熔断
private final HystrixProperty<Integer> circuitBreakerRequestVolumeThreshold; 

//熔断器的默认工作时间, 默认值是 5 seconds. 在进入半开启状态之后,熔断器会中断请求5秒钟 , 紧接着才会发起重试
private final HystrixProperty<Integer> circuitBreakerSleepWindowInMilliseconds; 

//是否开启熔断器, 默认值是 true. boot
private final HystrixProperty<Boolean> circuitBreakerEnabled; 

//默认值: 50%. 当错误比率达到这个值将会开启熔断
private final HystrixProperty<Integer> circuitBreakerErrorThresholdPercentage;

//是否对所有请求熔断处理 默认是 false
private final HystrixProperty<Boolean> circuitBreakerForceOpen; 

//是否允许熔断器忽略错误,默认是false
private final HystrixProperty<Boolean> circuitBreakerForceClosed;
3):Command combination (Collapser) configuration

Collapser 使用 setter 方式设置的时候,源代码参见 HystrixCollapserProperties

//合并的请求数量 ,默认的允许最大值: Integer.MAX_VALUE
private final HystrixProperty<Integer> maxRequestsInBatch;

//批量的命令处理迟延时间, default: 10 ms
private final HystrixProperty<Integer> timerDelayInMilliseconds;

//批量处理的请求缓存, default: enabled
private final HystrixProperty<Boolean> requestCacheEnabled;
4):线程池配置
/**
线程池大小, 默认值 10. Suggestion: request when the peak value of 99.5% in the average response time + to save some can
*/
HystrixThreadPoolProperties.Setter().withCoreSize(int value)

/**

等待队列的长度,默认为-1,不推荐修改这个值,推荐值的意思是:不等待直接拒绝处理

Configure the thread value waiting queue length, the default value: -1
The suggested values: -1 said not to wait directly refuse, test show that the non retracting the thread pool thread pool using highest efficiency directly refuse strategy + suitable size. It is not recommended to modify this value. 
When using non retracting the thread pool, queueSizeRejectionThreshold, the keepAliveTimeMinutes parameter is invalid
*/
HystrixThreadPoolProperties.Setter().withMaxQueueSize(int value)

Analysis of the key components of 2:Hystrix

1): Hystrix 流程架构分析:

流程描述:

每次调用的第一步

1: 创建一个 HystrixCommand, 使用 execute (/queue / observe / observable)

2: 使用 execute (/queue) ,分别是同步或异步的方式

3: 查看请求缓存是否可用,是就直接返回

4: 查看熔断器是否打开, 是的话跳到 step 8, 降级策略, 不是就进入下一步流程.

5:thread pool / signal 资源是否耗尽, 如果是降级到 step 8 ,并将事件传递给 healthCount, 否则下一步流程

6: HystrixCommand 调用 run() 调用具体的业务操作

6a: 执行是否出现失败,如果是降级到 step 8,并将事件传递给 healthCount

6b: 调用是否超时,,如果是降级到 step 8,并将事件传递给 healthCount

7: healthCount计算熔断的健康状态, 所有这些状态(success, failure, rejection, timeout) 都会被报告给熔断器, 熔断器基于这些数据来决定下一步的走向

8:getFallback()Degradation of logic.下面四种情况会触发 getFallback 调用:

  • :run()The method throws an non HystrixBadRequestException anomalies.
  • :run()The method call timeout
  • :Fuse opening to intercept calls
  • :The thread pool / queue / signal whether the amount of running full

8a: 没有实现getFallback 方法的 Command 会直接抛出 exception

8b: 降级操作的逻辑调用是成功的就直接返回

9: 返回成功的结果

关于以上的流程介绍,有一篇文章进行了详细的介绍 How it Works

2):Fuse: Circuit Breaker 熔断器

Circuit Breaker 架构和数据处理

(failure) / (failure + success) = error% , if(error%) > threshold , 触发熔断

每一次熔断,每秒钟默认包含了10个 bucket, 每一个 bucket 可以记录 成功、失败、超时、拒绝状态 默认的错误策略:超过50%的请求错误,并且10秒内超过20个请求被中断拦截。

源码分析 HealthCountsStream ,内部是一个concurrentMap<commandkey,HealthCountsStream>,底层是long数组的Bucket数据结构,index 为事件类型枚举对应的索引(ordinal),值为对应事件的个数 在给定十秒 10个bucket的情况下,每个bucket记录每秒的请求状态,熔断器通过他来获取健康状态决定下一步的行为

父抽象类:BucketedCounterStream -> BucketedRollingCounterStream -> HealthCountsStream

  • 最顶层的 BucketedCounterStream 抽象类提供了基本的桶计数器实现,按配置的时间间隔将所有事件聚合成桶;
  • BucketedRollingCounterStream 抽象类在其基础上实现滑动窗口,并聚合成指标数据;
  • 而最底下一层的类则是各种具体的实现,比如 HealthCountsStream 最终会聚合成健康检查数据(HystrixCommandMetrics.HealthCounts,统计调用成功和失败的计数),供 HystrixCircuitBreaker 使用。

核心代码:

private static final ConcurrentMap<String, HealthCountsStream> streams = new ConcurrentHashMap<String, HealthCountsStream>();

private static final int NUM_EVENT_TYPES = HystrixEventType.values().length;

public static HealthCountsStream getInstance(HystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs) {
    HealthCountsStream initialStream = streams.get(commandKey.name());
    if (initialStream != null) {
        return initialStream;
    } else {
        synchronized (HealthCountsStream.class) {
            HealthCountsStream existingStream = streams.get(commandKey.name());
            if (existingStream == null) {
                HealthCountsStream newStream = new HealthCountsStream(commandKey, numBuckets, bucketSizeInMs,
                        HystrixCommandMetrics.appendEventToBucket);
                newStream.startCachingStreamValuesIfUnstarted();
                streams.putIfAbsent(commandKey.name(), newStream);
                return newStream;
            } else {
                return existingStream;
            }
        }
    }
}

@Override
long[] getEmptyBucketSummary() {
    return new long[NUM_EVENT_TYPES];
}

3) 隔离分析 analysis

Hystrix 隔离模式使用的是 thread/signal 模式,通过隔离限制进行扩散,取决于并发性和阻塞。

Hystrix isolation mode using the thread / signal mode, diffusion through the isolation limits depend on the concurrency and blocking.

(1):Thread isolation 线程隔离

线程池的大小可以控制并发数,当线程池饱和了会进一步导致程序拒绝服务,避免问题依赖的扩散。

线上推荐的线程池配置不要设置的太高,否则大量的阻塞线程会拖垮整个服务器

如图:

(2):thread isolation 优劣势

优势:

  • 使用线程可以完全的和第三方代码进行隔离,请求线程可以快速的返回
  • Rely on when a failed once again become available, the thread pool will be clean, and immediately available, but not a long time to restore
  • 可以方便的使用异步编程风格

劣势:

  • 主要的缺点是增加了线程池的CPU消耗,因为每次命令的执行都会引起排队操作(使用默认的 SynchronousQueue 可以避免排队、调度、上下文切换)
  • 增加了使用的复杂性,类似ThreadLocal依赖于线程状态,需要手动去处理线程安全。

NOTE: Netflix公司的线程隔离规模能做到足够小,对成本和性能的影响降到最低

Netflix internal API 每 10000000000 个 HystrixCommand 依赖使用线程分区, 每一个线程池有超过40 个线程池, 每个线程池有5-20个线程

(3):Signal isolation 信号隔离

信号 隔离一样可以被用来严格的限制并发访问,避免阻塞传导,线程隔离最大的不同就是 实现依赖代码线程依旧是 请求线程( requesting thread :线程需要应用程序发出信号)

如果客户端是可信的并且可以快速返回,可以使用信号模式替代线程模式来减少过度使用线程

信号大小可以动态调整,线程池不能

线程模式和信号模式的不同如下所示:

翻译完了之后才发现这篇文章已经有人翻译过了,要是觉得本文有哪些地方看不懂的可以阅读这篇文章: Hystrix 使用与分析

参考网站

Use and analysis of Hystrix

How-it-Works

Hystrix 1.5 滑动窗口实现原理总结

Hystrix线程隔离技术解析-线程池

Hystrix技术解析

防雪崩利器:熔断器 Hystrix 的原理与使用