From 229c299bfa52ff07c4802930d934770e45c67d80 Mon Sep 17 00:00:00 2001 From: shenhd Date: Mon, 15 Aug 2022 20:00:55 +0800 Subject: [PATCH] init repo --- .gitignore | 140 +++---- pom.xml | 288 +++++++-------- .../java/com/dulio/demo/raft/RaftDemo.java | 56 --- .../demo/raft/counter/CounterClient.java | 210 +++++------ .../demo/raft/counter/CounterClosure.java | 120 +++--- .../demo/raft/counter/CounterOperation.java | 124 +++---- .../demo/raft/counter/CounterServer.java | 278 +++++++------- .../demo/raft/counter/CounterService.java | 72 ++-- .../demo/raft/counter/CounterServiceImpl.java | 248 ++++++------- .../raft/counter/CounterStateMachine.java | 346 +++++++++--------- .../raft/counter/rpc/GetValueRequest.java | 82 ++--- .../counter/rpc/GetValueRequestProcessor.java | 114 +++--- .../counter/rpc/IncrementAndGetRequest.java | 82 ++--- .../rpc/IncrementAndGetRequestProcessor.java | 114 +++--- .../demo/raft/counter/rpc/ValueResponse.java | 184 +++++----- .../counter/snapshot/CounterSnapshotFile.java | 138 +++---- .../dulio/demo/raft/tsdb/TsdbMetaServer.java | 141 +++++++ .../demo/raft/tsdb/TsdbMetaStateMachine.java | 298 +++++++-------- .../demo/raft/tsdb/model/TsdbMetaKey.java | 90 ++--- .../demo/raft/tsdb/model/TsdbMetaValue.java | 36 +- .../tsdb/snapshot/TsdbMetaSnapshotFile.java | 143 ++++---- src/main/resources/log4j.properties | 20 +- src/main/resources/log4j2.xml | 224 ++++++------ 23 files changed, 1815 insertions(+), 1733 deletions(-) delete mode 100644 src/main/java/com/dulio/demo/raft/RaftDemo.java create mode 100644 src/main/java/com/dulio/demo/raft/tsdb/TsdbMetaServer.java diff --git a/.gitignore b/.gitignore index cd4cf3b..b342c4e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,70 +1,70 @@ -*.iml -data/*.log* -.idea/ - -#src/test/ -.idea -.idea/* -target -target/classes/com/dulio/demo/logging/ -target/classes/log4j.properties -target/generated-sources/ -### Java template -*.class - -# BlueJ files -*.ctxt - -# Mobile Tools for Java (J2ME) -.mtj.tmp/ - -# Package Files # -*.jar -*.war -*.ear - -# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml -hs_err_pid* -### JetBrains template -# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm -# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 - -# User-specific stuff: -.idea/workspace.xml -.idea/tasks.xml - -# Sensitive or high-churn files: -.idea/dataSources/ -.idea/dataSources.ids -.idea/dataSources.xml -.idea/dataSources.local.xml -.idea/sqlDataSources.xml -.idea/dynamic.xml -.idea/uiDesigner.xml - -# Gradle: -.idea/gradle.xml -.idea/libraries - -# Mongo Explorer plugin: -.idea/mongoSettings.xml - -## File-based project format: -*.iws - -## Plugin-specific files: - -# IntelliJ -/out/ - -# mpeltonen/sbt-idea plugin -.idea_modules/ - -# JIRA plugin -atlassian-ide-plugin.xml - -# Crashlytics plugin (for Android Studio and IntelliJ) -com_crashlytics_export_strings.xml -crashlytics.properties -crashlytics-build.properties -fabric.properties +*.iml +data/*.log* +.idea/ + +#src/test/ +.idea +.idea/* +target +target/classes/com/dulio/demo/logging/ +target/classes/log4j.properties +target/generated-sources/ +### Java template +*.class + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.ear + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/workspace.xml +.idea/tasks.xml + +# Sensitive or high-churn files: +.idea/dataSources/ +.idea/dataSources.ids +.idea/dataSources.xml +.idea/dataSources.local.xml +.idea/sqlDataSources.xml +.idea/dynamic.xml +.idea/uiDesigner.xml + +# Gradle: +.idea/gradle.xml +.idea/libraries + +# Mongo Explorer plugin: +.idea/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties diff --git a/pom.xml b/pom.xml index 402e34d..e95a31d 100644 --- a/pom.xml +++ b/pom.xml @@ -1,145 +1,145 @@ - - - 4.0.0 - - com.dulio.demo - raft-demo - 1.0.0 - - - - - - - - com.alipay.sofa - jraft-core - 1.3.7 - - - com.alipay.sofa - jraft-rheakv-core - 1.3.7 - - - - com.google.code.findbugs - jsr305 - 3.0.2 - - - - com.alipay.sofa - bolt - 1.6.2 - - - com.alipay.sofa - hessian - 3.3.6 - - - - com.lmax - disruptor - 3.3.7 - - - commons-io - commons-io - 2.4 - - - commons-lang - commons-lang - 2.6 - - - - - - - - - com.google.protobuf - protobuf-java - 3.5.1 - - - - io.protostuff - protostuff-core - 1.6.0 - - - io.protostuff - protostuff-runtime - 1.6.0 - - - - org.rocksdb - rocksdbjni - 5.18.4 - - - - net.openhft - affinity - 3.1.7 - - - - io.dropwizard.metrics - metrics-core - 4.0.2 - - - - com.google.code.gson - gson - 2.8.7 - - - - - org.slf4j - slf4j-api - 1.7.21 - - - org.slf4j - slf4j-simple - 1.6.6 - - - - - - - org.apache.maven.plugins - maven-shade-plugin - 2.4.3 - - - package - - shade - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.3 - - 1.8 - 1.8 - - - - + + + 4.0.0 + + com.dulio.demo + raft-demo + 1.0.0 + + + + + + + + com.alipay.sofa + jraft-core + 1.3.7 + + + com.alipay.sofa + jraft-rheakv-core + 1.3.7 + + + + com.google.code.findbugs + jsr305 + 3.0.2 + + + + com.alipay.sofa + bolt + 1.6.2 + + + com.alipay.sofa + hessian + 3.3.6 + + + + com.lmax + disruptor + 3.3.7 + + + commons-io + commons-io + 2.4 + + + commons-lang + commons-lang + 2.6 + + + + + + + + + com.google.protobuf + protobuf-java + 3.5.1 + + + + io.protostuff + protostuff-core + 1.6.0 + + + io.protostuff + protostuff-runtime + 1.6.0 + + + + org.rocksdb + rocksdbjni + 5.18.4 + + + + net.openhft + affinity + 3.1.7 + + + + io.dropwizard.metrics + metrics-core + 4.0.2 + + + + com.google.code.gson + gson + 2.8.7 + + + + + org.slf4j + slf4j-api + 1.7.21 + + + org.slf4j + slf4j-simple + 1.6.6 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.4.3 + + + package + + shade + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + 1.8 + 1.8 + + + + \ No newline at end of file diff --git a/src/main/java/com/dulio/demo/raft/RaftDemo.java b/src/main/java/com/dulio/demo/raft/RaftDemo.java deleted file mode 100644 index 1265d4f..0000000 --- a/src/main/java/com/dulio/demo/raft/RaftDemo.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.dulio.demo.raft; - -import com.alipay.sofa.jraft.*; -import com.alipay.sofa.jraft.closure.TaskClosure; -import com.alipay.sofa.jraft.entity.PeerId; -import com.alipay.sofa.jraft.entity.Task; -import com.alipay.sofa.jraft.option.NodeOptions; -import com.codahale.metrics.ConsoleReporter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.ByteBuffer; -import java.util.concurrent.TimeUnit; - -public class RaftDemo { - private static final Logger LOGGER = LoggerFactory.getLogger(RaftDemo.class); - - public static void main(String[] args) { -// Endpoint addr = JRaftUtils.getEndPoint("localhost:8080"); -// PeerId peer = JRaftUtils.getPeerId("localhost:8080"); -// // 三个节点组成的 raft group 配置,注意节点之间用逗号隔开 -// Configuration conf = JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083"); - - String groupId = "jraft"; - PeerId serverId = JRaftUtils.getPeerId("localhost:8080"); - NodeOptions nodeOptions = new NodeOptions(); // 配置 node options - nodeOptions.setEnableMetrics(true); - - RaftGroupService cluster = new RaftGroupService(groupId, serverId, nodeOptions); - Node node = cluster.start(); - - // 使用 node 提交任务 - Closure done = new TaskClosure() { - @Override - public void onCommitted() { - LOGGER.info("Task Closure committed"); - } - - @Override - public void run(Status status) { - LOGGER.info("status={}", status); - } - }; - Task task = new Task(); - task.setData(ByteBuffer.wrap("hello".getBytes())); - task.setDone(done); - node.apply(task); - - // 将指标定期 30 秒间隔输出到 console - ConsoleReporter reporter = ConsoleReporter.forRegistry(node.getNodeMetrics().getMetricRegistry()) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build(); - reporter.start(30, TimeUnit.SECONDS); - } -} diff --git a/src/main/java/com/dulio/demo/raft/counter/CounterClient.java b/src/main/java/com/dulio/demo/raft/counter/CounterClient.java index 7d36a5d..a8d4878 100644 --- a/src/main/java/com/dulio/demo/raft/counter/CounterClient.java +++ b/src/main/java/com/dulio/demo/raft/counter/CounterClient.java @@ -1,105 +1,105 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dulio.demo.raft.counter; - -import com.alipay.sofa.jraft.RouteTable; -import com.alipay.sofa.jraft.conf.Configuration; -import com.alipay.sofa.jraft.entity.PeerId; -import com.alipay.sofa.jraft.error.RemotingException; -import com.alipay.sofa.jraft.option.CliOptions; -import com.alipay.sofa.jraft.rpc.InvokeCallback; -import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl; -import com.dulio.demo.raft.counter.rpc.IncrementAndGetRequest; -import com.dulio.demo.raft.counter.rpc.ValueResponse; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; - -public class CounterClient { - private static final Queue globalQueue = new ConcurrentLinkedQueue<>(); - - public static void main(final String[] args) throws Exception { - if (args.length != 2) { - System.out.println("Useage : java com.alipay.sofa.jraft.example.counter.CounterClient {groupId} {conf}"); - System.out - .println("Example: java com.alipay.sofa.jraft.example.counter.CounterClient counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083"); - System.exit(1); - } - final String groupId = args[0]; - final String confStr = args[1]; - - final Configuration conf = new Configuration(); - if (!conf.parse(confStr)) { - throw new IllegalArgumentException("Fail to parse conf:" + confStr); - } - - RouteTable.getInstance().updateConfiguration(groupId, conf); - - final CliClientServiceImpl cliClientService = new CliClientServiceImpl(); - cliClientService.init(new CliOptions()); - - if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) { - throw new IllegalStateException("Refresh leader failed"); - } - - final PeerId leader = RouteTable.getInstance().selectLeader(groupId); - System.out.println("Leader is " + leader); - final int n = 1; - final CountDownLatch latch = new CountDownLatch(n); - final long start = System.currentTimeMillis(); - for (int i = 0; i < n; i++) { - incrementAndGet(cliClientService, leader, 1, latch); - } - latch.await(); - List data = new ArrayList<>(globalQueue); - System.out.println(n + " ops, cost : " + (System.currentTimeMillis() - start) + " ms."); - System.out.println(n + " min, max : " + Collections.min(data) +","+ Collections.max(data)); - System.exit(0); - } - - private static void incrementAndGet(final CliClientServiceImpl cliClientService, final PeerId leader, - final long delta, CountDownLatch latch) throws RemotingException, - InterruptedException { - final IncrementAndGetRequest request = new IncrementAndGetRequest(); - request.setDelta(delta); - cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() { - - @Override - public void complete(Object result, Throwable err) { - if (err == null) { - latch.countDown(); - globalQueue.add(((ValueResponse) result).getValue()); - //System.out.println("incrementAndGet result:" + result); - } else { - err.printStackTrace(); - latch.countDown(); - } - } - - @Override - public Executor executor() { - return null; - } - }, 5000); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.counter; + +import com.alipay.sofa.jraft.RouteTable; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.error.RemotingException; +import com.alipay.sofa.jraft.option.CliOptions; +import com.alipay.sofa.jraft.rpc.InvokeCallback; +import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl; +import com.dulio.demo.raft.counter.rpc.IncrementAndGetRequest; +import com.dulio.demo.raft.counter.rpc.ValueResponse; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; + +public class CounterClient { + private static final Queue globalQueue = new ConcurrentLinkedQueue<>(); + + public static void main(final String[] args) throws Exception { + if (args.length != 2) { + System.out.println("Useage : java com.alipay.sofa.jraft.example.counter.CounterClient {groupId} {conf}"); + System.out + .println("Example: java com.alipay.sofa.jraft.example.counter.CounterClient counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083"); + System.exit(1); + } + final String groupId = args[0]; + final String confStr = args[1]; + + final Configuration conf = new Configuration(); + if (!conf.parse(confStr)) { + throw new IllegalArgumentException("Fail to parse conf:" + confStr); + } + + RouteTable.getInstance().updateConfiguration(groupId, conf); + + final CliClientServiceImpl cliClientService = new CliClientServiceImpl(); + cliClientService.init(new CliOptions()); + + if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) { + throw new IllegalStateException("Refresh leader failed"); + } + + final PeerId leader = RouteTable.getInstance().selectLeader(groupId); + System.out.println("Leader is " + leader); + final int n = 1; + final CountDownLatch latch = new CountDownLatch(n); + final long start = System.currentTimeMillis(); + for (int i = 0; i < n; i++) { + incrementAndGet(cliClientService, leader, 1, latch); + } + latch.await(); + List data = new ArrayList<>(globalQueue); + System.out.println(n + " ops, cost : " + (System.currentTimeMillis() - start) + " ms."); + System.out.println(n + " min, max : " + Collections.min(data) +","+ Collections.max(data)); + System.exit(0); + } + + private static void incrementAndGet(final CliClientServiceImpl cliClientService, final PeerId leader, + final long delta, CountDownLatch latch) throws RemotingException, + InterruptedException { + final IncrementAndGetRequest request = new IncrementAndGetRequest(); + request.setDelta(delta); + cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() { + + @Override + public void complete(Object result, Throwable err) { + if (err == null) { + latch.countDown(); + globalQueue.add(((ValueResponse) result).getValue()); + //System.out.println("incrementAndGet result:" + result); + } else { + err.printStackTrace(); + latch.countDown(); + } + } + + @Override + public Executor executor() { + return null; + } + }, 5000); + } + +} diff --git a/src/main/java/com/dulio/demo/raft/counter/CounterClosure.java b/src/main/java/com/dulio/demo/raft/counter/CounterClosure.java index b3b0207..0e26ea4 100644 --- a/src/main/java/com/dulio/demo/raft/counter/CounterClosure.java +++ b/src/main/java/com/dulio/demo/raft/counter/CounterClosure.java @@ -1,60 +1,60 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dulio.demo.raft.counter; - -import com.alipay.sofa.jraft.Closure; -import com.dulio.demo.raft.counter.rpc.ValueResponse; - -/** - * @author likun (saimu.msm@antfin.com) - */ -public abstract class CounterClosure implements Closure { - - private ValueResponse valueResponse; - private CounterOperation counterOperation; - - public void setCounterOperation(CounterOperation counterOperation) { - this.counterOperation = counterOperation; - } - - public CounterOperation getCounterOperation() { - return counterOperation; - } - - public ValueResponse getValueResponse() { - return valueResponse; - } - - public void setValueResponse(ValueResponse valueResponse) { - this.valueResponse = valueResponse; - } - - protected void failure(final String errorMsg, final String redirect) { - final ValueResponse response = new ValueResponse(); - response.setSuccess(false); - response.setErrorMsg(errorMsg); - response.setRedirect(redirect); - setValueResponse(response); - } - - protected void success(final long value) { - final ValueResponse response = new ValueResponse(); - response.setValue(value); - response.setSuccess(true); - setValueResponse(response); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.counter; + +import com.alipay.sofa.jraft.Closure; +import com.dulio.demo.raft.counter.rpc.ValueResponse; + +/** + * @author likun (saimu.msm@antfin.com) + */ +public abstract class CounterClosure implements Closure { + + private ValueResponse valueResponse; + private CounterOperation counterOperation; + + public void setCounterOperation(CounterOperation counterOperation) { + this.counterOperation = counterOperation; + } + + public CounterOperation getCounterOperation() { + return counterOperation; + } + + public ValueResponse getValueResponse() { + return valueResponse; + } + + public void setValueResponse(ValueResponse valueResponse) { + this.valueResponse = valueResponse; + } + + protected void failure(final String errorMsg, final String redirect) { + final ValueResponse response = new ValueResponse(); + response.setSuccess(false); + response.setErrorMsg(errorMsg); + response.setRedirect(redirect); + setValueResponse(response); + } + + protected void success(final long value) { + final ValueResponse response = new ValueResponse(); + response.setValue(value); + response.setSuccess(true); + setValueResponse(response); + } +} diff --git a/src/main/java/com/dulio/demo/raft/counter/CounterOperation.java b/src/main/java/com/dulio/demo/raft/counter/CounterOperation.java index 6dd8970..b98992f 100644 --- a/src/main/java/com/dulio/demo/raft/counter/CounterOperation.java +++ b/src/main/java/com/dulio/demo/raft/counter/CounterOperation.java @@ -1,62 +1,62 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dulio.demo.raft.counter; - -import java.io.Serializable; - -/** - * The counter operation - * - * @author likun (saimu.msm@antfin.com) - */ -public class CounterOperation implements Serializable { - - private static final long serialVersionUID = -6597003954824547294L; - - /** Get value */ - public static final byte GET = 0x01; - /** Increment and get value */ - public static final byte INCREMENT = 0x02; - - private byte op; - private long delta; - - public static CounterOperation createGet() { - return new CounterOperation(GET); - } - - public static CounterOperation createIncrement(final long delta) { - return new CounterOperation(INCREMENT, delta); - } - - public CounterOperation(byte op) { - this(op, 0); - } - - public CounterOperation(byte op, long delta) { - this.op = op; - this.delta = delta; - } - - public byte getOp() { - return op; - } - - public long getDelta() { - return delta; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.counter; + +import java.io.Serializable; + +/** + * The counter operation + * + * @author likun (saimu.msm@antfin.com) + */ +public class CounterOperation implements Serializable { + + private static final long serialVersionUID = -6597003954824547294L; + + /** Get value */ + public static final byte GET = 0x01; + /** Increment and get value */ + public static final byte INCREMENT = 0x02; + + private byte op; + private long delta; + + public static CounterOperation createGet() { + return new CounterOperation(GET); + } + + public static CounterOperation createIncrement(final long delta) { + return new CounterOperation(INCREMENT, delta); + } + + public CounterOperation(byte op) { + this(op, 0); + } + + public CounterOperation(byte op, long delta) { + this.op = op; + this.delta = delta; + } + + public byte getOp() { + return op; + } + + public long getDelta() { + return delta; + } +} diff --git a/src/main/java/com/dulio/demo/raft/counter/CounterServer.java b/src/main/java/com/dulio/demo/raft/counter/CounterServer.java index eae9eff..d09cdb1 100644 --- a/src/main/java/com/dulio/demo/raft/counter/CounterServer.java +++ b/src/main/java/com/dulio/demo/raft/counter/CounterServer.java @@ -1,139 +1,139 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dulio.demo.raft.counter; - -import java.io.File; -import java.io.IOException; - -import org.apache.commons.io.FileUtils; - -import com.alipay.sofa.jraft.Node; -import com.alipay.sofa.jraft.RaftGroupService; -import com.alipay.sofa.jraft.conf.Configuration; -import com.alipay.sofa.jraft.entity.PeerId; -import com.dulio.demo.raft.counter.rpc.GetValueRequestProcessor; -import com.dulio.demo.raft.counter.rpc.IncrementAndGetRequestProcessor; -import com.dulio.demo.raft.counter.rpc.ValueResponse; -import com.alipay.sofa.jraft.option.NodeOptions; -import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; -import com.alipay.sofa.jraft.rpc.RpcServer; - -/** - * Counter server that keeps a counter value in a raft group. - * @author boyan (boyan@alibaba-inc.com) - * 2018-Apr-09 4:51:02 PM - */ -public class CounterServer { - - private RaftGroupService raftGroupService; - private Node node; - private CounterStateMachine fsm; - - public CounterServer(final String dataPath, final String groupId, final PeerId serverId, - final NodeOptions nodeOptions) throws IOException { - // 初始化路径 - FileUtils.forceMkdir(new File(dataPath)); - - // 这里让 raft RPC 和业务 RPC 使用同一个 RPC server, 通常也可以分开 - final RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint()); - // 注册业务处理器 - CounterService counterService = new CounterServiceImpl(this); - rpcServer.registerProcessor(new GetValueRequestProcessor(counterService)); - rpcServer.registerProcessor(new IncrementAndGetRequestProcessor(counterService)); - // 初始化状态机 - this.fsm = new CounterStateMachine(); - // 设置状态机到启动参数 - nodeOptions.setFsm(this.fsm); - // 设置存储路径 - // 日志, 必须 - nodeOptions.setLogUri(dataPath + File.separator + "log"); - // 元信息, 必须 - nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta"); - // snapshot, 可选, 一般都推荐 - nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); - // 初始化 raft group 服务框架 - this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer); - // 启动 - this.node = this.raftGroupService.start(); - } - - public CounterStateMachine getFsm() { - return this.fsm; - } - - public Node getNode() { - return this.node; - } - - public RaftGroupService RaftGroupService() { - return this.raftGroupService; - } - - /** - * Redirect request to new leader - */ - public ValueResponse redirect() { - final ValueResponse response = new ValueResponse(); - response.setSuccess(false); - if (this.node != null) { - final PeerId leader = this.node.getLeaderId(); - if (leader != null) { - response.setRedirect(leader.toString()); - } - } - return response; - } - - public static void main(final String[] args) throws IOException { - if (args.length != 4) { - System.out - .println("Useage : java com.alipay.sofa.jraft.example.counter.CounterServer {dataPath} {groupId} {serverId} {initConf}"); - System.out - .println("Example: java com.alipay.sofa.jraft.example.counter.CounterServer /tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083"); - System.exit(1); - } - final String dataPath = args[0]; - final String groupId = args[1]; - final String serverIdStr = args[2]; - final String initConfStr = args[3]; - - final NodeOptions nodeOptions = new NodeOptions(); - // 为了测试,调整 snapshot 间隔等参数 - // 设置选举超时时间为 1 秒 - nodeOptions.setElectionTimeoutMs(1000); - // 关闭 CLI 服务。 - nodeOptions.setDisableCli(false); - // 每隔30秒做一次 snapshot - nodeOptions.setSnapshotIntervalSecs(30); - // 解析参数 - final PeerId serverId = new PeerId(); - if (!serverId.parse(serverIdStr)) { - throw new IllegalArgumentException("Fail to parse serverId:" + serverIdStr); - } - final Configuration initConf = new Configuration(); - if (!initConf.parse(initConfStr)) { - throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr); - } - // 设置初始集群配置 - nodeOptions.setInitialConf(initConf); - - // 启动 - final CounterServer counterServer = new CounterServer(dataPath, groupId, serverId, nodeOptions); - System.out.println("Started counter server at port:" - + counterServer.getNode().getNodeId().getPeerId().getPort()); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.counter; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.io.FileUtils; + +import com.alipay.sofa.jraft.Node; +import com.alipay.sofa.jraft.RaftGroupService; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.entity.PeerId; +import com.dulio.demo.raft.counter.rpc.GetValueRequestProcessor; +import com.dulio.demo.raft.counter.rpc.IncrementAndGetRequestProcessor; +import com.dulio.demo.raft.counter.rpc.ValueResponse; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; +import com.alipay.sofa.jraft.rpc.RpcServer; + +/** + * Counter server that keeps a counter value in a raft group. + * @author boyan (boyan@alibaba-inc.com) + * 2018-Apr-09 4:51:02 PM + */ +public class CounterServer { + + private RaftGroupService raftGroupService; + private Node node; + private CounterStateMachine fsm; + + public CounterServer(final String dataPath, final String groupId, final PeerId serverId, + final NodeOptions nodeOptions) throws IOException { + // 初始化路径 + FileUtils.forceMkdir(new File(dataPath)); + + // 这里让 raft RPC 和业务 RPC 使用同一个 RPC server, 通常也可以分开 + final RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint()); + // 注册业务处理器 + CounterService counterService = new CounterServiceImpl(this); + rpcServer.registerProcessor(new GetValueRequestProcessor(counterService)); + rpcServer.registerProcessor(new IncrementAndGetRequestProcessor(counterService)); + // 初始化状态机 + this.fsm = new CounterStateMachine(); + // 设置状态机到启动参数 + nodeOptions.setFsm(this.fsm); + // 设置存储路径 + // 日志, 必须 + nodeOptions.setLogUri(dataPath + File.separator + "log"); + // 元信息, 必须 + nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta"); + // snapshot, 可选, 一般都推荐 + nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); + // 初始化 raft group 服务框架 + this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer); + // 启动 + this.node = this.raftGroupService.start(); + } + + public CounterStateMachine getFsm() { + return this.fsm; + } + + public Node getNode() { + return this.node; + } + + public RaftGroupService RaftGroupService() { + return this.raftGroupService; + } + + /** + * Redirect request to new leader + */ + public ValueResponse redirect() { + final ValueResponse response = new ValueResponse(); + response.setSuccess(false); + if (this.node != null) { + final PeerId leader = this.node.getLeaderId(); + if (leader != null) { + response.setRedirect(leader.toString()); + } + } + return response; + } + + public static void main(final String[] args) throws IOException { + if (args.length != 4) { + System.out + .println("Useage : java com.alipay.sofa.jraft.example.counter.CounterServer {dataPath} {groupId} {serverId} {initConf}"); + System.out + .println("Example: java com.alipay.sofa.jraft.example.counter.CounterServer /tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083"); + System.exit(1); + } + final String dataPath = args[0]; + final String groupId = args[1]; + final String serverIdStr = args[2]; + final String initConfStr = args[3]; + + final NodeOptions nodeOptions = new NodeOptions(); + // 为了测试,调整 snapshot 间隔等参数 + // 设置选举超时时间为 1 秒 + nodeOptions.setElectionTimeoutMs(1000); + // 关闭 CLI 服务。 + nodeOptions.setDisableCli(false); + // 每隔30秒做一次 snapshot + nodeOptions.setSnapshotIntervalSecs(30); + // 解析参数 + final PeerId serverId = new PeerId(); + if (!serverId.parse(serverIdStr)) { + throw new IllegalArgumentException("Fail to parse serverId:" + serverIdStr); + } + final Configuration initConf = new Configuration(); + if (!initConf.parse(initConfStr)) { + throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr); + } + // 设置初始集群配置 + nodeOptions.setInitialConf(initConf); + + // 启动 + final CounterServer counterServer = new CounterServer(dataPath, groupId, serverId, nodeOptions); + System.out.println("Started counter server at port:" + + counterServer.getNode().getNodeId().getPeerId().getPort()); + } +} diff --git a/src/main/java/com/dulio/demo/raft/counter/CounterService.java b/src/main/java/com/dulio/demo/raft/counter/CounterService.java index d8062e1..4cd66ee 100644 --- a/src/main/java/com/dulio/demo/raft/counter/CounterService.java +++ b/src/main/java/com/dulio/demo/raft/counter/CounterService.java @@ -1,37 +1,37 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dulio.demo.raft.counter; - -/** - * The counter service supporting query and count function. - * - * @author likun (saimu.msm@antfin.com) - */ -public interface CounterService { - - /** - * Get current value from counter - * - * Provide consistent reading if {@code readOnlySafe} is true. - */ - void get(final boolean readOnlySafe, final CounterClosure closure); - - /** - * Add delta to counter then get value - */ - void incrementAndGet(final long delta, final CounterClosure closure); +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.counter; + +/** + * The counter service supporting query and count function. + * + * @author likun (saimu.msm@antfin.com) + */ +public interface CounterService { + + /** + * Get current value from counter + * + * Provide consistent reading if {@code readOnlySafe} is true. + */ + void get(final boolean readOnlySafe, final CounterClosure closure); + + /** + * Add delta to counter then get value + */ + void incrementAndGet(final long delta, final CounterClosure closure); } \ No newline at end of file diff --git a/src/main/java/com/dulio/demo/raft/counter/CounterServiceImpl.java b/src/main/java/com/dulio/demo/raft/counter/CounterServiceImpl.java index 3cbb574..597002b 100644 --- a/src/main/java/com/dulio/demo/raft/counter/CounterServiceImpl.java +++ b/src/main/java/com/dulio/demo/raft/counter/CounterServiceImpl.java @@ -1,124 +1,124 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dulio.demo.raft.counter; - -import java.nio.ByteBuffer; -import java.util.concurrent.Executor; - -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alipay.remoting.exception.CodecException; -import com.alipay.remoting.serialization.SerializerManager; -import com.alipay.sofa.jraft.Status; -import com.alipay.sofa.jraft.closure.ReadIndexClosure; -import com.alipay.sofa.jraft.entity.Task; -import com.alipay.sofa.jraft.error.RaftError; -import com.alipay.sofa.jraft.rhea.StoreEngineHelper; -import com.alipay.sofa.jraft.rhea.options.StoreEngineOptions; -import com.alipay.sofa.jraft.util.BytesUtil; - -/** - * @author likun (saimu.msm@antfin.com) - */ -public class CounterServiceImpl implements CounterService { - private static final Logger LOG = LoggerFactory.getLogger(CounterServiceImpl.class); - - private final CounterServer counterServer; - private final Executor readIndexExecutor; - - public CounterServiceImpl(CounterServer counterServer) { - this.counterServer = counterServer; - this.readIndexExecutor = createReadIndexExecutor(); - } - - private Executor createReadIndexExecutor() { - final StoreEngineOptions opts = new StoreEngineOptions(); - return StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads()); - } - - @Override - public void get(final boolean readOnlySafe, final CounterClosure closure) { - if (!readOnlySafe) { - closure.success(getValue()); - closure.run(Status.OK()); - return; - } - - this.counterServer.getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { - @Override - public void run(Status status, long index, byte[] reqCtx) { - if (status.isOk()) { - closure.success(getValue()); - closure.run(Status.OK()); - return; - } - CounterServiceImpl.this.readIndexExecutor.execute(() -> { - if (isLeader()) { - LOG.debug("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status); - applyOperation(CounterOperation.createGet(), closure); - } else { - handlerNotLeaderError(closure); - } - }); - } - }); - } - - private boolean isLeader() { - return this.counterServer.getFsm().isLeader(); - } - - private long getValue() { - return this.counterServer.getFsm().getValue(); - } - - private String getRedirect() { - return this.counterServer.redirect().getRedirect(); - } - - @Override - public void incrementAndGet(final long delta, final CounterClosure closure) { - applyOperation(CounterOperation.createIncrement(delta), closure); - } - - private void applyOperation(final CounterOperation op, final CounterClosure closure) { - if (!isLeader()) { - handlerNotLeaderError(closure); - return; - } - - try { - closure.setCounterOperation(op); - final Task task = new Task(); - task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op))); - task.setDone(closure); - this.counterServer.getNode().apply(task); - } catch (CodecException e) { - String errorMsg = "Fail to encode CounterOperation"; - LOG.error(errorMsg, e); - closure.failure(errorMsg, StringUtils.EMPTY); - closure.run(new Status(RaftError.EINTERNAL, errorMsg)); - } - } - - private void handlerNotLeaderError(final CounterClosure closure) { - closure.failure("Not leader.", getRedirect()); - closure.run(new Status(RaftError.EPERM, "Not leader")); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.counter; + +import java.nio.ByteBuffer; +import java.util.concurrent.Executor; + +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.SerializerManager; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.closure.ReadIndexClosure; +import com.alipay.sofa.jraft.entity.Task; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.rhea.StoreEngineHelper; +import com.alipay.sofa.jraft.rhea.options.StoreEngineOptions; +import com.alipay.sofa.jraft.util.BytesUtil; + +/** + * @author likun (saimu.msm@antfin.com) + */ +public class CounterServiceImpl implements CounterService { + private static final Logger LOG = LoggerFactory.getLogger(CounterServiceImpl.class); + + private final CounterServer counterServer; + private final Executor readIndexExecutor; + + public CounterServiceImpl(CounterServer counterServer) { + this.counterServer = counterServer; + this.readIndexExecutor = createReadIndexExecutor(); + } + + private Executor createReadIndexExecutor() { + final StoreEngineOptions opts = new StoreEngineOptions(); + return StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads()); + } + + @Override + public void get(final boolean readOnlySafe, final CounterClosure closure) { + if (!readOnlySafe) { + closure.success(getValue()); + closure.run(Status.OK()); + return; + } + + this.counterServer.getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { + @Override + public void run(Status status, long index, byte[] reqCtx) { + if (status.isOk()) { + closure.success(getValue()); + closure.run(Status.OK()); + return; + } + CounterServiceImpl.this.readIndexExecutor.execute(() -> { + if (isLeader()) { + LOG.debug("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status); + applyOperation(CounterOperation.createGet(), closure); + } else { + handlerNotLeaderError(closure); + } + }); + } + }); + } + + private boolean isLeader() { + return this.counterServer.getFsm().isLeader(); + } + + private long getValue() { + return this.counterServer.getFsm().getValue(); + } + + private String getRedirect() { + return this.counterServer.redirect().getRedirect(); + } + + @Override + public void incrementAndGet(final long delta, final CounterClosure closure) { + applyOperation(CounterOperation.createIncrement(delta), closure); + } + + private void applyOperation(final CounterOperation op, final CounterClosure closure) { + if (!isLeader()) { + handlerNotLeaderError(closure); + return; + } + + try { + closure.setCounterOperation(op); + final Task task = new Task(); + task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op))); + task.setDone(closure); + this.counterServer.getNode().apply(task); + } catch (CodecException e) { + String errorMsg = "Fail to encode CounterOperation"; + LOG.error(errorMsg, e); + closure.failure(errorMsg, StringUtils.EMPTY); + closure.run(new Status(RaftError.EINTERNAL, errorMsg)); + } + } + + private void handlerNotLeaderError(final CounterClosure closure) { + closure.failure("Not leader.", getRedirect()); + closure.run(new Status(RaftError.EPERM, "Not leader")); + } +} diff --git a/src/main/java/com/dulio/demo/raft/counter/CounterStateMachine.java b/src/main/java/com/dulio/demo/raft/counter/CounterStateMachine.java index d61f021..697d3ca 100644 --- a/src/main/java/com/dulio/demo/raft/counter/CounterStateMachine.java +++ b/src/main/java/com/dulio/demo/raft/counter/CounterStateMachine.java @@ -1,173 +1,173 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dulio.demo.raft.counter; - -import static com.dulio.demo.raft.counter.CounterOperation.GET; -import static com.dulio.demo.raft.counter.CounterOperation.INCREMENT; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicLong; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.alipay.remoting.exception.CodecException; -import com.alipay.remoting.serialization.SerializerManager; -import com.alipay.sofa.jraft.Closure; -import com.alipay.sofa.jraft.Iterator; -import com.alipay.sofa.jraft.Status; -import com.alipay.sofa.jraft.core.StateMachineAdapter; -import com.alipay.sofa.jraft.error.RaftError; -import com.alipay.sofa.jraft.error.RaftException; -import com.dulio.demo.raft.counter.snapshot.CounterSnapshotFile; -import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; -import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; -import com.alipay.sofa.jraft.util.Utils; - -/** - * Counter state machine. - * - * @author boyan (boyan@alibaba-inc.com) - *

- * 2018-Apr-09 4:52:31 PM - */ -public class CounterStateMachine extends StateMachineAdapter { - - private static final Logger LOG = LoggerFactory.getLogger(CounterStateMachine.class); - - /** - * Counter value - */ - private final AtomicLong value = new AtomicLong(0); - /** - * Leader term - */ - private final AtomicLong leaderTerm = new AtomicLong(-1); - - public boolean isLeader() { - return this.leaderTerm.get() > 0; - } - - /** - * Returns current value. - */ - public long getValue() { - return this.value.get(); - } - - @Override - public void onApply(final Iterator iter) { - while (iter.hasNext()) { - long current = 0; - CounterOperation counterOperation = null; - - CounterClosure closure = null; - if (iter.done() != null) { - // This task is applied by this node, get value from closure to avoid additional parsing. - closure = (CounterClosure) iter.done(); - counterOperation = closure.getCounterOperation(); - } else { - // Have to parse FetchAddRequest from this user log. - final ByteBuffer data = iter.getData(); - try { - counterOperation = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( - data.array(), CounterOperation.class.getName()); - } catch (final CodecException e) { - LOG.error("Fail to decode IncrementAndGetRequest", e); - } - } - if (counterOperation != null) { - switch (counterOperation.getOp()) { - case GET: - current = this.value.get(); - LOG.info("Get value={} at logIndex={}", current, iter.getIndex()); - break; - case INCREMENT: - final long delta = counterOperation.getDelta(); - final long prev = this.value.get(); - current = this.value.addAndGet(delta); - LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex()); - break; - } - - if (closure != null) { - closure.success(current); - closure.run(Status.OK()); - } - } - iter.next(); - } - } - - @Override - public void onSnapshotSave(final SnapshotWriter writer, final Closure done) { - final long currVal = this.value.get(); - Utils.runInThread(() -> { - final CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data"); - if (snapshot.save(currVal)) { - if (writer.addFile("data")) { - done.run(Status.OK()); - } else { - done.run(new Status(RaftError.EIO, "Fail to add file to writer")); - } - } else { - done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath())); - } - }); - } - - @Override - public void onError(final RaftException e) { - LOG.error("Raft error: {}", e, e); - } - - @Override - public boolean onSnapshotLoad(final SnapshotReader reader) { - if (isLeader()) { - LOG.warn("Leader is not supposed to load snapshot"); - return false; - } - if (reader.getFileMeta("data") == null) { - LOG.error("Fail to find data file in {}", reader.getPath()); - return false; - } - final CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data"); - try { - this.value.set(snapshot.load()); - return true; - } catch (final IOException e) { - LOG.error("Fail to load snapshot from {}", snapshot.getPath()); - return false; - } - - } - - @Override - public void onLeaderStart(final long term) { - this.leaderTerm.set(term); - super.onLeaderStart(term); - - } - - @Override - public void onLeaderStop(final Status status) { - this.leaderTerm.set(-1); - super.onLeaderStop(status); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.counter; + +import static com.dulio.demo.raft.counter.CounterOperation.GET; +import static com.dulio.demo.raft.counter.CounterOperation.INCREMENT; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.SerializerManager; +import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.Iterator; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.core.StateMachineAdapter; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.error.RaftException; +import com.dulio.demo.raft.counter.snapshot.CounterSnapshotFile; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; +import com.alipay.sofa.jraft.util.Utils; + +/** + * Counter state machine. + * + * @author boyan (boyan@alibaba-inc.com) + *

+ * 2018-Apr-09 4:52:31 PM + */ +public class CounterStateMachine extends StateMachineAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(CounterStateMachine.class); + + /** + * Counter value + */ + private final AtomicLong value = new AtomicLong(0); + /** + * Leader term + */ + private final AtomicLong leaderTerm = new AtomicLong(-1); + + public boolean isLeader() { + return this.leaderTerm.get() > 0; + } + + /** + * Returns current value. + */ + public long getValue() { + return this.value.get(); + } + + @Override + public void onApply(final Iterator iter) { + while (iter.hasNext()) { + long current = 0; + CounterOperation counterOperation = null; + + CounterClosure closure = null; + if (iter.done() != null) { + // This task is applied by this node, get value from closure to avoid additional parsing. + closure = (CounterClosure) iter.done(); + counterOperation = closure.getCounterOperation(); + } else { + // Have to parse FetchAddRequest from this user log. + final ByteBuffer data = iter.getData(); + try { + counterOperation = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( + data.array(), CounterOperation.class.getName()); + } catch (final CodecException e) { + LOG.error("Fail to decode IncrementAndGetRequest", e); + } + } + if (counterOperation != null) { + switch (counterOperation.getOp()) { + case GET: + current = this.value.get(); + LOG.info("Get value={} at logIndex={}", current, iter.getIndex()); + break; + case INCREMENT: + final long delta = counterOperation.getDelta(); + final long prev = this.value.get(); + current = this.value.addAndGet(delta); + LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex()); + break; + } + + if (closure != null) { + closure.success(current); + closure.run(Status.OK()); + } + } + iter.next(); + } + } + + @Override + public void onSnapshotSave(final SnapshotWriter writer, final Closure done) { + final long currVal = this.value.get(); + Utils.runInThread(() -> { + final CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data"); + if (snapshot.save(currVal)) { + if (writer.addFile("data")) { + done.run(Status.OK()); + } else { + done.run(new Status(RaftError.EIO, "Fail to add file to writer")); + } + } else { + done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath())); + } + }); + } + + @Override + public void onError(final RaftException e) { + LOG.error("Raft error: {}", e, e); + } + + @Override + public boolean onSnapshotLoad(final SnapshotReader reader) { + if (isLeader()) { + LOG.warn("Leader is not supposed to load snapshot"); + return false; + } + if (reader.getFileMeta("data") == null) { + LOG.error("Fail to find data file in {}", reader.getPath()); + return false; + } + final CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data"); + try { + this.value.set(snapshot.load()); + return true; + } catch (final IOException e) { + LOG.error("Fail to load snapshot from {}", snapshot.getPath()); + return false; + } + + } + + @Override + public void onLeaderStart(final long term) { + this.leaderTerm.set(term); + super.onLeaderStart(term); + + } + + @Override + public void onLeaderStop(final Status status) { + this.leaderTerm.set(-1); + super.onLeaderStop(status); + } + +} diff --git a/src/main/java/com/dulio/demo/raft/counter/rpc/GetValueRequest.java b/src/main/java/com/dulio/demo/raft/counter/rpc/GetValueRequest.java index ca4368f..6d68d62 100644 --- a/src/main/java/com/dulio/demo/raft/counter/rpc/GetValueRequest.java +++ b/src/main/java/com/dulio/demo/raft/counter/rpc/GetValueRequest.java @@ -1,41 +1,41 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dulio.demo.raft.counter.rpc; - -import java.io.Serializable; - -/** - * Get the latest value request. - * - * @author boyan (boyan@alibaba-inc.com) - * - * 2018-Apr-09 4:54:17 PM - */ -public class GetValueRequest implements Serializable { - - private static final long serialVersionUID = 9218253805003988802L; - - private boolean readOnlySafe = true; - - public boolean isReadOnlySafe() { - return readOnlySafe; - } - - public void setReadOnlySafe(boolean readOnlySafe) { - this.readOnlySafe = readOnlySafe; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.counter.rpc; + +import java.io.Serializable; + +/** + * Get the latest value request. + * + * @author boyan (boyan@alibaba-inc.com) + * + * 2018-Apr-09 4:54:17 PM + */ +public class GetValueRequest implements Serializable { + + private static final long serialVersionUID = 9218253805003988802L; + + private boolean readOnlySafe = true; + + public boolean isReadOnlySafe() { + return readOnlySafe; + } + + public void setReadOnlySafe(boolean readOnlySafe) { + this.readOnlySafe = readOnlySafe; + } +} diff --git a/src/main/java/com/dulio/demo/raft/counter/rpc/GetValueRequestProcessor.java b/src/main/java/com/dulio/demo/raft/counter/rpc/GetValueRequestProcessor.java index da54065..fd2c1a0 100644 --- a/src/main/java/com/dulio/demo/raft/counter/rpc/GetValueRequestProcessor.java +++ b/src/main/java/com/dulio/demo/raft/counter/rpc/GetValueRequestProcessor.java @@ -1,57 +1,57 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dulio.demo.raft.counter.rpc; - -import com.alipay.sofa.jraft.Status; -import com.dulio.demo.raft.counter.CounterClosure; -import com.dulio.demo.raft.counter.CounterService; -import com.alipay.sofa.jraft.rpc.RpcContext; -import com.alipay.sofa.jraft.rpc.RpcProcessor; - -/** - * GetValueRequest processor. - * - * @author boyan (boyan@alibaba-inc.com) - * - * 2018-Apr-09 5:48:33 PM - */ -public class GetValueRequestProcessor implements RpcProcessor { - - private final CounterService counterService; - - public GetValueRequestProcessor(CounterService counterService) { - super(); - this.counterService = counterService; - } - - @Override - public void handleRequest(final RpcContext rpcCtx, final GetValueRequest request) { - final CounterClosure closure = new CounterClosure() { - @Override - public void run(Status status) { - rpcCtx.sendResponse(getValueResponse()); - } - }; - - this.counterService.get(request.isReadOnlySafe(), closure); - } - - @Override - public String interest() { - return GetValueRequest.class.getName(); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.counter.rpc; + +import com.alipay.sofa.jraft.Status; +import com.dulio.demo.raft.counter.CounterClosure; +import com.dulio.demo.raft.counter.CounterService; +import com.alipay.sofa.jraft.rpc.RpcContext; +import com.alipay.sofa.jraft.rpc.RpcProcessor; + +/** + * GetValueRequest processor. + * + * @author boyan (boyan@alibaba-inc.com) + * + * 2018-Apr-09 5:48:33 PM + */ +public class GetValueRequestProcessor implements RpcProcessor { + + private final CounterService counterService; + + public GetValueRequestProcessor(CounterService counterService) { + super(); + this.counterService = counterService; + } + + @Override + public void handleRequest(final RpcContext rpcCtx, final GetValueRequest request) { + final CounterClosure closure = new CounterClosure() { + @Override + public void run(Status status) { + rpcCtx.sendResponse(getValueResponse()); + } + }; + + this.counterService.get(request.isReadOnlySafe(), closure); + } + + @Override + public String interest() { + return GetValueRequest.class.getName(); + } +} diff --git a/src/main/java/com/dulio/demo/raft/counter/rpc/IncrementAndGetRequest.java b/src/main/java/com/dulio/demo/raft/counter/rpc/IncrementAndGetRequest.java index a9311ec..fba6b34 100644 --- a/src/main/java/com/dulio/demo/raft/counter/rpc/IncrementAndGetRequest.java +++ b/src/main/java/com/dulio/demo/raft/counter/rpc/IncrementAndGetRequest.java @@ -1,41 +1,41 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dulio.demo.raft.counter.rpc; - -import java.io.Serializable; - -/** - * Increment value with delta and get the new value request. - * - * @author boyan (boyan@alibaba-inc.com) - * - * 2018-Apr-09 4:53:22 PM - */ -public class IncrementAndGetRequest implements Serializable { - - private static final long serialVersionUID = -5623664785560971849L; - - private long delta; - - public long getDelta() { - return this.delta; - } - - public void setDelta(long delta) { - this.delta = delta; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.counter.rpc; + +import java.io.Serializable; + +/** + * Increment value with delta and get the new value request. + * + * @author boyan (boyan@alibaba-inc.com) + * + * 2018-Apr-09 4:53:22 PM + */ +public class IncrementAndGetRequest implements Serializable { + + private static final long serialVersionUID = -5623664785560971849L; + + private long delta; + + public long getDelta() { + return this.delta; + } + + public void setDelta(long delta) { + this.delta = delta; + } +} diff --git a/src/main/java/com/dulio/demo/raft/counter/rpc/IncrementAndGetRequestProcessor.java b/src/main/java/com/dulio/demo/raft/counter/rpc/IncrementAndGetRequestProcessor.java index 209e04d..d94cb4d 100644 --- a/src/main/java/com/dulio/demo/raft/counter/rpc/IncrementAndGetRequestProcessor.java +++ b/src/main/java/com/dulio/demo/raft/counter/rpc/IncrementAndGetRequestProcessor.java @@ -1,57 +1,57 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dulio.demo.raft.counter.rpc; - -import com.alipay.sofa.jraft.Status; -import com.dulio.demo.raft.counter.CounterClosure; -import com.dulio.demo.raft.counter.CounterService; -import com.alipay.sofa.jraft.rpc.RpcContext; -import com.alipay.sofa.jraft.rpc.RpcProcessor; - -/** - * IncrementAndGetRequest processor. - * - * @author boyan (boyan@alibaba-inc.com) - * - * 2018-Apr-09 5:43:57 PM - */ -public class IncrementAndGetRequestProcessor implements RpcProcessor { - - private final CounterService counterService; - - public IncrementAndGetRequestProcessor(CounterService counterService) { - super(); - this.counterService = counterService; - } - - @Override - public void handleRequest(final RpcContext rpcCtx, final IncrementAndGetRequest request) { - final CounterClosure closure = new CounterClosure() { - @Override - public void run(Status status) { - rpcCtx.sendResponse(getValueResponse()); - } - }; - - this.counterService.incrementAndGet(request.getDelta(), closure); - } - - @Override - public String interest() { - return IncrementAndGetRequest.class.getName(); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.counter.rpc; + +import com.alipay.sofa.jraft.Status; +import com.dulio.demo.raft.counter.CounterClosure; +import com.dulio.demo.raft.counter.CounterService; +import com.alipay.sofa.jraft.rpc.RpcContext; +import com.alipay.sofa.jraft.rpc.RpcProcessor; + +/** + * IncrementAndGetRequest processor. + * + * @author boyan (boyan@alibaba-inc.com) + * + * 2018-Apr-09 5:43:57 PM + */ +public class IncrementAndGetRequestProcessor implements RpcProcessor { + + private final CounterService counterService; + + public IncrementAndGetRequestProcessor(CounterService counterService) { + super(); + this.counterService = counterService; + } + + @Override + public void handleRequest(final RpcContext rpcCtx, final IncrementAndGetRequest request) { + final CounterClosure closure = new CounterClosure() { + @Override + public void run(Status status) { + rpcCtx.sendResponse(getValueResponse()); + } + }; + + this.counterService.incrementAndGet(request.getDelta(), closure); + } + + @Override + public String interest() { + return IncrementAndGetRequest.class.getName(); + } +} diff --git a/src/main/java/com/dulio/demo/raft/counter/rpc/ValueResponse.java b/src/main/java/com/dulio/demo/raft/counter/rpc/ValueResponse.java index 55f8c96..3991250 100644 --- a/src/main/java/com/dulio/demo/raft/counter/rpc/ValueResponse.java +++ b/src/main/java/com/dulio/demo/raft/counter/rpc/ValueResponse.java @@ -1,92 +1,92 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dulio.demo.raft.counter.rpc; - -import java.io.Serializable; - -/** - * Value response. - * - * @author boyan (boyan@alibaba-inc.com) - * - * 2018-Apr-09 4:55:35 PM - */ -public class ValueResponse implements Serializable { - - private static final long serialVersionUID = -4220017686727146773L; - - private long value; - private boolean success; - - /** - * redirect peer id - */ - private String redirect; - - private String errorMsg; - - public String getErrorMsg() { - return this.errorMsg; - } - - public void setErrorMsg(String errorMsg) { - this.errorMsg = errorMsg; - } - - public String getRedirect() { - return this.redirect; - } - - public void setRedirect(String redirect) { - this.redirect = redirect; - } - - public boolean isSuccess() { - return this.success; - } - - public void setSuccess(boolean success) { - this.success = success; - } - - public long getValue() { - return this.value; - } - - public void setValue(long value) { - this.value = value; - } - - public ValueResponse(long value, boolean success, String redirect, String errorMsg) { - super(); - this.value = value; - this.success = success; - this.redirect = redirect; - this.errorMsg = errorMsg; - } - - public ValueResponse() { - super(); - } - - @Override - public String toString() { - return "ValueResponse [value=" + this.value + ", success=" + this.success + ", redirect=" + this.redirect - + ", errorMsg=" + this.errorMsg + "]"; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.counter.rpc; + +import java.io.Serializable; + +/** + * Value response. + * + * @author boyan (boyan@alibaba-inc.com) + * + * 2018-Apr-09 4:55:35 PM + */ +public class ValueResponse implements Serializable { + + private static final long serialVersionUID = -4220017686727146773L; + + private long value; + private boolean success; + + /** + * redirect peer id + */ + private String redirect; + + private String errorMsg; + + public String getErrorMsg() { + return this.errorMsg; + } + + public void setErrorMsg(String errorMsg) { + this.errorMsg = errorMsg; + } + + public String getRedirect() { + return this.redirect; + } + + public void setRedirect(String redirect) { + this.redirect = redirect; + } + + public boolean isSuccess() { + return this.success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public long getValue() { + return this.value; + } + + public void setValue(long value) { + this.value = value; + } + + public ValueResponse(long value, boolean success, String redirect, String errorMsg) { + super(); + this.value = value; + this.success = success; + this.redirect = redirect; + this.errorMsg = errorMsg; + } + + public ValueResponse() { + super(); + } + + @Override + public String toString() { + return "ValueResponse [value=" + this.value + ", success=" + this.success + ", redirect=" + this.redirect + + ", errorMsg=" + this.errorMsg + "]"; + } + +} diff --git a/src/main/java/com/dulio/demo/raft/counter/snapshot/CounterSnapshotFile.java b/src/main/java/com/dulio/demo/raft/counter/snapshot/CounterSnapshotFile.java index 6e69af3..424da6b 100644 --- a/src/main/java/com/dulio/demo/raft/counter/snapshot/CounterSnapshotFile.java +++ b/src/main/java/com/dulio/demo/raft/counter/snapshot/CounterSnapshotFile.java @@ -1,69 +1,69 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dulio.demo.raft.counter.snapshot; - -import java.io.File; -import java.io.IOException; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Counter snapshot file. - * - * @author boyan (boyan@alibaba-inc.com) - * - * 2018-Apr-09 5:14:55 PM - */ -public class CounterSnapshotFile { - - private static final Logger LOG = LoggerFactory.getLogger(CounterSnapshotFile.class); - - private String path; - - public CounterSnapshotFile(String path) { - super(); - this.path = path; - } - - public String getPath() { - return this.path; - } - - /** - * Save value to snapshot file. - */ - public boolean save(final long value) { - try { - FileUtils.writeStringToFile(new File(path), String.valueOf(value)); - return true; - } catch (IOException e) { - LOG.error("Fail to save snapshot", e); - return false; - } - } - - public long load() throws IOException { - final String s = FileUtils.readFileToString(new File(path)); - if (!StringUtils.isBlank(s)) { - return Long.parseLong(s); - } - throw new IOException("Fail to load snapshot from " + path + ",content: " + s); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.counter.snapshot; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Counter snapshot file. + * + * @author boyan (boyan@alibaba-inc.com) + * + * 2018-Apr-09 5:14:55 PM + */ +public class CounterSnapshotFile { + + private static final Logger LOG = LoggerFactory.getLogger(CounterSnapshotFile.class); + + private String path; + + public CounterSnapshotFile(String path) { + super(); + this.path = path; + } + + public String getPath() { + return this.path; + } + + /** + * Save value to snapshot file. + */ + public boolean save(final long value) { + try { + FileUtils.writeStringToFile(new File(path), String.valueOf(value)); + return true; + } catch (IOException e) { + LOG.error("Fail to save snapshot", e); + return false; + } + } + + public long load() throws IOException { + final String s = FileUtils.readFileToString(new File(path)); + if (!StringUtils.isBlank(s)) { + return Long.parseLong(s); + } + throw new IOException("Fail to load snapshot from " + path + ",content: " + s); + } +} diff --git a/src/main/java/com/dulio/demo/raft/tsdb/TsdbMetaServer.java b/src/main/java/com/dulio/demo/raft/tsdb/TsdbMetaServer.java new file mode 100644 index 0000000..8c0b4bb --- /dev/null +++ b/src/main/java/com/dulio/demo/raft/tsdb/TsdbMetaServer.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.tsdb; + +import com.alipay.sofa.jraft.Node; +import com.alipay.sofa.jraft.RaftGroupService; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; +import com.alipay.sofa.jraft.rpc.RpcServer; +import com.dulio.demo.raft.counter.CounterService; +import com.dulio.demo.raft.counter.CounterServiceImpl; +import com.dulio.demo.raft.counter.CounterStateMachine; +import com.dulio.demo.raft.counter.rpc.GetValueRequestProcessor; +import com.dulio.demo.raft.counter.rpc.IncrementAndGetRequestProcessor; +import com.dulio.demo.raft.counter.rpc.ValueResponse; +import org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.IOException; + +/** + * Counter server that keeps a counter value in a raft group. + * @author boyan (boyan@alibaba-inc.com) + * 2018-Apr-09 4:51:02 PM + */ +public class TsdbMetaServer { + + private RaftGroupService raftGroupService; + private Node node; + private CounterStateMachine fsm; + + public TsdbMetaServer(final String dataPath, final String groupId, final PeerId serverId, + final NodeOptions nodeOptions) throws IOException { + // 初始化路径 + FileUtils.forceMkdir(new File(dataPath)); + + // 这里让 raft RPC 和业务 RPC 使用同一个 RPC server, 通常也可以分开 + final RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint()); + // 注册业务处理器 + CounterService counterService = new CounterServiceImpl(this); + rpcServer.registerProcessor(new GetValueRequestProcessor(counterService)); + rpcServer.registerProcessor(new IncrementAndGetRequestProcessor(counterService)); + // 初始化状态机 + this.fsm = new CounterStateMachine(); + // 设置状态机到启动参数 + nodeOptions.setFsm(this.fsm); + // 设置存储路径 + // 日志, 必须 + nodeOptions.setLogUri(dataPath + File.separator + "log"); + // 元信息, 必须 + nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta"); + // snapshot, 可选, 一般都推荐 + nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); + // 初始化 raft group 服务框架 + this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer); + // 启动 + this.node = this.raftGroupService.start(); + } + + public CounterStateMachine getFsm() { + return this.fsm; + } + + public Node getNode() { + return this.node; + } + + public RaftGroupService RaftGroupService() { + return this.raftGroupService; + } + + /** + * Redirect request to new leader + */ + public ValueResponse redirect() { + final ValueResponse response = new ValueResponse(); + response.setSuccess(false); + if (this.node != null) { + final PeerId leader = this.node.getLeaderId(); + if (leader != null) { + response.setRedirect(leader.toString()); + } + } + return response; + } + + public static void main(final String[] args) throws IOException { + if (args.length != 4) { + System.out + .println("Useage : java com.alipay.sofa.jraft.example.counter.CounterServer {dataPath} {groupId} {serverId} {initConf}"); + System.out + .println("Example: java com.alipay.sofa.jraft.example.counter.CounterServer /tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083"); + System.exit(1); + } + final String dataPath = args[0]; + final String groupId = args[1]; + final String serverIdStr = args[2]; + final String initConfStr = args[3]; + + final NodeOptions nodeOptions = new NodeOptions(); + // 为了测试,调整 snapshot 间隔等参数 + // 设置选举超时时间为 1 秒 + nodeOptions.setElectionTimeoutMs(1000); + // 关闭 CLI 服务。 + nodeOptions.setDisableCli(false); + // 每隔30秒做一次 snapshot + nodeOptions.setSnapshotIntervalSecs(30); + // 解析参数 + final PeerId serverId = new PeerId(); + if (!serverId.parse(serverIdStr)) { + throw new IllegalArgumentException("Fail to parse serverId:" + serverIdStr); + } + final Configuration initConf = new Configuration(); + if (!initConf.parse(initConfStr)) { + throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr); + } + // 设置初始集群配置 + nodeOptions.setInitialConf(initConf); + + // 启动 + final TsdbMetaServer counterServer = new TsdbMetaServer(dataPath, groupId, serverId, nodeOptions); + System.out.println("Started counter server at port:" + + counterServer.getNode().getNodeId().getPeerId().getPort()); + } +} diff --git a/src/main/java/com/dulio/demo/raft/tsdb/TsdbMetaStateMachine.java b/src/main/java/com/dulio/demo/raft/tsdb/TsdbMetaStateMachine.java index 6e56a66..e34aa1a 100644 --- a/src/main/java/com/dulio/demo/raft/tsdb/TsdbMetaStateMachine.java +++ b/src/main/java/com/dulio/demo/raft/tsdb/TsdbMetaStateMachine.java @@ -1,149 +1,149 @@ -package com.dulio.demo.raft.tsdb; - -import com.alipay.remoting.exception.CodecException; -import com.alipay.remoting.serialization.SerializerManager; -import com.alipay.sofa.jraft.Closure; -import com.alipay.sofa.jraft.Iterator; -import com.alipay.sofa.jraft.Status; -import com.alipay.sofa.jraft.core.StateMachineAdapter; -import com.alipay.sofa.jraft.error.RaftError; -import com.alipay.sofa.jraft.error.RaftException; -import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; -import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; -import com.alipay.sofa.jraft.util.Utils; -import com.dulio.demo.raft.counter.CounterClosure; -import com.dulio.demo.raft.counter.CounterOperation; -import com.dulio.demo.raft.tsdb.model.TsdbMetaKey; -import com.dulio.demo.raft.tsdb.model.TsdbMetaValue; -import com.dulio.demo.raft.tsdb.snapshot.TsdbMetaSnapshotFile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; - -/** - * - */ -public class TsdbMetaStateMachine extends StateMachineAdapter { - private static final Logger LOGGER = LoggerFactory.getLogger(TsdbMetaStateMachine.class); - - /** - * TsdbMetaMap - */ - private final ConcurrentMap tsdbMetaData = new ConcurrentHashMap<>(); - /** - * Leader term - */ - private final AtomicLong leaderTerm = new AtomicLong(-1); - - public boolean isLeader() { - return this.leaderTerm.get() > 0; - } - - @Override - public void onApply(final Iterator iter) { - while (iter.hasNext()) { - long current = 0; - CounterOperation counterOperation = null; - - CounterClosure closure = null; - if (iter.done() != null) { - // This task is applied by this node, get value from closure to avoid additional parsing. - closure = (CounterClosure) iter.done(); - counterOperation = closure.getCounterOperation(); - } else { - // Have to parse FetchAddRequest from this user log. - final ByteBuffer data = iter.getData(); - try { - counterOperation = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( - data.array(), CounterOperation.class.getName()); - } catch (final CodecException e) { - LOGGER.error("Fail to decode IncrementAndGetRequest", e); - } - } -// if (counterOperation != null) { -// switch (counterOperation.getOp()) { -// case GET: -// current = this.value.get(); -// LOGGER.info("Get value={} at logIndex={}", current, iter.getIndex()); -// break; -// case INCREMENT: -// final long delta = counterOperation.getDelta(); -// final long prev = this.value.get(); -// current = this.value.addAndGet(delta); -// LOGGER.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex()); -// break; -// } -// -// if (closure != null) { -// closure.success(current); -// closure.run(Status.OK()); -// } -// } - iter.next(); - } - } - - @Override - public void onSnapshotSave(final SnapshotWriter writer, final Closure done) { - final ConcurrentMap currVal = this.tsdbMetaData; - Utils.runInThread(() -> { - final TsdbMetaSnapshotFile snapshot = new TsdbMetaSnapshotFile(writer.getPath() + File.separator + "data"); - if (snapshot.save(currVal)) { - if (writer.addFile("data")) { - done.run(Status.OK()); - } else { - done.run(new Status(RaftError.EIO, "Fail to add file to writer")); - } - } else { - done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath())); - } - }); - } - - @Override - public void onError(final RaftException e) { - LOGGER.error("Raft error: {}", e, e); - } - - @Override - public boolean onSnapshotLoad(final SnapshotReader reader) { - if (isLeader()) { - LOGGER.warn("Leader is not supposed to load snapshot"); - return false; - } - if (reader.getFileMeta("data") == null) { - LOGGER.error("Fail to find data file in {}", reader.getPath()); - return false; - } - final TsdbMetaSnapshotFile snapshot = new TsdbMetaSnapshotFile(reader.getPath() + File.separator + "data"); - try { - this.tsdbMetaData.clear(); - this.tsdbMetaData.putAll(snapshot.load()); - return true; - } catch (final IOException e) { - LOGGER.error("Fail to load snapshot from {}", snapshot.getPath()); - return false; - } - - } - - @Override - public void onLeaderStart(final long term) { - this.leaderTerm.set(term); - super.onLeaderStart(term); - - } - - @Override - public void onLeaderStop(final Status status) { - this.leaderTerm.set(-1); - super.onLeaderStop(status); - } - -} +package com.dulio.demo.raft.tsdb; + +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.SerializerManager; +import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.Iterator; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.core.StateMachineAdapter; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.error.RaftException; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; +import com.alipay.sofa.jraft.util.Utils; +import com.dulio.demo.raft.counter.CounterClosure; +import com.dulio.demo.raft.counter.CounterOperation; +import com.dulio.demo.raft.tsdb.model.TsdbMetaKey; +import com.dulio.demo.raft.tsdb.model.TsdbMetaValue; +import com.dulio.demo.raft.tsdb.snapshot.TsdbMetaSnapshotFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * + */ +public class TsdbMetaStateMachine extends StateMachineAdapter { + private static final Logger LOGGER = LoggerFactory.getLogger(TsdbMetaStateMachine.class); + + /** + * TsdbMetaMap + */ + private final ConcurrentMap tsdbMetaData = new ConcurrentHashMap<>(); + /** + * Leader term + */ + private final AtomicLong leaderTerm = new AtomicLong(-1); + + public boolean isLeader() { + return this.leaderTerm.get() > 0; + } + + @Override + public void onApply(final Iterator iter) { + while (iter.hasNext()) { + long current = 0; + CounterOperation counterOperation = null; + + CounterClosure closure = null; + if (iter.done() != null) { + // This task is applied by this node, get value from closure to avoid additional parsing. + closure = (CounterClosure) iter.done(); + counterOperation = closure.getCounterOperation(); + } else { + // Have to parse FetchAddRequest from this user log. + final ByteBuffer data = iter.getData(); + try { + counterOperation = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( + data.array(), CounterOperation.class.getName()); + } catch (final CodecException e) { + LOGGER.error("Fail to decode IncrementAndGetRequest", e); + } + } +// if (counterOperation != null) { +// switch (counterOperation.getOp()) { +// case GET: +// current = this.value.get(); +// LOGGER.info("Get value={} at logIndex={}", current, iter.getIndex()); +// break; +// case INCREMENT: +// final long delta = counterOperation.getDelta(); +// final long prev = this.value.get(); +// current = this.value.addAndGet(delta); +// LOGGER.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex()); +// break; +// } +// +// if (closure != null) { +// closure.success(current); +// closure.run(Status.OK()); +// } +// } + iter.next(); + } + } + + @Override + public void onSnapshotSave(final SnapshotWriter writer, final Closure done) { + final ConcurrentMap currVal = this.tsdbMetaData; + Utils.runInThread(() -> { + final TsdbMetaSnapshotFile snapshot = new TsdbMetaSnapshotFile(writer.getPath() + File.separator + "data"); + if (snapshot.save(currVal)) { + if (writer.addFile("data")) { + done.run(Status.OK()); + } else { + done.run(new Status(RaftError.EIO, "Fail to add file to writer")); + } + } else { + done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath())); + } + }); + } + + @Override + public void onError(final RaftException e) { + LOGGER.error("Raft error: {}", e, e); + } + + @Override + public boolean onSnapshotLoad(final SnapshotReader reader) { + if (isLeader()) { + LOGGER.warn("Leader is not supposed to load snapshot"); + return false; + } + if (reader.getFileMeta("data") == null) { + LOGGER.error("Fail to find data file in {}", reader.getPath()); + return false; + } + final TsdbMetaSnapshotFile snapshot = new TsdbMetaSnapshotFile(reader.getPath() + File.separator + "data"); + try { + this.tsdbMetaData.clear(); + this.tsdbMetaData.putAll(snapshot.load()); + return true; + } catch (final IOException e) { + LOGGER.error("Fail to load snapshot from {}", snapshot.getPath()); + return false; + } + + } + + @Override + public void onLeaderStart(final long term) { + this.leaderTerm.set(term); + super.onLeaderStart(term); + + } + + @Override + public void onLeaderStop(final Status status) { + this.leaderTerm.set(-1); + super.onLeaderStop(status); + } + +} diff --git a/src/main/java/com/dulio/demo/raft/tsdb/model/TsdbMetaKey.java b/src/main/java/com/dulio/demo/raft/tsdb/model/TsdbMetaKey.java index 621ec46..02767fd 100644 --- a/src/main/java/com/dulio/demo/raft/tsdb/model/TsdbMetaKey.java +++ b/src/main/java/com/dulio/demo/raft/tsdb/model/TsdbMetaKey.java @@ -1,45 +1,45 @@ -package com.dulio.demo.raft.tsdb.model; - -import java.io.Serializable; -import java.util.Date; - -/** - * @class: TsdbMetaKey - * @program: raft-demo - * @description: Tsdb meta data - * @author: huadu.shen - * @created: 2021/06/20 22:06 - */ -public class TsdbMetaKey implements Serializable { - private String databaseName; - private String measurement; - private String retentionPolicy; - private Date startDatetime; - private Date endDatetime; - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = result + databaseName.hashCode() * prime; - result = result + measurement.hashCode() * prime; - result = result + retentionPolicy.hashCode() * prime; - result = result + startDatetime.hashCode() * prime; - result = result + endDatetime.hashCode() * prime; - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (obj == null) return false; - if (getClass() != obj.getClass()) return false; - TsdbMetaKey other = (TsdbMetaKey) obj; - if (!databaseName.equals(other.databaseName)) return false; - if (!measurement.equals(other.measurement)) return false; - if (!retentionPolicy.equals(other.retentionPolicy)) return false; - if (!startDatetime.equals(other.startDatetime)) return false; - if (!endDatetime.equals(other.endDatetime)) return false; - return true; - } -} +package com.dulio.demo.raft.tsdb.model; + +import java.io.Serializable; +import java.util.Date; + +/** + * @class: TsdbMetaKey + * @program: raft-demo + * @description: Tsdb meta data + * @author: huadu.shen + * @created: 2021/06/20 22:06 + */ +public class TsdbMetaKey implements Serializable { + private String databaseName; + private String measurement; + private String retentionPolicy; + private Date startDatetime; + private Date endDatetime; + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = result + databaseName.hashCode() * prime; + result = result + measurement.hashCode() * prime; + result = result + retentionPolicy.hashCode() * prime; + result = result + startDatetime.hashCode() * prime; + result = result + endDatetime.hashCode() * prime; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + TsdbMetaKey other = (TsdbMetaKey) obj; + if (!databaseName.equals(other.databaseName)) return false; + if (!measurement.equals(other.measurement)) return false; + if (!retentionPolicy.equals(other.retentionPolicy)) return false; + if (!startDatetime.equals(other.startDatetime)) return false; + if (!endDatetime.equals(other.endDatetime)) return false; + return true; + } +} diff --git a/src/main/java/com/dulio/demo/raft/tsdb/model/TsdbMetaValue.java b/src/main/java/com/dulio/demo/raft/tsdb/model/TsdbMetaValue.java index 8ad8efb..54a137e 100644 --- a/src/main/java/com/dulio/demo/raft/tsdb/model/TsdbMetaValue.java +++ b/src/main/java/com/dulio/demo/raft/tsdb/model/TsdbMetaValue.java @@ -1,18 +1,18 @@ -package com.dulio.demo.raft.tsdb.model; - -import java.io.Serializable; -import java.util.Date; - -/** - * @class: TsdbMetaValue - * @program: raft-demo - * @description: 该版本不做数据分片,只做数据副本 - * TODO 下一步可以支持分片 - * @author: huadu.shen - * @created: 2021/06/20 22:06 - */ -public class TsdbMetaValue implements Serializable { - private String[] quorumReplicas; - private int quorumWrite; - private int quorumRead; -} +package com.dulio.demo.raft.tsdb.model; + +import java.io.Serializable; +import java.util.Date; + +/** + * @class: TsdbMetaValue + * @program: raft-demo + * @description: 该版本不做数据分片,只做数据副本 + * TODO 下一步可以支持分片 + * @author: huadu.shen + * @created: 2021/06/20 22:06 + */ +public class TsdbMetaValue implements Serializable { + private String[] quorumReplicas; + private int quorumWrite; + private int quorumRead; +} diff --git a/src/main/java/com/dulio/demo/raft/tsdb/snapshot/TsdbMetaSnapshotFile.java b/src/main/java/com/dulio/demo/raft/tsdb/snapshot/TsdbMetaSnapshotFile.java index 80144e1..cd08001 100644 --- a/src/main/java/com/dulio/demo/raft/tsdb/snapshot/TsdbMetaSnapshotFile.java +++ b/src/main/java/com/dulio/demo/raft/tsdb/snapshot/TsdbMetaSnapshotFile.java @@ -1,73 +1,70 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.dulio.demo.raft.tsdb.snapshot; - -import com.dulio.demo.raft.tsdb.model.TsdbMetaKey; -import com.dulio.demo.raft.tsdb.model.TsdbMetaValue; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.reflect.TypeToken; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Type; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; - -public class TsdbMetaSnapshotFile { - - private static final Logger LOG = LoggerFactory.getLogger(TsdbMetaSnapshotFile.class); - - private final String path; - - private final Gson gson = new GsonBuilder().create(); - - public TsdbMetaSnapshotFile(String path) { - super(); - this.path = path; - } - - public String getPath() { - return this.path; - } - - /** - * Save value to snapshot file. - */ - public boolean save(final Map value) { - try { - FileUtils.writeStringToFile(new File(path), gson.toJson(value)); - return true; - } catch (IOException e) { - LOG.error("Fail to save snapshot", e); - return false; - } - } - - public Map load() throws IOException { - final String s = FileUtils.readFileToString(new File(path)); - if (!StringUtils.isBlank(s)) { - return gson.fromJson(s, new TypeToken>() {}.getType()); - } - throw new IOException("Fail to load snapshot from " + path + ",content: " + s); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dulio.demo.raft.tsdb.snapshot; + +import com.dulio.demo.raft.tsdb.model.TsdbMetaKey; +import com.dulio.demo.raft.tsdb.model.TsdbMetaValue; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +public class TsdbMetaSnapshotFile { + + private static final Logger LOG = LoggerFactory.getLogger(TsdbMetaSnapshotFile.class); + + private final String path; + + private final Gson gson = new GsonBuilder().create(); + + public TsdbMetaSnapshotFile(String path) { + super(); + this.path = path; + } + + public String getPath() { + return this.path; + } + + /** + * Save value to snapshot file. + */ + public boolean save(final Map value) { + try { + FileUtils.writeStringToFile(new File(path), gson.toJson(value)); + return true; + } catch (IOException e) { + LOG.error("Fail to save snapshot", e); + return false; + } + } + + public Map load() throws IOException { + final String s = FileUtils.readFileToString(new File(path)); + if (!StringUtils.isBlank(s)) { + return gson.fromJson(s, new TypeToken>() {}.getType()); + } + throw new IOException("Fail to load snapshot from " + path + ",content: " + s); + } +} diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties index 5cf27c5..05dd4db 100644 --- a/src/main/resources/log4j.properties +++ b/src/main/resources/log4j.properties @@ -1,11 +1,11 @@ -log4j.rootLogger=debug,console,logfile -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n - -log4j.appender.logfile=org.apache.log4j.RollingFileAppender -log4j.appender.logfile.File=data/log4j.log -log4j.appender.logfile.MaxFileSize=1KB -log4j.appender.logfile.MaxBackupIndex=3 -log4j.appender.logfile.layout=org.apache.log4j.PatternLayout +log4j.rootLogger=debug,console,logfile +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n + +log4j.appender.logfile=org.apache.log4j.RollingFileAppender +log4j.appender.logfile.File=data/log4j.log +log4j.appender.logfile.MaxFileSize=1KB +log4j.appender.logfile.MaxBackupIndex=3 +log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n \ No newline at end of file diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index 2cb9bf8..f89dbc3 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -1,112 +1,112 @@ - - - - - - - - - %d %p %c{1.}: %m%n - - - - - - %d %p %c{1.}: %m%n - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + %d %p %c{1.}: %m%n + + + + + + %d %p %c{1.}: %m%n + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +