本文共 7219 字,大约阅读时间需要 24 分钟。
这篇文章的目的主要是讲解TC在处理分支事务注册过程中对全局锁的处理流程,理解了全局锁以后才能明白对DB同一条记录进行多次变更的问题是如何解决的。
如上图所示,问最终全局事务A对资源R1应该回滚到哪种状态?很明显,如果再根据UndoLog去做回滚,就会发生严重问题:覆盖了全局事务B对资源R1的变更。
那Fescar是如何解决这个问题呢?答案就是 Fescar的全局写排它锁解决方案,在全局事务A执行过程中全局事务B会因为获取不到全局锁而处于等待状态。

/**
 * Excerpt of the connection proxy: when the connection takes part in a global
 * transaction, {@link #commit()} registers the branch with the TC before
 * committing the local transaction.
 */
public class ConnectionProxy extends AbstractConnectionProxy {

    /**
     * Commits the underlying connection.
     *
     * <p>Inside a global transaction the branch is registered first, then any
     * pending undo logs are flushed and the target connection is committed.
     * The outcome of the local commit is reported via {@code report(...)} and
     * the context is reset after a successful commit. Outside a global
     * transaction the call is delegated straight to the target connection.
     *
     * @throws SQLException if flushing undo logs or the local commit fails;
     *                      non-SQL failures are wrapped in an {@code SQLException}
     */
    public void commit() throws SQLException {
        if (context.inGlobalTransaction()) {
            try {
                // 1. Register the branch with the TC, which also checks whether
                //    the global lock can be acquired.
                register();
            } catch (TransactionException e) {
                // NOTE(review): recognizeLockKeyConflictException is not shown in
                // this excerpt; presumably it rethrows as an SQLException. If it
                // returned normally, execution would continue to the local commit
                // below - verify against the full source.
                recognizeLockKeyConflictException(e);
            }

            try {
                if (context.hasUndoLog()) {
                    UndoLogManager.flushUndoLogs(this);
                }
                // 2. Commit the local transaction.
                targetConnection.commit();
            } catch (Throwable ex) {
                // Report the failed local commit before propagating the error.
                report(false);
                if (ex instanceof SQLException) {
                    throw (SQLException) ex;
                } else {
                    throw new SQLException(ex);
                }
            }
            report(true);
            context.reset();
        } else {
            targetConnection.commit();
        }
    }

    /**
     * Registers this connection's branch through the {@code DataSourceManager},
     * passing the lock keys built by the context, and stores the returned
     * branch id in the context.
     *
     * @throws TransactionException if the branch registration fails
     */
    private void register() throws TransactionException {
        Long branchId = DataSourceManager.get().branchRegister(
            BranchType.AT,
            getDataSourceProxy().getResourceId(),
            null,
            context.getXid(),
            context.buildLockKeys());
        context.setBranchId(branchId);
    }
}
说明:
public class DefaultCore implements Core { protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException { response.setTransactionId(request.getTransactionId()); response.setBranchId( core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(), XID.generateXID(request.getTransactionId()), request.getLockKey())); } public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String lockKeys) throws TransactionException { GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin); BranchSession branchSession = new BranchSession(); branchSession.setTransactionId(XID.getTransactionId(xid)); branchSession.setBranchId(UUIDGenerator.generateUUID()); branchSession.setApplicationId(globalSession.getApplicationId()); branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup()); branchSession.setBranchType(branchType); branchSession.setResourceId(resourceId); branchSession.setLockKey(lockKeys); branchSession.setClientId(clientId); // 判断branchSession是否能够获取锁 if (!branchSession.lock()) { throw new TransactionException(LockKeyConflict); } try { globalSession.addBranch(branchSession); } catch (RuntimeException ex) { throw new TransactionException(FailedToAddBranch); } return branchSession.getBranchId(); } public boolean lock() throws TransactionException { return LockManagerFactory.get().acquireLock(this); }}
说明:
public class DefaultLockManagerImpl implements LockManager { public boolean acquireLock(BranchSession branchSession) throws TransactionException { String resourceId = branchSession.getResourceId(); long transactionId = branchSession.getTransactionId(); //1、根据resourceId去LOCK_MAP获取,获取失败则新增一个空的对象。 ConcurrentHashMap>> dbLockMap = LOCK_MAP.get(resourceId); if (dbLockMap == null) { LOCK_MAP.putIfAbsent(resourceId, new ConcurrentHashMap >>()); dbLockMap = LOCK_MAP.get(resourceId); } ConcurrentHashMap
说明:
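private static final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>> LOCK_MAP = new ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>>();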
说明:
转载地址:http://uefzo.baihongyu.com/