Skip to content

Commit

Permalink
add tsdbMetaFSM
Browse files Browse the repository at this point in the history
  • Loading branch information
dulio committed Jun 20, 2021
1 parent d8f26b4 commit 4217ae7
Show file tree
Hide file tree
Showing 5 changed files with 291 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@
<artifactId>metrics-core</artifactId>
<version>4.0.2</version>
</dependency>
<!-- gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.7</version>
</dependency>

<!-- log -->
<dependency>
Expand Down
149 changes: 149 additions & 0 deletions src/main/java/com/dulio/demo/raft/tsdb/TsdbMetaStateMachine.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package com.dulio.demo.raft.tsdb;

import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.Utils;
import com.dulio.demo.raft.counter.CounterClosure;
import com.dulio.demo.raft.counter.CounterOperation;
import com.dulio.demo.raft.tsdb.model.TsdbMetaKey;
import com.dulio.demo.raft.tsdb.model.TsdbMetaValue;
import com.dulio.demo.raft.tsdb.snapshot.TsdbMetaSnapshotFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/**
*
*/
public class TsdbMetaStateMachine extends StateMachineAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(TsdbMetaStateMachine.class);

/**
* TsdbMetaMap
*/
private final ConcurrentMap<TsdbMetaKey, TsdbMetaValue> tsdbMetaData = new ConcurrentHashMap<>();
/**
* Leader term
*/
private final AtomicLong leaderTerm = new AtomicLong(-1);

public boolean isLeader() {
return this.leaderTerm.get() > 0;
}

@Override
public void onApply(final Iterator iter) {
while (iter.hasNext()) {
long current = 0;
CounterOperation counterOperation = null;

CounterClosure closure = null;
if (iter.done() != null) {
// This task is applied by this node, get value from closure to avoid additional parsing.
closure = (CounterClosure) iter.done();
counterOperation = closure.getCounterOperation();
} else {
// Have to parse FetchAddRequest from this user log.
final ByteBuffer data = iter.getData();
try {
counterOperation = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(), CounterOperation.class.getName());
} catch (final CodecException e) {
LOGGER.error("Fail to decode IncrementAndGetRequest", e);
}
}
// if (counterOperation != null) {
// switch (counterOperation.getOp()) {
// case GET:
// current = this.value.get();
// LOGGER.info("Get value={} at logIndex={}", current, iter.getIndex());
// break;
// case INCREMENT:
// final long delta = counterOperation.getDelta();
// final long prev = this.value.get();
// current = this.value.addAndGet(delta);
// LOGGER.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex());
// break;
// }
//
// if (closure != null) {
// closure.success(current);
// closure.run(Status.OK());
// }
// }
iter.next();
}
}

@Override
public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
final ConcurrentMap<TsdbMetaKey, TsdbMetaValue> currVal = this.tsdbMetaData;
Utils.runInThread(() -> {
final TsdbMetaSnapshotFile snapshot = new TsdbMetaSnapshotFile(writer.getPath() + File.separator + "data");
if (snapshot.save(currVal)) {
if (writer.addFile("data")) {
done.run(Status.OK());
} else {
done.run(new Status(RaftError.EIO, "Fail to add file to writer"));
}
} else {
done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath()));
}
});
}

@Override
public void onError(final RaftException e) {
LOGGER.error("Raft error: {}", e, e);
}

@Override
public boolean onSnapshotLoad(final SnapshotReader reader) {
if (isLeader()) {
LOGGER.warn("Leader is not supposed to load snapshot");
return false;
}
if (reader.getFileMeta("data") == null) {
LOGGER.error("Fail to find data file in {}", reader.getPath());
return false;
}
final TsdbMetaSnapshotFile snapshot = new TsdbMetaSnapshotFile(reader.getPath() + File.separator + "data");
try {
this.tsdbMetaData.clear();
this.tsdbMetaData.putAll(snapshot.load());
return true;
} catch (final IOException e) {
LOGGER.error("Fail to load snapshot from {}", snapshot.getPath());
return false;
}

}

@Override
public void onLeaderStart(final long term) {
this.leaderTerm.set(term);
super.onLeaderStart(term);

}

@Override
public void onLeaderStop(final Status status) {
this.leaderTerm.set(-1);
super.onLeaderStop(status);
}

}
45 changes: 45 additions & 0 deletions src/main/java/com/dulio/demo/raft/tsdb/model/TsdbMetaKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.dulio.demo.raft.tsdb.model;

import java.io.Serializable;
import java.util.Date;

/**
* @class: TsdbMetaKey
* @program: raft-demo
* @description: Tsdb meta data
* @author: huadu.shen
* @created: 2021/06/20 22:06
*/
public class TsdbMetaKey implements Serializable {
private String databaseName;
private String measurement;
private String retentionPolicy;
private Date startDatetime;
private Date endDatetime;

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = result + databaseName.hashCode() * prime;
result = result + measurement.hashCode() * prime;
result = result + retentionPolicy.hashCode() * prime;
result = result + startDatetime.hashCode() * prime;
result = result + endDatetime.hashCode() * prime;
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
TsdbMetaKey other = (TsdbMetaKey) obj;
if (!databaseName.equals(other.databaseName)) return false;
if (!measurement.equals(other.measurement)) return false;
if (!retentionPolicy.equals(other.retentionPolicy)) return false;
if (!startDatetime.equals(other.startDatetime)) return false;
if (!endDatetime.equals(other.endDatetime)) return false;
return true;
}
}
18 changes: 18 additions & 0 deletions src/main/java/com/dulio/demo/raft/tsdb/model/TsdbMetaValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.dulio.demo.raft.tsdb.model;

import java.io.Serializable;
import java.util.Date;

/**
* @class: TsdbMetaValue
* @program: raft-demo
* @description: 该版本不做数据分片,只做数据副本
* TODO 下一步可以支持分片
* @author: huadu.shen
* @created: 2021/06/20 22:06
*/
public class TsdbMetaValue implements Serializable {
private String[] quorumReplicas;
private int quorumWrite;
private int quorumRead;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dulio.demo.raft.tsdb.snapshot;

import com.dulio.demo.raft.tsdb.model.TsdbMetaKey;
import com.dulio.demo.raft.tsdb.model.TsdbMetaValue;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

public class TsdbMetaSnapshotFile {

private static final Logger LOG = LoggerFactory.getLogger(TsdbMetaSnapshotFile.class);

private final String path;

private final Gson gson = new GsonBuilder().create();

public TsdbMetaSnapshotFile(String path) {
super();
this.path = path;
}

public String getPath() {
return this.path;
}

/**
* Save value to snapshot file.
*/
public boolean save(final Map<TsdbMetaKey, TsdbMetaValue> value) {
try {
FileUtils.writeStringToFile(new File(path), gson.toJson(value));
return true;
} catch (IOException e) {
LOG.error("Fail to save snapshot", e);
return false;
}
}

public Map<TsdbMetaKey, TsdbMetaValue> load() throws IOException {
final String s = FileUtils.readFileToString(new File(path));
if (!StringUtils.isBlank(s)) {
return gson.fromJson(s, new TypeToken<Map<TsdbMetaKey, TsdbMetaValue>>() {}.getType());
}
throw new IOException("Fail to load snapshot from " + path + ",content: " + s);
}
}

0 comments on commit 4217ae7

Please sign in to comment.