博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Fescar 全局锁介绍
阅读量:6455 次
发布时间:2019-06-23

本文共 7219 字,大约阅读时间需要 24 分钟。

开篇

 这篇文章的目的主要是讲解TC的在处理分支事务注册过程中对全局锁的处理流程,理解了全局锁以后才能明白对DB同一个记录进行多次变更是如何解决的。

image

 如上图所示,问最终全局事务A对资源R1应该回滚到哪种状态?很明显,如果再根据UndoLog去做回滚,就会发生严重问题:覆盖了全局事务B对资源R1的变更。

 那Fescar是如何解决这个问题呢?答案就是 Fescar的全局写排它锁解决方案,在全局事务A执行过程中全局事务B会因为获取不到全局锁而处于等待状态。

Fescar 全局锁处理流程

RM 尝试获取全局锁

public class ConnectionProxy extends AbstractConnectionProxy {    public void commit() throws SQLException {        if (context.inGlobalTransaction()) {            try {                // 1、向TC发起注册操作并检查是否能够获取全局锁                register();            } catch (TransactionException e) {                recognizeLockKeyConflictException(e);            }            try {                if (context.hasUndoLog()) {                    UndoLogManager.flushUndoLogs(this);                }                // 2、执行本地的事务的commit操作                targetConnection.commit();            } catch (Throwable ex) {                report(false);                if (ex instanceof SQLException) {                    throw (SQLException) ex;                } else {                    throw new SQLException(ex);                }            }            report(true);            context.reset();        } else {            targetConnection.commit();        }    }    private void register() throws TransactionException {        Long branchId = DataSourceManager.get().branchRegister(                BranchType.AT, getDataSourceProxy().getResourceId(),                null, context.getXid(), context.buildLockKeys());        context.setBranchId(branchId);    }}

说明:

  • RM 执行本地事务提交操作在ConnectionProxy的commit()完成。
  • commit()的过程当中按照register->commit的流程执行。
  • register()过程RM会向TC发起注册请求判断是否能够获取全局锁
  • register()通过DataSourceManager的branchRegister()操作完成。

TC处理全局锁申请流程

public class DefaultCore implements Core {    protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,                                    RpcContext rpcContext) throws TransactionException {        response.setTransactionId(request.getTransactionId());        response.setBranchId(            core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),                XID.generateXID(request.getTransactionId()), request.getLockKey()));    }    public Long branchRegister(BranchType branchType, String resourceId,                  String clientId, String xid, String lockKeys) throws TransactionException {        GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);        BranchSession branchSession = new BranchSession();        branchSession.setTransactionId(XID.getTransactionId(xid));        branchSession.setBranchId(UUIDGenerator.generateUUID());        branchSession.setApplicationId(globalSession.getApplicationId());        branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup());        branchSession.setBranchType(branchType);        branchSession.setResourceId(resourceId);        branchSession.setLockKey(lockKeys);        branchSession.setClientId(clientId);        // 判断branchSession是否能够获取锁        if (!branchSession.lock()) {            throw new TransactionException(LockKeyConflict);        }        try {            globalSession.addBranch(branchSession);        } catch (RuntimeException ex) {            throw new TransactionException(FailedToAddBranch);        }        return branchSession.getBranchId();    }    public boolean lock() throws TransactionException {        return LockManagerFactory.get().acquireLock(this);    }}

说明:

  • TC 在处理branchRegister()的过程中会判断branchResiter请求携带的session信息能否获取全局锁。
  • branchSession.lock()判断能否获取锁,如果获取失败则抛出TransactionException(LockKeyConflict)异常。
  • branchSession.lock()判断能否获取锁,如果获取成功则将branchSession添加到全局锁当中。

TC 判断全局锁分配流程

public class DefaultLockManagerImpl implements LockManager {    public boolean acquireLock(BranchSession branchSession) throws TransactionException {        String resourceId = branchSession.getResourceId();        long transactionId = branchSession.getTransactionId();        //1、根据resourceId去LOCK_MAP获取,获取失败则新增一个空的对象。        ConcurrentHashMap
>> dbLockMap = LOCK_MAP.get(resourceId); if (dbLockMap == null) { LOCK_MAP.putIfAbsent(resourceId, new ConcurrentHashMap
>>()); dbLockMap = LOCK_MAP.get(resourceId); } ConcurrentHashMap
, Set
> bucketHolder = branchSession.getLockHolder(); // 2、获取branchSession的全局锁的key对象 String lockKey = branchSession.getLockKey(); if(StringUtils.isEmpty(lockKey)) { return true; } // 3、按照分号“;”切割多个LockKey,每个LockKey按照table:pk1;pk2;pk3格式组装。 String[] tableGroupedLockKeys = lockKey.split(";"); for (String tableGroupedLockKey : tableGroupedLockKeys) { int idx = tableGroupedLockKey.indexOf(":"); if (idx < 0) { branchSession.unlock(); throw new ShouldNeverHappenException("Wrong format of LOCK KEYS: " + branchSession.getLockKey()); } // 4、分割获取branchRegister请求的表名和pks。 String tableName = tableGroupedLockKey.substring(0, idx); String mergedPKs = tableGroupedLockKey.substring(idx + 1); // 5、获取表下的已经加锁的记录tableLockMap ConcurrentHashMap
> tableLockMap = dbLockMap.get(tableName); if (tableLockMap == null) { dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap
>()); tableLockMap = dbLockMap.get(tableName); } // 6、遍历该表所有pks判断是否已加锁。 String[] pks = mergedPKs.split(","); for (String pk : pks) { // 7、同一个表的pk按照hash值进行hash分配到tableLockMap当中。 int bucketId = pk.hashCode() % BUCKET_PER_TABLE; Map
bucketLockMap = tableLockMap.get(bucketId); if (bucketLockMap == null) { tableLockMap.putIfAbsent(bucketId, new HashMap
()); bucketLockMap = tableLockMap.get(bucketId); } // 8、根据pk去获取bucketLockMap当中获取锁对象。 synchronized (bucketLockMap) { Long lockingTransactionId = bucketLockMap.get(pk); if (lockingTransactionId == null) { // No existing lock // 9、将锁添加到branchSession当中 bucketLockMap.put(pk, transactionId); Set
keysInHolder = bucketHolder.get(bucketLockMap); if (keysInHolder == null) { bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet
()); keysInHolder = bucketHolder.get(bucketLockMap); } keysInHolder.add(pk); } else if (lockingTransactionId.longValue() == transactionId) { // Locked by me continue; } else { // 直接返回异常 LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + lockingTransactionId); branchSession.unlock(); // Release all acquired locks. return false; } } } } return true; }}

说明:

  • TC 判断branchRegister操作能否获取锁按照维度层层递进判断。
  • 1、先从ResourceId的维度进行判断, LOCK_MAP.get(resourceId)。
  • 2、再从tableName的维度进行判断, dbLockMap.get(tableName)。
  • 3、再从pk的hashcode的维度进行判断,tableLockMap.get(bucketId)。
  • 4、在从pk的维度进行判断,bucketLockMap.get(pk)。
  • 5、如果按照上述维度判断后未存在加锁的branchSession就返回能够加锁,否则返回不能加锁。
  • 6、判断加锁过程中处理了幂等,如果自己已经加锁了可以再次获取锁。

TM 全局锁维护数据结构

private static final ConcurrentHashMap
>>> LOCK_MAP = new ConcurrentHashMap
>>>();

说明:

  • LOCK_MAP 作为TC全局锁的保存结构。
  • 第一层ConcurrentHashMap的key为resourceId。
  • 第二层ConcurrentHashMap的key为tableName。
  • 第三层ConcurrentHashMap的key为pk的hashCode。
  • 第四层的Map的key为pk,value为transactionId(RM携带过来)。

参考文章

Fescar源码分析连载

转载地址:http://uefzo.baihongyu.com/

你可能感兴趣的文章
企业级Nginx Web 服务优化实战
查看>>
IE和Firefox对同一域名进行请求的并发连接数限制
查看>>
arm linux ppp拨号gprs上网移植
查看>>
linux新建用户的全程解析
查看>>
微软原版Windows XP Pro With SP3 VOL MSDN原版镜像
查看>>
学点Unicode又不会死
查看>>
linux关于关闭防火墙和selinux的操作
查看>>
python 文件操作
查看>>
MCSE命令学习持续更新
查看>>
solaris11忘记root密码的处理方法
查看>>
Windows Server 2016 主域控制器搭建(一)
查看>>
如何将磁盘从GPT格式转换成MBR
查看>>
UI 架构 - 读Martin Flower相关文章总结
查看>>
Linux ---各种yum源配置详解
查看>>
CString类型转换为char类型
查看>>
DNS 在企业网络中的应用-2
查看>>
二维数组左滑删除某一行
查看>>
ChemDraw进行自动调整的步骤
查看>>
outlook收发hotmail邮件及IP被列为垃圾IP的处理办法
查看>>
静态路由
查看>>