Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

实现全局锁在 zookeeper下的持久化, 完善 ZookeeperRepository 的相关方法, 并编写相关测试用例 #362

Merged
merged 2 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public final class HmilyRepositoryNode {
private static final String HMILY_PARTICIPANT_UNDO = "hmily_participant_undo";

private static final String ROOT_PATH_PREFIX = "hmily-repository";

private static final String HMILY_LOCK_GLOBAL = "hmily_lock_global";

private final String appName;

Expand All @@ -36,7 +38,7 @@ public String getRootPathPrefix() {
* @return hmily transaction root path
*/
public String getHmilyTransactionRootPath() {
return Joiner.on("/").join("", ROOT_PATH_PREFIX, HMILY_TRANSACTION_GLOBAL);
return Joiner.on("/").join("", ROOT_PATH_PREFIX, appName, HMILY_TRANSACTION_GLOBAL);
}

/**
Expand Down Expand Up @@ -86,4 +88,23 @@ public String getHmilyParticipantUndoRootPath() {
public String getHmilyParticipantUndoRealPath(final Long undoId) {
return Joiner.on("/").join(getHmilyParticipantUndoRootPath(), undoId);
}

/**
* Get hmily lock root path.
*
* @return hmily lock root path
*/
public String getHmilyLockRootPath() {
return Joiner.on("/").join("", ROOT_PATH_PREFIX, appName, HMILY_LOCK_GLOBAL);
}

/**
* Get hmily lock real path.
*
* @param lockId lock id
* @return hmily lock real path
*/
public String getHmilyLockRealPath(final String lockId) {
return Joiner.on("/").join(getHmilyLockRootPath(), lockId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.common.base.Joiner;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;

import java.io.Serializable;
Expand All @@ -29,7 +28,6 @@
* @author xiaoyu
*/
@Getter
@RequiredArgsConstructor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为什么要去掉lombok自动生成构造函数,而自己去写

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已修改

@ToString
public class HmilyLock implements Serializable {

Expand All @@ -38,28 +36,39 @@ public class HmilyLock implements Serializable {
/**
* transaction id.
*/
private final Long transId;
private Long transId;

/**
* participant id.
*/
private final Long participantId;
private Long participantId;

/**
* resource id.
*/
private final String resourceId;
private String resourceId;

/**
* target table name.
*/
private final String targetTableName;
private String targetTableName;

/**
* target table pk.
*/
private final String targetTablePk;

private String targetTablePk;

public HmilyLock(final Long transId, final Long participantId, final String resourceId, final String targetTableName, final String targetTablePk) {
this.transId = transId;
this.participantId = participantId;
this.resourceId = resourceId;
this.targetTableName = targetTableName;
this.targetTablePk = targetTablePk;
}

public HmilyLock() {
}

/**
* Get lock id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@
public class ZookeeperRepository implements HmilyRepository {

private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperRepository.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在idea添加设置,空行也要缩进


private static final CountDownLatch LATCH = new CountDownLatch(1);

private static volatile ZooKeeper zooKeeper;

private HmilySerializer hmilySerializer;

private HmilyRepositoryNode node;
Expand Down Expand Up @@ -223,7 +223,7 @@ public int removeHmilyTransactionByDate(final Date date) {
return dateParam.after(hmilyTransaction.getUpdateTime()) && hmilyTransaction.getStatus() == HmilyActionEnum.DELETE.getCode();
}, date);
}

@Override
public int createHmilyParticipant(final HmilyParticipant hmilyParticipant) throws HmilyRepositoryException {
try {
Expand Down Expand Up @@ -288,7 +288,7 @@ public boolean existHmilyParticipantByTransId(final Long transId) {
return transIdParam.compareTo(hmilyParticipant.getTransId()) == 0;
}, transId);
}

@Override
public int updateHmilyParticipantStatus(final Long participantId, final Integer status) throws HmilyRepositoryException {
String path = node.getHmilyParticipantRealPath(participantId);
Expand Down Expand Up @@ -440,25 +440,61 @@ public int updateHmilyParticipantUndoStatus(final Long undoId, final Integer sta
}
return HmilyRepository.FAIL_ROWS;
}

@Override
public int writeHmilyLocks(final Collection<HmilyLock> locks) {
// TODO
return 0;
for (HmilyLock lock : locks) {
String path = node.getHmilyLockRealPath(lock.getLockId().replace("/", "-"));
try {
create(node.getHmilyLockRootPath());
Stat stat = zooKeeper.exists(path, false);
if (stat == null) {
zooKeeper.create(path, hmilySerializer.serialize(lock), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
zooKeeper.setData(path, hmilySerializer.serialize(lock), stat.getVersion());
}
} catch (KeeperException | InterruptedException e) {
throw new HmilyException(e);
}
}
return locks.size();
}

@Override
public int releaseHmilyLocks(final Collection<HmilyLock> locks) {
// TODO
return 0;
for (HmilyLock lock : locks) {
String path = node.getHmilyLockRealPath(lock.getLockId().replace("/", "-"));
try {
if (checkPath(path, false)) {
return FAIL_ROWS;
}
zooKeeper.delete(path, -1);
} catch (InterruptedException | KeeperException e) {
LOGGER.error("removeHmilyLock occur a exception", e);
return HmilyRepository.FAIL_ROWS;
}
}
return locks.size();
}

@Override
public Optional<HmilyLock> findHmilyLockById(final String lockId) {
// TODO
String path = node.getHmilyLockRealPath(lockId.replace("/", "-"));
try {
if (checkPath(path, false)) {
return Optional.empty();
}
byte[] data = zooKeeper.getData(path, false, null);
if (data == null || data.length == 0) {
return Optional.empty();
}
return Optional.of(hmilySerializer.deSerialize(data, HmilyLock.class));
} catch (KeeperException | InterruptedException e) {
LOGGER.error("findHmilyLockById occur a exception", e);
}
return Optional.empty();
}

private void connect(final HmilyZookeeperConfig config) {
try {
zooKeeper = new ZooKeeper(config.getHost(), config.getSessionTimeOut(), watchedEvent -> {
Expand All @@ -467,9 +503,9 @@ private void connect(final HmilyZookeeperConfig config) {
}
});
LATCH.await();
Stat stat = zooKeeper.exists(node.getRootPathPrefix(), false);
Stat stat = zooKeeper.exists("/" + node.getRootPathPrefix(), false);
if (stat == null) {
zooKeeper.create(node.getRootPathPrefix(), node.getRootPathPrefix().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create("/" + node.getRootPathPrefix(), node.getRootPathPrefix().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
throw new HmilyRuntimeException(e);
Expand Down Expand Up @@ -581,18 +617,18 @@ private <T> int removeByFilter(final String path, final Class<T> deserializeClas
}
return HmilyRepository.FAIL_ROWS;
}

/**
* The type Path tokenizer.
*/
static class PathTokenizer {

private String path = "";

private String[] nodes;

private int index;

/**
* Instantiates a new Path tokenizer.
*
Expand All @@ -607,7 +643,7 @@ static class PathTokenizer {
index = 1;
}
}

/**
* Next path string.
*
Expand All @@ -618,7 +654,7 @@ public String nextPath() {
index++;
return path;
}

/**
* Has next boolean.
*
Expand All @@ -628,14 +664,14 @@ public boolean hasNext() {
return index < nodes.length;
}
}

/**
* The interface Filter.
*
* @param <T> the type parameter
*/
interface Filter<T> {

/**
* Filter boolean.
*
Expand Down
Loading