Skip to content

扩展WorkerIdAssigner #13

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
.settings
.project
*target/
*/target/*
.classpath
.springBeans
.idea/
*.iml
*.log
*.txt
*.xml.bak
*rebel.xml
*.DS_Store
*/logs/*
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<jdk.version>1.8</jdk.version>
<spring.version>4.2.5.RELEASE</spring.version>
<slf4j-version>1.7.7</slf4j-version>
<curator.version>2.12.0</curator.version>
</properties>

<!-- Dependencies -->
Expand Down Expand Up @@ -66,7 +67,11 @@
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
<!-- Logger -->
<dependency>
<groupId>ch.qos.logback</groupId>
Expand Down
34 changes: 29 additions & 5 deletions src/main/java/com/baidu/fsg/uid/impl/DefaultUidGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,7 @@ public void afterPropertiesSet() throws Exception {
bitsAllocator = new BitsAllocator(timeBits, workerBits, seqBits);

// initialize worker id
workerId = workerIdAssigner.assignWorkerId();
if (workerId > bitsAllocator.getMaxWorkerId()) {
throw new RuntimeException("Worker id " + workerId + " exceeds the max " + bitsAllocator.getMaxWorkerId());
}
assignWorkerId();

LOGGER.info("Initialized bits(1, {}, {}, {}) for workerID:{}", timeBits, workerBits, seqBits, workerId);
}
Expand Down Expand Up @@ -138,7 +135,23 @@ protected synchronized long nextId() {
// Clock moved backwards, refuse to generate uid
if (currentSecond < lastSecond) {
long refusedSeconds = lastSecond - currentSecond;
throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
LOGGER.warn("Clock moved backwards. Refusing for {} seconds", refusedSeconds);
if (refusedSeconds <= 5) {
try {
//时间偏差大小小于5ms,则等待两倍时间
wait(refusedSeconds << 1);//wait
} catch (InterruptedException e) {
e.printStackTrace();
}
currentSecond = getCurrentSecond();
if (currentSecond < lastSecond) {//时钟回拨较大
//获取新的workerId
assignWorkerId();
}
}else {//时钟回拨较大
//获取新的workerId
assignWorkerId();
}
}

// At the same second, increase sequence
Expand Down Expand Up @@ -183,6 +196,17 @@ private long getCurrentSecond() {

return currentSecond;
}

/**
* initialize worker id
*/
private void assignWorkerId() {
// initialize worker id
workerId = workerIdAssigner.assignWorkerId();
if (workerId > bitsAllocator.getMaxWorkerId()) {
throw new RuntimeException("Worker id " + workerId + " exceeds the max " + bitsAllocator.getMaxWorkerId());
}
}

/**
* Setters for spring property
Expand Down
106 changes: 106 additions & 0 deletions src/main/java/com/baidu/fsg/uid/worker/ZookeeperWorkerIdAssigner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* hsjry.com Inc.
* Copyright (c) 2014-2018 All Rights Reserved.
*/
package com.baidu.fsg.uid.worker;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;

import com.baidu.fsg.uid.exception.UidGenerateException;

/**
* zookeeper模式WorkerID分配器
* @author liangjf
* @version $Id: ZookeeperWorkerIdAssigner.java, v 1.0 2018年10月24日 下午3:26:50 liangjf Exp $
* @since 1.0
*/
public class ZookeeperWorkerIdAssigner implements WorkerIdAssigner {

/** 服务器地址 */
private String servers = "127.0.0.1:2181";
/** 初试时间 */
private int baseSleepTimeMs = 1000;
/** 重试次数 */
private int maxRetries = 3;
/** 会话超时时间 */
private int sessionTimeoutMs = 5000;
/** 结点路径 */
private String path = "/sq";

private CuratorFramework client;

public String getServers() {
return servers;
}

public void setServers(String servers) {
this.servers = servers;
}

public int getBaseSleepTimeMs() {
return baseSleepTimeMs;
}

public void setBaseSleepTimeMs(int baseSleepTimeMs) {
this.baseSleepTimeMs = baseSleepTimeMs;
}

public int getMaxRetries() {
return maxRetries;
}

public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}

public int getSessionTimeoutMs() {
return sessionTimeoutMs;
}

public void setSessionTimeoutMs(int sessionTimeoutMs) {
this.sessionTimeoutMs = sessionTimeoutMs;
}

public String getPath() {
return path;
}

public void setPath(String path) {
this.path = path;
}

public void init() {
//重试策略,初试时间1秒,重试10次
RetryPolicy policy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
//通过工厂创建Curator
client = CuratorFrameworkFactory.builder().connectString(servers)
.sessionTimeoutMs(sessionTimeoutMs).retryPolicy(policy).build();
//开启连接
client.start();
}

/**
* @see com.baidu.fsg.uid.worker.WorkerIdAssigner#assignWorkerId()
*/
public long assignWorkerId() {
init();
try {
Stat stat = client.checkExists().forPath(path);
if(stat == null) {
client.create().creatingParentsIfNeeded().forPath(path);
}
stat = client.setData().forPath(path,new byte[0]);
return stat.getVersion();
} catch (Exception e) {
throw new UidGenerateException("创建zookeeper节点失败", e);
}
}

public void close() {
client.close();
}
}
127 changes: 127 additions & 0 deletions src/test/java/com/baidu/fsg/uid/ZookeeperCachedUidGeneratorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.baidu.fsg.uid;

import com.baidu.fsg.uid.impl.CachedUidGenerator;
import org.apache.commons.lang.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Test for {@link CachedUidGenerator}
*
* @author yutianbao
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:uid/zoo-cached-uid-spring.xml" })
public class ZookeeperCachedUidGeneratorTest {
private static final int SIZE = 7000000; // 700w
private static final boolean VERBOSE = false;
private static final int THREADS = Runtime.getRuntime().availableProcessors() << 1;

@Resource
private UidGenerator uidGenerator;

/**
* Test for serially generate
*
* @throws IOException
*/
@Test
public void testSerialGenerate() throws IOException {
// Generate UID serially
Set<Long> uidSet = new HashSet<>(SIZE);
for (int i = 0; i < SIZE; i++) {
doGenerate(uidSet, i);
}

// Check UIDs are all unique
checkUniqueID(uidSet);
}

/**
* Test for parallel generate
*
* @throws InterruptedException
* @throws IOException
*/
@Test
public void testParallelGenerate() throws InterruptedException, IOException {
AtomicInteger control = new AtomicInteger(-1);
Set<Long> uidSet = new ConcurrentSkipListSet<>();

// Initialize threads
List<Thread> threadList = new ArrayList<>(THREADS);
for (int i = 0; i < THREADS; i++) {
Thread thread = new Thread(() -> workerRun(uidSet, control));
thread.setName("UID-generator-" + i);

threadList.add(thread);
thread.start();
}

// Wait for worker done
for (Thread thread : threadList) {
thread.join();
}

// Check generate 700w times
Assert.assertEquals(SIZE, control.get());

// Check UIDs are all unique
checkUniqueID(uidSet);
}

/**
* Woker run
*/
private void workerRun(Set<Long> uidSet, AtomicInteger control) {
for (;;) {
int myPosition = control.updateAndGet(old -> (old == SIZE ? SIZE : old + 1));
if (myPosition == SIZE) {
return;
}

doGenerate(uidSet, myPosition);
}
}

/**
* Do generating
*/
private void doGenerate(Set<Long> uidSet, int index) {
long uid = uidGenerator.getUID();
String parsedInfo = uidGenerator.parseUID(uid);
boolean existed = !uidSet.add(uid);
if (existed) {
System.out.println("Found duplicate UID " + uid);
}

// Check UID is positive, and can be parsed
Assert.assertTrue(uid > 0L);
Assert.assertTrue(StringUtils.isNotBlank(parsedInfo));

if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + " No." + index + " >>> " + parsedInfo);
}
}

/**
* Check UIDs are all unique
*/
private void checkUniqueID(Set<Long> uidSet) throws IOException {
System.out.println(uidSet.size());
Assert.assertEquals(SIZE, uidSet.size());
}

}
Loading