Springboot之分布式事务框架Seata实现原理源码分析

作者:FastCoder 2021-08-06 08:33:27开发前端分布式 Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置。

[[415757]]

环境:springboot2.2.11 + seata1.3.0

1 准备环境

    <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><exclusions><exclusion><groupId>io.seata</groupId><artifactId>seata-all</artifactId></exclusion></exclusions></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-all</artifactId><version>1.3.0</version></dependency>

开启全局事务

    seata:service:disable-global-transaction:true

2 代理数据源及注册代理Bean

    @Bean@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER,BEAN_NAME_FAILURE_HANDLER})@ConditionalOnMissingBean(GlobalTransactionScanner.class)publicGlobalTransactionScannerglobalTransactionScanner(SeataPropertiesseataProperties,FailureHandlerfailureHandler){if(LOGGER.isInfoEnabled()){LOGGER.info("AutomaticallyconfigureSeata");}returnnewGlobalTransactionScanner(seataProperties.getApplicationId(),seataProperties.getTxServiceGroup(),failureHandler);}@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)@ConditionalOnProperty(prefix=StarterConstants.SEATA_PREFIX,name={"enableAutoDataSourceProxy","enable-auto-data-source-proxy"},havingValue="true",matchIfMissing=true)@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)publicSeataAutoDataSourceProxyCreatorseataAutoDataSourceProxyCreator(SeataPropertiesseataProperties){returnnewSeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),seataProperties.getExcludesForAutoProxying());}

2.1 创建代理Bean

Seata通过GlobalTransactionScanner来注册我们项目中所有带有@GlobalTransactional注解的方法类。

    publicclassGlobalTransactionScannerextendsAbstractAutoProxyCreatorimplementsInitializingBean,ApplicationContextAware,DisposableBean

AbstractAutoProxyCreator继承层次

从这里也知道GlobalTransactionScanner类其实是一个BeanPostProcessor处理器。

InstantiationAwareBeanPostProcessor类有如下3个方法是很有用的

postProcessBeforeInstantiation 实例化前执行

postProcessAfterInstantiation 实例化之后执行

postProcessProperties 属性填充时执行

当然在这里GlobalTransactionScanner类并没有覆盖这3个方法。

BeanPostProcessor相关的2个方法在父类中有实现

    @OverridepublicObjectpostProcessBeforeInitialization(Objectbean,StringbeanName){returnbean;}@OverridepublicObjectpostProcessAfterInitialization(@NullableObjectbean,StringbeanName){if(bean!=null){ObjectcacheKey=getCacheKey(bean.getClass(),beanName);if(this.earlyProxyReferences.remove(cacheKey)!=bean){returnwrapIfNecessary(bean,beanName,cacheKey);}}returnbean;}

在实例化Bean的时候会执行父类中

postProcessAfterInitialization方法。关键是该方法中的wrapIfNecessary方法,该方法在GlobalTransactionScanner类中被重写了。

existsAnnotation 方法判断当前的类方法上是否有@GlobalTransactional注解。如果不存在会直接返回当前Bean。

interceptor 判断当前拦截器是否为空,为空创建

GlobalTransactionalInterceptor该拦截器处理全局事务的地方。

    if(!AopUtils.isAopProxy(bean)){bean=super.wrapIfNecessary(bean,beanName,cacheKey);}else{AdvisedSupportadvised=SpringProxyUtils.getAdvisedSupport(bean);Advisor[]advisor=buildAdvisors(beanName,getAdvicesAndAdvisorsForBean(null,null,null));for(Advisoravr:advisor){advised.addAdvisor(0,avr);}}

该片段代码,判断当前的Bean是否是代理类(JDK或CGLIB),如果不是那么会先的执行下父类的wrapIfNecessary方法。

如果当前Bean是代理类对象,那么会获取当前代理类的AdvisedSupport(内部维护了切面类的集合)对象。这里可以看看JDK和CGLIB两种方式创建的代理类对象是否都具有AdvisedSupport对象。

    publicstaticAdvisedSupportgetAdvisedSupport(Objectproxy)throwsException{Fieldh;if(AopUtils.isJdkDynamicProxy(proxy)){h=proxy.getClass().getSuperclass().getDeclaredField("h");}else{h=proxy.getClass().getDeclaredField("CGLIB$CALLBACK_0");}h.setAccessible(true);ObjectdynamicAdvisedInterceptor=h.get(proxy);Fieldadvised=dynamicAdvisedInterceptor.getClass().getDeclaredField("advised");advised.setAccessible(true);return(AdvisedSupport)advised.get(dynamicAdvisedInterceptor);}

jdk创建代理对象时使用的InvocationHandler

    finalclassJdkDynamicAopProxyimplementsAopProxy,InvocationHandler,Serializable{/**WeuseastaticLogtoavoidserializationissues.*/privatestaticfinalLoglogger=LogFactory.getLog(JdkDynamicAopProxy.class);/**Configusedtoconfigurethisproxy.*/privatefinalAdvisedSupportadvised;}

cglib 获取CGLIB$CALLBACK_0字段,该字段是MethodInterceptor对象

    publicclassPersonDAOImpl$$EnhancerBySpringCGLIB$$d4658dadextendsPersonDAOImplimplementsSpringProxy,Advised,Factory{privatebooleanCGLIB$BOUND;publicstaticObjectCGLIB$FACTORY_DATA;privatestaticfinalThreadLocalCGLIB$THREAD_CALLBACKS;privatestaticfinalCallback[]CGLIB$STATIC_CALLBACKS;privateMethodInterceptorCGLIB$CALLBACK_0;}

接下来就是将Seata的拦截器添加到AdvisedSupport中。

    Advisor[]advisor=buildAdvisors(beanName,getAdvicesAndAdvisorsForBean(null,null,null));for(Advisoravr:advisor){advised.addAdvisor(0,avr);}protectedObject[]getAdvicesAndAdvisorsForBean(ClassbeanClass,StringbeanName,TargetSourcecustomTargetSource)throwsBeansException{returnnewObject[]{interceptor};}

到此就将Seata的方法拦截器包装成Advisor切面添加到了当前的AdvisedSupport管理的切面集合中。

2.2 创建代理数据源

对数据源上的方法调用进行代理处理通过DataSourceProxy

    publicclassSeataAutoDataSourceProxyCreatorextendsAbstractAutoProxyCreator{privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);privatefinalString[]excludes;privatefinalAdvisoradvisor=newDefaultIntroductionAdvisor(newSeataAutoDataSourceProxyAdvice());publicSeataAutoDataSourceProxyCreator(booleanuseJdkProxy,String[]excludes){this.excludes=excludes;setProxyTargetClass(!useJdkProxy);}@OverrideprotectedObject[]getAdvicesAndAdvisorsForBean(Class<?>beanClass,StringbeanName,TargetSourcecustomTargetSource)throwsBeansException{if(LOGGER.isInfoEnabled()){LOGGER.info("Autoproxyof[{}]",beanName);}returnnewObject[]{advisor};}@OverrideprotectedbooleanshouldSkip(Class<?>beanClass,StringbeanName){returnSeataProxy.class.isAssignableFrom(beanClass)||!DataSource.class.isAssignableFrom(beanClass)||Arrays.asList(excludes).contains(beanClass.getName());}}

shouldSkip 该方法确定了如果当前beanClass是SeataProxy的子类并且beanClass不是DataSource的子类或者当前的bean名称不再excludes集合中就会进行代理。简单说就是代理当前系统的默认数据源对象。

getAdvicesAndAdvisorsForBean 方法直接返回DefaultIntroductionAdvisor切面,通知类是SeataAutoDataSourceProxyAdvice

    publicclassSeataAutoDataSourceProxyAdviceimplementsMethodInterceptor,IntroductionInfo{@OverridepublicObjectinvoke(MethodInvocationinvocation)throwsThrowable{DataSourceProxydataSourceProxy=DataSourceProxyHolder.get().putDataSource((DataSource)invocation.getThis());Methodmethod=invocation.getMethod();Object[]args=invocation.getArguments();Methodm=BeanUtils.findDeclaredMethod(DataSourceProxy.class,method.getName(),method.getParameterTypes());if(m!=null){returnm.invoke(dataSourceProxy,args);}else{returninvocation.proceed();}}@OverridepublicClass<?>[]getInterfaces(){returnnewClass[]{SeataProxy.class};}}

3 全局事务拦截器

在需要进行发起全局事务的方法是被代理的 具体执行的拦截器是

GlobalTransactionalInterceptor

handleGlobalTransaction方法

通过事务模版执行,TransactionalExecutor类进行收集当前@GlobalTransactional注解上配置的相关信息封装到TransactionInfo中。

TransactionalTemplate.execute方法

    //该方法中根据不同的事务传播特性进行不同的处理。publicObjectexecute(TransactionalExecutorbusiness)throwsThrowable{//1gettransactionInfoTransactionInfotxInfo=business.getTransactionInfo();if(txInfo==null){thrownewShouldNeverHappenException("transactionInfodoesnotexist");}//1.1getorcreateatransactionGlobalTransactiontx=GlobalTransactionContext.getCurrentOrCreate();//1.2HandletheTransactionpropatationandthebranchTypePropagationpropagation=txInfo.getPropagation();SuspendedResourcesHoldersuspendedResourcesHolder=null;try{switch(propagation){caseNOT_SUPPORTED:suspendedResourcesHolder=tx.suspend(true);returnbusiness.execute();caseREQUIRES_NEW:suspendedResourcesHolder=tx.suspend(true);break;caseSUPPORTS:if(!existingTransaction()){returnbusiness.execute();}break;caseREQUIRED:break;caseNEVER://存在事务抛出异常if(existingTransaction()){thrownewTransactionException(String.format("Existingtransactionfoundfortransactionmarkedwithpropagation'never',xid=%s",RootContext.getXID()));}else{//直接执行业务代码returnbusiness.execute();}caseMANDATORY:if(!existingTransaction()){thrownewTransactionException("Noexistingtransactionfoundfortransactionmarkedwithpropagation'mandatory'");}break;default:thrownewTransactionException("NotSupportedPropagation:"+propagation);}try{//2.开始事务beginTransaction(txInfo,tx);Objectrs=null;try{//执行我们的业务代码rs=business.execute();}catch(Throwableex){//3.theneededbusinessexceptiontorollback.completeTransactionAfterThrowing(txInfo,tx,ex);throwex;}//4.一切正常提交事务。commitTransaction(tx);returnrs;}finally{//5.cleartriggerAfterCompletion();cleanUp();}}finally{tx.resume(suspendedResourcesHolder);}}

3.1 获取全局事务

    GlobalTransactiontx=GlobalTransactionContext.getCurrentOrCreate();publicstaticGlobalTransactiongetCurrentOrCreate(){//首次这里会返回null,执行createNew方法GlobalTransactiontx=getCurrent();if(tx==null){returncreateNew();}returntx;}//获取全局事务对象privatestaticGlobalTransactiongetCurrent(){Stringxid=RootContext.getXID();if(xid==null){returnnull;}returnnewDefaultGlobalTransaction(xid,GlobalStatus.Begin,GlobalTransactionRole.Participant);}privatestaticGlobalTransactioncreateNew(){returnnewDefaultGlobalTransaction();}

3.2 开始全局事务

    beginTransaction(txInfo,tx);privatevoidbeginTransaction(TransactionInfotxInfo,GlobalTransactiontx)throwsTransactionalExecutor.ExecutionException{try{triggerBeforeBegin();tx.begin(txInfo.getTimeOut(),txInfo.getName());triggerAfterBegin();}catch(TransactionExceptiontxe){thrownewTransactionalExecutor.ExecutionException(tx,txe,TransactionalExecutor.Code.BeginFailure);}}//开始全局事务,并且通过TC获取全局事务唯一IDxidpublicvoidbegin(inttimeout,Stringname)throwsTransactionException{//全局事务的开启必须是Launcherif(role!=GlobalTransactionRole.Launcher){assertXIDNotNull();if(LOGGER.isDebugEnabled()){LOGGER.debug("IgnoreBegin():justinvolvedinglobaltransaction[{}]",xid);}return;}assertXIDNull();if(RootContext.getXID()!=null){thrownewIllegalStateException();}xid=transactionManager.begin(null,null,name,timeout);status=GlobalStatus.Begin;//将当前获取到的xid绑定到当前thread上(ThreadLocal)RootContext.bind(xid);if(LOGGER.isInfoEnabled()){LOGGER.info("Beginnewglobaltransaction[{}]",xid);}}

将xid绑定到当前执行thread(ThreadLocal)在这里seata是通过SPI技术来实现的

    privatestaticContextCoreCONTEXT_HOLDER=ContextCoreLoader.load();publicstaticvoidbind(Stringxid){if(LOGGER.isDebugEnabled()){LOGGER.debug("bind{}",xid);}CONTEXT_HOLDER.put(KEY_XID,xid);}//通过SPI加载具体的ContextCore实现publicclassContextCoreLoader{privateContextCoreLoader(){}privatestaticclassContextCoreHolder{privatestaticfinalContextCoreINSTANCE=Optional.ofNullable(EnhancedServiceLoader.load(ContextCore.class)).orElse(newThreadLocalContextCore());}publicstaticContextCoreload(){returnContextCoreHolder.INSTANCE;}}//META-INF/services/io.seata.core.context.ContextCore文件内容io.seata.core.context.ThreadLocalContextCoreio.seata.core.context.FastThreadLocalContextCore

3.3 执行本地业务

执行本地事务时会向TC注册分支然后提交本地事务,接下来看看本地分支事务的注册及处理。

    rs=business.execute();

3.3.1 提交本地事务

执行完业务代码后提交事务ConnectionProxy.commit()

    @Overridepublicvoidcommit()throwsSQLException{try{LOCK_RETRY_POLICY.execute(()->{//事务提交doCommit();returnnull;});}catch(SQLExceptione){throwe;}catch(Exceptione){thrownewSQLException(e);}}privatevoiddoCommit()throwsSQLException{//判断当前是否在全局事务中(xid!=null)if(context.inGlobalTransaction()){//处理全局事务processGlobalTransactionCommit();}elseif(context.isGlobalLockRequire()){processLocalCommitWithGlobalLocks();}else{targetConnection.commit();}}

进入

processGlobalTransactionCommit方法

3.3.2 注册本次事务分支

    privatevoidprocessGlobalTransactionCommit()throwsSQLException{try{register();}catch(TransactionExceptione){recognizeLockKeyConflictException(e,context.buildLockKeys());}try{UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);targetConnection.commit();}catch(Throwableex){report(false);thrownewSQLException(ex);}if(IS_REPORT_SUCCESS_ENABLE){report(true);}context.reset();}//注册本次事务执行RM,将返回的branchId保存到当前的上下文中privatevoidregister()throwsTransactionException{if(!context.hasUndoLog()||context.getLockKeysBuffer().isEmpty()){return;}LongbranchId=DefaultResourceManager.get().branchRegister(BranchType.AT,getDataSourceProxy().getResourceId(),null,context.getXid(),null,context.buildLockKeys());//将注册RM返回的branchId绑定到当前的上下文中ConnectionContextcontext.setBranchId(branchId);}

进入branchRegister方法

DefaultResourceManager.java

    @OverridepublicLongbranchRegister(BranchTypebranchType,StringresourceId,StringclientId,Stringxid,StringapplicationData,StringlockKeys)throwsTransactionException{returngetResourceManager(branchType).branchRegister(branchType,resourceId,clientId,xid,applicationData,lockKeys);}
    @OverridepublicLongbranchRegister(BranchTypebranchType,StringresourceId,StringclientId,Stringxid,StringapplicationData,StringlockKeys)throwsTransactionException{try{BranchRegisterRequestrequest=newBranchRegisterRequest();request.setXid(xid);request.setLockKey(lockKeys);request.setResourceId(resourceId);request.setBranchType(branchType);request.setApplicationData(applicationData);BranchRegisterResponseresponse=(BranchRegisterResponse)RmNettyRemotingClient.getInstance().sendSyncRequest(request);if(response.getResultCode()==ResultCode.Failed){thrownewRmTransactionException(response.getTransactionExceptionCode(),String.format("Response[%s]",response.getMsg()));}returnresponse.getBranchId();}catch(TimeoutExceptiontoe){thrownewRmTransactionException(TransactionExceptionCode.IO,"RPCTimeout",toe);}catch(RuntimeExceptionrex){thrownewRmTransactionException(TransactionExceptionCode.BranchRegisterFailed,"Runtime",rex);}}

3.3.3 记录undo log日志

undo log主要记录了数据的逻辑变化,比如一条 INSERT 语句,对应一条DELETE 的 undo log ,对于每个 UPDATE 语句,对应一条相反的 UPDATE 的 undo log ,这样在发生错误时,就能回滚到事务之前的数据状态。

    UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);//记录undolog日志publicvoidflushUndoLogs(ConnectionProxycp)throwsSQLException{ConnectionContextconnectionContext=cp.getContext();if(!connectionContext.hasUndoLog()){return;}Stringxid=connectionContext.getXid();longbranchId=connectionContext.getBranchId();BranchUndoLogbranchUndoLog=newBranchUndoLog();branchUndoLog.setXid(xid);branchUndoLog.setBranchId(branchId);branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());UndoLogParserparser=UndoLogParserFactory.getInstance();byte[]undoLogContent=parser.encode(branchUndoLog);if(LOGGER.isDebugEnabled()){LOGGER.debug("FlushingUNDOLOG:{}",newString(undoLogContent,Constants.DEFAULT_CHARSET));}//将undolog日志出入到undo_log表中insertUndoLogWithNormal(xid,branchId,buildContext(parser.getName()),undoLogContent,cp.getTargetConnection());}//这里会根据是Oracle或MySQL自动执行;我当前环境使用的MySQL,所以使用的是MySQLUndoLogManager@OverrideprotectedvoidinsertUndoLogWithNormal(Stringxid,longbranchId,StringrollbackCtx,byte[]undoLogContent,Connectionconn)throwsSQLException{insertUndoLog(xid,branchId,rollbackCtx,undoLogContent,State.Normal,conn);}privatevoidinsertUndoLog(Stringxid,longbranchId,StringrollbackCtx,byte[]undoLogContent,Statestate,Connectionconn)throwsSQLException{try(PreparedStatementpst=conn.prepareStatement(INSERT_UNDO_LOG_SQL)){pst.setLong(1,branchId);pst.setString(2,xid);pst.setString(3,rollbackCtx);pst.setBlob(4,BlobUtils.bytes2Blob(undoLogContent));pst.setInt(5,state.getValue());pst.executeUpdate();}catch(Exceptione){if(!(einstanceofSQLException)){e=newSQLException(e);}throw(SQLException)e;}}

3.3.4 本地事务提交

业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。

    targetConnection.commit();

3.3.5 重置当前上下文

将当前ConnectionProxy中的ConnectionContext重置

    context.reset();//重置voidreset(Stringxid){this.xid=xid;branchId=null;this.isGlobalLockRequire=false;lockKeysBuffer.clear();sqlUndoItemsBuffer.clear();}

到此整个全局事务的第一阶段完成了(通过feign的调用也成功返回);接下来是第二阶段的提交。

3.4 全局事物提交

    //TransactionalTemplate.javacommitTransaction(tx);privatevoidcommitTransaction(GlobalTransactiontx)throwsTransactionalExecutor.ExecutionException{try{triggerBeforeCommit();tx.commit();triggerAfterCommit();}catch(TransactionExceptiontxe){//4.1FailedtocommitthrownewTransactionalExecutor.ExecutionException(tx,txe,TransactionalExecutor.Code.CommitFailure);}}//DefaultGlobalTransaction.javapublicvoidcommit()throwsTransactionException{intretry=COMMIT_RETRY_COUNT<=0?DEFAULT_TM_COMMIT_RETRY_COUNT:COMMIT_RETRY_COUNT;try{while(retry>0){try{//根据当前的全局唯一事务idxid提交事务。status=transactionManager.commit(xid);break;}catch(Throwableex){retry--;if(retry==0){thrownewTransactionException("Failedtoreportglobalcommit",ex);}}}}finally{if(RootContext.getXID()!=null&&xid.equals(RootContext.getXID())){suspend(true);}}}//DefaultTransactionManager.java@OverridepublicGlobalStatuscommit(Stringxid)throwsTransactionException{GlobalCommitRequestglobalCommit=newGlobalCommitRequest();globalCommit.setXid(xid);GlobalCommitResponseresponse=(GlobalCommitResponse)syncCall(globalCommit);returnresponse.getGlobalStatus();}//通过Netty同步调用privateAbstractTransactionResponsesyncCall(AbstractTransactionRequestrequest)throwsTransactionException{try{return(AbstractTransactionResponse)TmNettyRemotingClient.getInstance().sendSyncRequest(request);}catch(TimeoutExceptiontoe){thrownewTmTransactionException(TransactionExceptionCode.IO,"RPCtimeout",toe);}}

到此第二阶段的事务就提交完成了。

3.5 全局事务回滚

参与全局事务的任何一个分支发生异常将对整个事务进行回滚。

3.5.1 全局事务发起端异常

代码片段

    try{//DoYourBusinessrs=business.execute();}catch(Throwableex){//3.theneededbusinessexceptiontorollback.completeTransactionAfterThrowing(txInfo,tx,ex);throwex;}

当本地业务执行时发生异常后执行

completeTransactionAfterThrowing方法。

    privatevoidcompleteTransactionAfterThrowing(TransactionInfotxInfo,GlobalTransactiontx,ThrowableoriginalException)throwsTransactionalExecutor.ExecutionException{//rollbackif(txInfo!=null&&txInfo.rollbackOn(originalException)){try{//全局事务回滚rollbackTransaction(tx,originalException);}catch(TransactionExceptiontxe){//FailedtorollbackthrownewTransactionalExecutor.ExecutionException(tx,txe,TransactionalExecutor.Code.RollbackFailure,originalException);}}else{//notrollbackonthisexception,socommitcommitTransaction(tx);}}

进入rollbackTransaction方法。

    privatevoidrollbackTransaction(GlobalTransactiontx,ThrowableoriginalException)throwsTransactionException,TransactionalExecutor.ExecutionException{triggerBeforeRollback();tx.rollback();triggerAfterRollback();//3.1SuccessfullyrolledbackthrownewTransactionalExecutor.ExecutionException(tx,GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())?TransactionalExecutor.Code.RollbackRetrying:TransactionalExecutor.Code.RollbackDone,originalException);}

进入rollback方法。

    publicvoidrollback()throwsTransactionException{//Participant(参与者),如果当前是参与者那么直接返回,全局事务的回滚必须是Launcherif(role==GlobalTransactionRole.Participant){return;}assertXIDNotNull();intretry=ROLLBACK_RETRY_COUNT<=0?DEFAULT_TM_ROLLBACK_RETRY_COUNT:ROLLBACK_RETRY_COUNT;try{while(retry>0){try{status=transactionManager.rollback(xid);break;}catch(Throwableex){retry--;if(retry==0){thrownewTransactionException("Failedtoreportglobalrollback",ex);}}}}finally{if(RootContext.getXID()!=null&&xid.equals(RootContext.getXID())){suspend(true);}}}

进入

transactionManager.rollback方法

    publicGlobalStatusrollback(Stringxid)throwsTransactionException{GlobalRollbackRequestglobalRollback=newGlobalRollbackRequest();globalRollback.setXid(xid);GlobalRollbackResponseresponse=(GlobalRollbackResponse)syncCall(globalRollback);returnresponse.getGlobalStatus();}

到此全局事务就进行了回滚

3.5.2 全局事务参与者异常

4 XID的传递

4.1 RestTemplate

    @Configuration(proxyBeanMethods=false)publicclassSeataRestTemplateAutoConfiguration{//该拦截器的作用就是为请求的Header中传递XID@BeanpublicSeataRestTemplateInterceptorseataRestTemplateInterceptor(){returnnewSeataRestTemplateInterceptor();}//获取当前IOC容器中所有的的RestTemplate对象@Autowired(required=false)privateCollection<RestTemplate>restTemplates;@AutowiredprivateSeataRestTemplateInterceptorseataRestTemplateInterceptor;//当前这个Bean(SeataRestTemplateAutoConfiguration)被创建且相关属性被注入后执行@PostConstructpublicvoidinit(){if(this.restTemplates!=null){//为所有的RestTemplate设置一个拦截器。SeataRestTemplateInterceptorfor(RestTemplaterestTemplate:restTemplates){List<ClientHttpRequestInterceptor>interceptors=newArrayList<ClientHttpRequestInterceptor>(restTemplate.getInterceptors());interceptors.add(this.seataRestTemplateInterceptor);restTemplate.setInterceptors(interceptors);}}}}
    publicclassSeataRestTemplateInterceptorimplementsClientHttpRequestInterceptor{@OverridepublicClientHttpResponseintercept(HttpRequesthttpRequest,byte[]bytes,ClientHttpRequestExecutionclientHttpRequestExecution)throwsIOException{HttpRequestWrapperrequestWrapper=newHttpRequestWrapper(httpRequest);Stringxid=RootContext.getXID();if(!StringUtils.isEmpty(xid)){requestWrapper.getHeaders().add(RootContext.KEY_XID,xid);}returnclientHttpRequestExecution.execute(requestWrapper,bytes);}}

该拦截器的作用就是为当前的请求header中放置TX_XID头信息。(是不是有点过分了,所有的RestTemplate都添加这个Header;应该像@LoadBalanced一样只有添加有该注解的才具有负载均衡的作用)。

到此RestTemplate调用方式传递XID值信息就这么简单。

4.2 Feign

    @Configuration(proxyBeanMethods=false)@ConditionalOnClass(Client.class)@AutoConfigureBefore(FeignAutoConfiguration.class)publicclassSeataFeignClientAutoConfiguration{@Bean@Scope("prototype")@ConditionalOnClass(name="com.netflix.hystrix.HystrixCommand")@ConditionalOnProperty(name="feign.hystrix.enabled",havingValue="true")Feign.BuilderfeignHystrixBuilder(BeanFactorybeanFactory){returnSeataHystrixFeignBuilder.builder(beanFactory);}}

每一个Feign客户端是一个FeignClientFactoryBean 工厂Bean。当在调用接口的时候会执行getObject方法

    @OverridepublicObjectgetObject()throwsException{returngetTarget();}<T>TgetTarget(){FeignContextcontext=this.applicationContext.getBean(FeignContext.class);//这个方法会从当前的IOC容器中获取Feign.Builder对象Feign.Builderbuilder=feign(context);if(!StringUtils.hasText(this.url)){if(!this.name.startsWith("http")){this.url="http://"+this.name;}else{this.url=this.name;}this.url+=cleanPath();//负载均衡调用目标服务(通过服务发现调用)return(T)loadBalance(builder,context,newHardCodedTarget<>(this.type,this.name,this.url));}//下面是通过配置的url直接调用目标服务。if(StringUtils.hasText(this.url)&&!this.url.startsWith("http")){this.url="http://"+this.url;}Stringurl=this.url+cleanPath();Clientclient=getOptional(context,Client.class);if(client!=null){if(clientinstanceofLoadBalancerFeignClient){client=((LoadBalancerFeignClient)client).getDelegate();}if(clientinstanceofFeignBlockingLoadBalancerClient){client=((FeignBlockingLoadBalancerClient)client).getDelegate();}builder.client(client);}Targetertargeter=get(context,Targeter.class);return(T)targeter.target(this,builder,context,newHardCodedTarget<>(this.type,this.name,url));}

feign(content)方法在调用链中最终执行如下方法从IOC容器中获取Feign.Builder

    public<T>TgetInstance(Stringname,Class<T>type){AnnotationConfigApplicationContextcontext=getContext(name);if(BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,type).length>0){returncontext.getBean(type);}returnnull;}

接下来进入

SeataHystrixFeignBuilder.builder(beanFactory);方法

    staticFeign.Builderbuilder(BeanFactorybeanFactory){returnHystrixFeign.builder().retryer(Retryer.NEVER_RETRY).client(newSeataFeignClient(beanFactory));}

SeataFeignClient类

    //Feign的执行就通过Client接口调用。publicclassSeataFeignClientimplementsClient{privatefinalClientdelegate;privatefinalBeanFactorybeanFactory;privatestaticfinalintMAP_SIZE=16;SeataFeignClient(BeanFactorybeanFactory){this.beanFactory=beanFactory;this.delegate=newClient.Default(null,null);}SeataFeignClient(BeanFactorybeanFactory,Clientdelegate){this.delegate=delegate;this.beanFactory=beanFactory;}@OverridepublicResponseexecute(Requestrequest,Request.Optionsoptions)throwsIOException{RequestmodifiedRequest=getModifyRequest(request);returnthis.delegate.execute(modifiedRequest,options);}//该方法给请求headers中添加TX_XID请求header信息。privateRequestgetModifyRequest(Requestrequest){Stringxid=RootContext.getXID();if(StringUtils.isEmpty(xid)){returnrequest;}Map<String,Collection<String>>headers=newHashMap<>(MAP_SIZE);headers.putAll(request.headers());List<String>seataXid=newArrayList<>();seataXid.add(xid);headers.put(RootContext.KEY_XID,seataXid);returnRequest.create(request.method(),request.url(),headers,request.body(),request.charset());}}

5 参与者如何加入全局事务

在被调用端(通过Feign调用服务)接口服务上没有加入任何注解或是特殊的代码那它又是如何加入到整个全局事务中的呢?

在2.2中介绍了seata自动配置会为我们自动的创建数据源代理。就是通过这个代理数据源来完成的DataSourceProxy。

事务方法在执行时都会先拿到Connection对象,这里系统默认的DataSource已经被代理成DataSourceProxy。

5.1 参与者获取XID

SeataHandlerInterceptorConfiguration注册一个拦截器SeataHandlerInterceptor;SeataHandlerInterceptor拦截器对我们的所有请求进行拦截

    publicclassSeataHandlerInterceptorConfigurationimplementsWebMvcConfigurer{@OverridepublicvoidaddInterceptors(InterceptorRegistryregistry){registry.addInterceptor(newSeataHandlerInterceptor()).addPathPatterns("/**");}}

拦截器从Header中获取TX_XID

    publicclassSeataHandlerInterceptorimplementsHandlerInterceptor{@OverridepublicbooleanpreHandle(HttpServletRequestrequest,HttpServletResponseresponse,Objecthandler){Stringxid=RootContext.getXID();//从Header中获取TX_XID信息。如果存在就绑定到RootContext上下文中。StringrpcXid=request.getHeader(RootContext.KEY_XID);if(StringUtils.isBlank(xid)&&rpcXid!=null){RootContext.bind(rpcXid);}returntrue;}@OverridepublicvoidafterCompletion(HttpServletRequestrequest,HttpServletResponseresponse,Objecthandler,Exceptione){if(StringUtils.isNotBlank(RootContext.getXID())){StringrpcXid=request.getHeader(RootContext.KEY_XID);if(StringUtils.isEmpty(rpcXid)){return;}//执行完后解除绑定StringunbindXid=RootContext.unbind();if(!rpcXid.equalsIgnoreCase(unbindXid)){if(unbindXid!=null){RootContext.bind(unbindXid);}}}}}

参与者通过拦截器的方式将xid拿到并且绑定到上下文中。

5.2 获取代理连接对象

一个数据库操作执行

DataSourceProxy.getConnection方法获取ConnectionProxy对象。

    @OverridepublicConnectionProxygetConnection()throwsSQLException{ConnectiontargetConnection=targetDataSource.getConnection();returnnewConnectionProxy(this,targetConnection);}

用ConnectionProxy代理默认数据源的的Connection对象。

在ConnectionProxy对象中有个非常重要的属性

    publicclassConnectionProxyextendsAbstractConnectionProxy{privateConnectionContextcontext=newConnectionContext();}

在一个数据库操作做最后事务提交的时候会通过ConnectionContext对象来判断是否是全局事务xid是否为空。

5.3 绑定XID到ConnectionContext

由于事务在提交的时候需要从ConnectionContext中获取判断是否全局事务(xid是否为空);xid是在Statement执行时进行绑定的。

执行相关SQL语句是通过StatementProxy, PreparedStatementProxy,这两个对象都是通过ConnectionProxy获取。

    publicclassPreparedStatementProxyextendsAbstractPreparedStatementProxyimplementsPreparedStatement,ParametersHolder{@OverridepublicMap<Integer,ArrayList<Object>>getParameters(){returnparameters;}publicPreparedStatementProxy(AbstractConnectionProxyconnectionProxy,PreparedStatementtargetStatement,StringtargetSQL)throwsSQLException{super(connectionProxy,targetStatement,targetSQL);}@Overridepublicbooleanexecute()throwsSQLException{returnExecuteTemplate.execute(this,(statement,args)->statement.execute());}@OverridepublicResultSetexecuteQuery()throwsSQLException{returnExecuteTemplate.execute(this,(statement,args)->statement.executeQuery());}@OverridepublicintexecuteUpdate()throwsSQLException{returnExecuteTemplate.execute(this,(statement,args)->statement.executeUpdate());}}

查看executeUpdate的执行

    publicstatic<T,SextendsStatement>Texecute(List<SQLRecognizer>sqlRecognizers,StatementProxy<S>statementProxy,StatementCallback<T,S>statementCallback,Object...args)throwsSQLException{if(!RootContext.requireGlobalLock()&&!StringUtils.equals(BranchType.AT.name(),RootContext.getBranchType())){//JustworkasoriginalstatementreturnstatementCallback.execute(statementProxy.getTargetStatement(),args);}StringdbType=statementProxy.getConnectionProxy().getDbType();if(CollectionUtils.isEmpty(sqlRecognizers)){sqlRecognizers=SQLVisitorFactory.get(statementProxy.getTargetSQL(),dbType);}Executor<T>executor;if(CollectionUtils.isEmpty(sqlRecognizers)){executor=newPlainExecutor<>(statementProxy,statementCallback);}else{if(sqlRecognizers.size()==1){SQLRecognizersqlRecognizer=sqlRecognizers.get(0);switch(sqlRecognizer.getSQLType()){caseINSERT:executor=EnhancedServiceLoader.load(InsertExecutor.class,dbType,newClass[]{StatementProxy.class,StatementCallback.class,SQLRecognizer.class},newObject[]{statementProxy,statementCallback,sqlRecognizer});break;caseUPDATE:executor=newUpdateExecutor<>(statementProxy,statementCallback,sqlRecognizer);break;caseDELETE:executor=newDeleteExecutor<>(statementProxy,statementCallback,sqlRecognizer);break;caseSELECT_FOR_UPDATE:executor=newSelectForUpdateExecutor<>(statementProxy,statementCallback,sqlRecognizer);break;default:executor=newPlainExecutor<>(statementProxy,statementCallback);break;}}else{executor=newMultiExecutor<>(statementProxy,statementCallback,sqlRecognizers);}}Trs;try{rs=executor.execute(args);}catch(Throwableex){if(!(exinstanceofSQLException)){//TurnotherexceptionintoSQLExceptionex=newSQLException(ex);}throw(SQLException)ex;}returnrs;}

这里的操作UpdateExecutor,DeleteExecutor,InsertExecutor(通过SPI获取实现MySQL或Oracle)他们都继承自BaseTransactionalExecutor。

    rs=executor.execute(args);//上面的execute执行BaseTransactionalExecutor.execute方法。publicclassBaseTransactionalExecutor...{@OverridepublicTexecute(Object...args)throwsThrowable{if(RootContext.inGlobalTransaction()){Stringxid=RootContext.getXID();statementProxy.getConnectionProxy().bind(xid);}statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());returndoExecute(args);}}
    statementProxy.getConnectionProxy().bind(xid);//该行代码将xid绑定到ConnectionProxy对象中的ConnectionContext上。publicvoidbind(Stringxid){context.bind(xid);}

到这xid已经绑定到了ConnectionProxy中的ConnectionContext中。