/** * Prepare this context for refreshing, setting its startup date and * active flag as well as performing any initialization of property sources. */ protected void prepareRefresh() { // Switch to active. this.startupDate = System.currentTimeMillis(); this.closed.set(false); this.active.set(true); if (logger.isDebugEnabled()) { if (logger.isTraceEnabled()) { logger.trace("Refreshing " + this); } else { logger.debug("Refreshing " + getDisplayName()); } } // Initialize any placeholder property sources in the context environment. // 初始化上下文环境中的任何占位符属性源 initPropertySources(); // Validate that all properties marked as required are resolvable:seeConfigurablePropertyResolver#setRequiredProperties // 验证所有标记为必需的属性是否可解析:参见 ConfigurablePropertyResolver#setRequiredProperties getEnvironment().validateRequiredProperties(); // Store pre-refresh ApplicationListeners... // 存储预刷新 ApplicationListeners if (this.earlyApplicationListeners == null) { this.earlyApplicationListeners = new LinkedHashSet<>(this.applicationListeners); } else { // Reset local application listeners to pre-refresh state. this.applicationListeners.clear(); this.applicationListeners.addAll(this.earlyApplicationListeners); } // Allow for the collection of early ApplicationEvents, // to be published once the multicaster is available... this.earlyApplicationEvents = new LinkedHashSet<>(); }
第二步:刷新bean工厂准备beanfactory
1 2 3 4 5 6
// Tell the subclass to refresh the internal bean factory. 告诉子类刷新内部bean工厂 ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory(); // Prepare the bean factory for use in this context 准备 bean 工厂以供在此上下文中使用 prepareBeanFactory(beanFactory);
第三步:允许在上下文子类中对 bean 工厂进行后处理
1 2
// Allows post-processing of the bean factory in context subclasses. postProcessBeanFactory(beanFactory);
第四步:调用在上下文中注册为 bean 的工厂处理器
1 2
// Invoke factory processors registered as beans in the context. invokeBeanFactoryPostProcessors(beanFactory);
第五步:注册拦截 bean 创建的 bean 处理器
1 2 3
// Register bean processors that intercept bean creation. registerBeanPostProcessors(beanFactory); beanPostProcess.end();
第六步:初始化此上下文的消息源
1 2
// Initialize message source for this context. initMessageSource();
第七步:为该上下文初始化事件多播器
1 2
// Initialize event multicaster for this context. initApplicationEventMulticaster();
第八步:初始化特定上下文子类中的其他特殊 bean
1 2
// Initialize other special beans in specific context subclasses. onRefresh()
第九步:检查监听器 bean 并注册它们
1 2
// Check for listener beans and register them. registerListeners();
第十步:实例化所有剩余的(非延迟初始化)单例
1 2
// Instantiate all remaining (non-lazy-init) singletons. finishBeanFactoryInitialization(beanFactory);
十一步:最后一步,发布相应事件。
1 2
// Last step: publish corresponding event. finishRefresh();
方法注释是 完成此上下文的 bean 工厂的初始化,初始化所有剩余的单例 bean protected void finishBeanFactoryInitialization(ConfigurableListableBeanFactory beanFactory) { // Initialize conversion service for this context. if (beanFactory.containsBean(CONVERSION_SERVICE_BEAN_NAME) && beanFactory.isTypeMatch(CONVERSION_SERVICE_BEAN_NAME, ConversionService.class)) { beanFactory.setConversionService( beanFactory.getBean(CONVERSION_SERVICE_BEAN_NAME, ConversionService.class)); }
// Register a default embedded value resolver if no BeanFactoryPostProcessor // (such as a PropertySourcesPlaceholderConfigurer bean) registered any before: // at this point, primarily for resolution in annotation attribute values. if (!beanFactory.hasEmbeddedValueResolver()) { beanFactory.addEmbeddedValueResolver(strVal -> getEnvironment().resolvePlaceholders(strVal)); }
// Initialize LoadTimeWeaverAware beans early to allow for registering their transformers early. String[] weaverAwareNames = beanFactory.getBeanNamesForType(LoadTimeWeaverAware.class, false, false); for (String weaverAwareName : weaverAwareNames) { try { beanFactory.getBean(weaverAwareName, LoadTimeWeaverAware.class); } catch (BeanNotOfRequiredTypeException ex) { if (logger.isDebugEnabled()) { logger.debug("Failed to initialize LoadTimeWeaverAware bean '" + weaverAwareName + "' due to unexpected type mismatch: " + ex.getMessage()); } } }
// Stop using the temporary ClassLoader for type matching. beanFactory.setTempClassLoader(null);
// Allow for caching all bean definition metadata, not expecting further changes. beanFactory.freezeConfiguration();
// Instantiate all remaining (non-lazy-init) singletons. beanFactory.preInstantiateSingletons(); }
在springmvc-servlet.xml这个核心配置文件中,最重要的其实是配置Controller类所在的路径,即包扫描的路径,以及配置一个视图解析器,主要用于解析请求成功之后的视图数据。 OK~,配置好了springmvc-servlet.xml文件后,紧接着我们会再修改maven-web项目核心文件web.xml中的配置项: xml 代码解读复制代码
At most once:至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景,使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级” 用幂等性解决重复消息问题一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性。幂等(Idempotence) 本来是一个数学上的概念,它是这样定义的: 如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。 这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务。一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同 一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变。我们举个例子来说明一下。在不考虑并发的情况下,“将账户 X 的余额设置为 100 元”,执行一次后对系统的影响是,账户 X 的余额变成了 100 元。只要提供的参数 100 元不变,那即使再执行多少次,账户 X 的余额始终都是 100 元,不会变化,这个操作就是一个幂等的操作。再举一个例子,“将账户 X 的余额加 100 元”,这个操作它就不是幂等的,每执行一次,账户余额就会增加 100 元,执行多次和执行一次对系统的影响(也就是账户的余额)是不一样的。“如果我们系统消费消息的业务逻辑具备幂等性,那就不用担心消息重复的问题了,因为同一条消息,消费一次和消费多次对系统的影响是完全一样的。也就可以认为,消费多次等于消费一次。 从对系统的影响结果来说:At least once + 幂等消费 = Exactly once。那么如何实现幂等操作呢? 最好的方式就是,从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。但是,不是所有的业务都能设计成天然幂等的,这里就需要一些方法和技巧来实现幂等。 下面我给你介绍几种常用的设计幂等操作的方法:
利用数据库的唯一约束实现幂等,“例如我们刚刚提到的那个不具备幂等特性的转账的例子:将账户 X 的余额加 100 元。在这个例子中,我们可以通过改造业务逻辑,让它具备幂等性。首先,我们可以限定,对于每个转账单每个账户只可以执行一次变更操作,在分布式系统中,这个限制实现的方法非常多,最简单的是我们在数据库中建一张转账流水表,这个表有三个字段:转账单 ID、账户 ID 和变更金额,然后给转账单 ID 和账户 ID 这两个字段联合起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录。这样,我们消费消息的逻辑可以变为:“在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账户 “ ID 转账单 ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。我们只要写一个 SQL,正确地实现它就可以了。基于这个思路,不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,你可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。
为更新的数据设置前置条件另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件,“中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。比如,刚刚我们说过,“将账户 X 的余额增加 100 元”这个操作并不满足幂等性,我们可以把这个操作加上一个前置条件,变为:“如果账户 X 当前的余额为 500 元,将余额加 100 元”,这个操作就具备了幂等性。对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时版本号 +1,一样可以实现幂等更新
记录并检查操作如果上面提到的两种实现幂等方法都不能适用于你的场景,我们还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费 原理和实现是不是很简单?其实一点儿都不简单,在分布式系统中,这个方法其实是非常难实现的。首先,给每个消息指定一个全局唯一的 ID 就是一件不那么简单的事儿,方法有很多,但都不太好同时满足简单、高可用和高性能,或多或少都要有些牺牲。更加麻烦的是,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。比如说,对于同一条消息:“全局 ID 为 8,操作为:给 ID 为 666 账户增加 100 元”,有可能出现这样的情况: t0 时刻:Consumer A 收到条消息,检查消息执行状态,发现消息未处理过,开始执行“账户增加 100 元”; t1 时刻:Consumer B 收到条消息,检查消息执行状态,发现消息未处理过,因为这个时刻“Consumer A 还未来得及更新消息执行状态。 这样就会导致账户被错误地增加了两次 100 元,这是一个在分布式系统中非常容易犯的错误,一定要引以为戒。对于这个问题,当然我们可以用事务来实现,也可以用锁来实现,但是在分布式系统中,无论是分布式事务还是分布式锁都是比较难解决问题”
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)