Skip to content

Commit

Permalink
Merge pull request #99 from rule110-io/feature/wallet-functions
Browse files Browse the repository at this point in the history
Feature/wallet functions
  • Loading branch information
lightmyfire17 authored Sep 1, 2022
2 parents 2cf763c + 6d1c42a commit 52e02ed
Show file tree
Hide file tree
Showing 59 changed files with 2,730 additions and 1,371 deletions.
66 changes: 59 additions & 7 deletions backend/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,76 @@ package surge

import (
"log"
"strconv"

"github.com/nknorg/nkn-sdk-go"
"github.com/rule110-io/surge/backend/constants"
)

func subscribeToPubSub(topic string) {
txnHash, err := client.Subscribe("", topic, constants.SubscriptionDuration, "Surge Beta Client", nil)
var TransactionFee string

func subscribeToPubSub(topic string) bool {
config := &nkn.DefaultTransactionConfig

calculatedFee, err := CalculateFee(TransactionFee)
if err != nil {
pushError("Error on subscribe to topic", err.Error())
return false
}
config.Fee = calculatedFee

feeFloat, _ := strconv.ParseFloat(config.Fee, 64)
hasBalance, _ := ValidateBalanceForTransaction(0, feeFloat, true)
if !hasBalance {
pushError("Error on subscribe to topic", "Not enough fee in wallet, consider depositing NKN or if possible lower transaction fees in the wallet settings.")
updateTopicSubscriptionState(topic, 0)
return false
}

updateTopicSubscriptionState(topic, 1)
txnHash, err := client.Subscribe("", topic, constants.SubscriptionDuration, constants.TransactionMeta, config)
if err != nil {
log.Println("Probably already subscribed to:", topic, "error:", err)
log.Println("Subsription transaction failed for topic:", topic, "error:", err)
updateTopicSubscriptionState(topic, 1)
return false
} else {
log.Println("Subscribed: ", topic, txnHash)
log.Println("Subscribed: ", topic, txnHash, "fee paid:", config.Fee)
}
updateTopicSubscriptionState(topic, 2)
return true
}

func unsubscribeToPubSub(topic string) {
txnHash, err := client.Unsubscribe("", topic, nil)
func unsubscribeToPubSub(topic string) bool {
config := &nkn.DefaultTransactionConfig

var err error = nil
config.Fee, err = CalculateFee(TransactionFee)
if err != nil {
pushError("Error on unsubscribe to topic", err.Error())
return false
}

initialState := 0
currentState, exists := topicEncodedSubcribeStateMap[topic]
if exists {
initialState = currentState
}

feeFloat, _ := strconv.ParseFloat(config.Fee, 64)
hasBalance, _ := ValidateBalanceForTransaction(0, feeFloat, true)
if !hasBalance {
pushError("Error on unsubscribe to topic", "Not enough fee in wallet, consider depositing NKN or if possible lower transaction fees in the wallet settings.")
updateTopicSubscriptionState(topic, initialState)
return false
}
updateTopicSubscriptionState(topic, 1)

txnHash, err := client.Unsubscribe("", topic, config)
if err != nil {
log.Println("Probably not subscribed to:", topic, "error:", err)
} else {
log.Println("Unsubscribed: ", topic, txnHash)
log.Println("Unsubscribed: ", topic, txnHash, "fee paid:", config.Fee)
}
updateTopicSubscriptionState(topic, 0)
return true
}
49 changes: 24 additions & 25 deletions backend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func WailsBind(ctx *context.Context) {
for !clientInitialized {
time.Sleep(time.Second)
if tryCount%10 == 0 {
pushError("Connection to NKN not yet established 0", "do you have an active internet connection?")
pushError("Connection to NKN not yet established", "do you have an active internet connection?")
}
tryCount++
}
Expand All @@ -82,6 +82,7 @@ func WailsBind(ctx *context.Context) {
go updateFileDataWorker()

FrontendReady = true
log.Println("Frontend connected")
}

//InitializeClient Initiates the surge client and instantiates connection with the NKN network
Expand All @@ -90,7 +91,7 @@ func InitializeClient(args []string) bool {

account := InitializeAccount()
client, err = nkn.NewMultiClient(account, "", getNumberClients(), false, &nkn.ClientConfig{
ConnectRetries: 1000,
ConnectRetries: 10,
SeedRPCServerAddr: GetBootstrapRPC(),
})
if err != nil {
Expand Down Expand Up @@ -130,6 +131,14 @@ func InitializeClient(args []string) bool {
}

messaging.Initialize(client, client.Account(), MessageReceived)

//Get the transaction fee setting
TransactionFee, err = DbReadSetting("defaultTxFee")
if err != nil {
DbWriteSetting("defaultTxFee", "0")
TransactionFee = "0"
}

go autoSubscribeWorker()

go platform.WatchOSXHandler()
Expand Down Expand Up @@ -170,6 +179,10 @@ func StopClient() {
//Persist our connections for future bootstraps
PersistRPC(client)

for _, v := range topicsMap {
log.Println("Disconnecting from topic", v.Name)
AnnounceDisconnect(v.Name)
}
client.Close()
}

Expand Down Expand Up @@ -201,8 +214,7 @@ func DownloadFileByHash(Hash string) bool {
numChunks := int((file.FileSize-1)/int64(constants.ChunkSize)) + 1

//When downloading from remote enter file into db
dbFile, err := dbGetFile(Hash)
log.Println(dbFile)
_, err = dbGetFile(Hash)
if err != nil {
file.Path = path
file.NumChunks = numChunks
Expand All @@ -217,7 +229,7 @@ func DownloadFileByHash(Hash string) bool {
randomChunks[i] = i
}
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(randomChunks), func(i, j int) { randomChunks[i], randomChunks[j] = randomChunks[j], randomChunks[i] })
//rand.Shuffle(len(randomChunks), func(i, j int) { randomChunks[i], randomChunks[j] = randomChunks[j], randomChunks[i] })

downloadChunks(file, randomChunks)

Expand All @@ -244,17 +256,11 @@ func restartDownload(Hash string) {

//Nothing more to download
if numChunks == 0 {
platform.ShowNotification("Download Finished", "Download for "+file.FileName+" finished!")
pushNotification("Download Finished", file.FileName)
file.IsDownloading = false
file.IsUploading = true
file.IsAvailable = true
dbInsertFile(*file)
return
}

rand.Seed(time.Now().UnixNano())
rand.Shuffle(numChunks, func(i, j int) { missingChunks[i], missingChunks[j] = missingChunks[j], missingChunks[i] })
//rand.Shuffle(numChunks, func(i, j int) { missingChunks[i], missingChunks[j] = missingChunks[j], missingChunks[i] })

log.Println("Restarting Download for", file.FileName)

Expand Down Expand Up @@ -292,7 +298,7 @@ func onClientConnected(session *sessionmanager.Session, isDialIn bool) {
go updateNumClientStore()
addr := session.Session.RemoteAddr().String()

fmt.Println(string("\033[36m"), "Client Connected", addr, string("\033[0m"))
log.Println("Client Connected", addr)

go listenToSession(session)
}
Expand All @@ -303,12 +309,8 @@ func onClientDisconnected(addr string) {
mutexes.ListedFilesLock.Lock()
defer mutexes.ListedFilesLock.Unlock()

//Remove this address from remote file seeders
for i := 0; i < len(ListedFiles); i++ {
fmt.Println(string("\033[31m"), "onClientDisconnected", ListedFiles[i].FileName)
}

RemoveSeeder(addr)
log.Println("Client Disconnected", addr)

//Remove empty seeders listings
for i := 0; i < len(ListedFiles); i++ {
Expand All @@ -327,12 +329,10 @@ func listenToSession(Session *sessionmanager.Session) {

addr := Session.Session.RemoteAddr().String()

fmt.Println(string("\033[31m"), "Initiate Session", addr, string("\033[0m"))
log.Println("Initiate Session", addr)

for Session.Session != nil {
data, chunkType, err := SessionRead(Session)
fmt.Println(string("\033[31m"), "Read data from session", addr, string("\033[0m"))

if err != nil {
log.Println("Session read failed, closing session error:", err)
break
Expand All @@ -343,7 +343,7 @@ func listenToSession(Session *sessionmanager.Session) {
switch chunkType {
case constants.SurgeChunkID:
//Write add to download internally after parsing data
processChunk(Session, data)
go processChunk(Session, data)
}
}
}
Expand All @@ -355,7 +355,6 @@ func processChunk(Session *sessionmanager.Session, Data []byte) {
if err := proto.Unmarshal(Data, surgeMessage); err != nil {
log.Panic("Failed to parse surge message:", err)
}
fmt.Println(string("\033[31m"), "PROCESSING CHUNK", string("\033[0m"))

//Write add to download
mutexes.BandwidthAccumulatorMapLock.Lock()
Expand All @@ -364,7 +363,7 @@ func processChunk(Session *sessionmanager.Session, Data []byte) {

//Data nill means its a request for data
if surgeMessage.Data == nil {
go TransmitChunk(Session, surgeMessage.FileID, surgeMessage.ChunkID)
TransmitChunk(Session, surgeMessage.FileID, surgeMessage.ChunkID)
} else { //If data is not nill we are receiving data

//When we receive a chunk mark it as no longer in transit
Expand All @@ -381,7 +380,7 @@ func processChunk(Session *sessionmanager.Session, Data []byte) {
}
mutexes.WorkerMapLock.Unlock()

go WriteChunk(surgeMessage.FileID, surgeMessage.ChunkID, surgeMessage.Data)
WriteChunk(surgeMessage.FileID, surgeMessage.ChunkID, surgeMessage.Data)
}
}

Expand Down
45 changes: 21 additions & 24 deletions backend/communication.go
Original file line number Diff line number Diff line change
@@ -1,46 +1,43 @@
package surge

import (
"fmt"
"strconv"
"strings"

"github.com/rule110-io/surge/backend/constants"
"github.com/rule110-io/surge/backend/messaging"
"github.com/rule110-io/surge/backend/models"
"github.com/rule110-io/surge/backend/mutexes"
"github.com/rule110-io/surge/backend/sessionmanager"
)

const (
MessageIDAnnounceFiles = iota
MessageIDAnnounceFilesReply
MessageIDAnnounceNewFile
MessageIDAnnounceRemoveFile
MessageIDAnnounceDisconnect
)

func MessageReceived(msg *messaging.MessageReceivedObj) {
fmt.Println(string("\033[36m"), "MESSAGE RECEIVED", string(msg.Data))
fmt.Println(msg.Data)

switch msg.Type {
case MessageIDAnnounceFiles:
SendAnnounceFilesReply(msg)
processQueryResponse(msg.Sender, msg.Data)
if msg.Sender != GetAccountAddress() {
go SendAnnounceFilesReply(msg)
}
go processQueryResponse(msg.Sender, msg.Data)
case MessageIDAnnounceFilesReply:
//process file data
processQueryResponse(msg.Sender, msg.Data)
go processQueryResponse(msg.Sender, msg.Data)
case MessageIDAnnounceNewFile:
//process file data
processQueryResponse(msg.Sender, msg.Data)
go processQueryResponse(msg.Sender, msg.Data)
case MessageIDAnnounceRemoveFile:
processRemoveFile(string(msg.Data), msg.Sender)
go processRemoveFile(string(msg.Data), msg.Sender)
case MessageIDAnnounceDisconnect:
go sessionmanager.CloseSession(msg.Sender)
}

}

func AnnounceFiles(topicEncoded string) {
fmt.Println(string("\033[36m"), "ANNOUNCING FILES FOR TOPIC", topicEncoded)

payload := getTopicPayload(topicEncoded)

dataObj := messaging.MessageObj{
Expand All @@ -53,8 +50,6 @@ func AnnounceFiles(topicEncoded string) {
}

func SendAnnounceFilesReply(msg *messaging.MessageReceivedObj) {
fmt.Println(string("\033[36m"), "SENDING FILE REQUEST REPLY", msg.TopicEncoded, msg.Sender)

payload := getTopicPayload(msg.TopicEncoded)

if len(payload) > 0 {
Expand All @@ -69,8 +64,6 @@ func SendAnnounceFilesReply(msg *messaging.MessageReceivedObj) {
}

func AnnounceNewFile(file *models.File) {
fmt.Println(string("\033[36m"), "ANNOUNCE NEW FILE FOR TOPIC", file.Topic)

//Create payload
payload := surgeGenerateTopicPayload(file.FileName, file.FileSize, file.FileHash, file.Topic)

Expand All @@ -85,8 +78,6 @@ func AnnounceNewFile(file *models.File) {
}

func AnnounceRemoveFile(topic string, fileHash string) {
fmt.Println(string("\033[36m"), "ANNOUNCE REMOVE FILE FOR TOPIC ", topic, " hash: ", fileHash)

//Create the data object
dataObj := messaging.MessageObj{
Type: MessageIDAnnounceRemoveFile,
Expand All @@ -97,9 +88,17 @@ func AnnounceRemoveFile(topic string, fileHash string) {
messaging.Broadcast(&dataObj)
}

func processRemoveFile(hash string, seeder string) {
fmt.Println(string("\033[36m"), "PROCESS REMOVE FILE FOR TOPIC, hash:", hash, " seeder: ", seeder)
func AnnounceDisconnect(topic string) {
//Create the data object
dataObj := messaging.MessageObj{
Type: MessageIDAnnounceDisconnect,
TopicEncoded: TopicEncode(topic),
}

messaging.Broadcast(&dataObj)
}

func processRemoveFile(hash string, seeder string) {
RemoveFileSeeder(hash, seeder)

mutexes.ListedFilesLock.Lock()
Expand All @@ -121,8 +120,6 @@ func processQueryResponse(seeder string, Data []byte) {

//Try to parse SurgeMessage
s := string(Data)
fmt.Println(string("\033[36m"), "file query response received", seeder, string("\033[0m"))

mutexes.ListedFilesLock.Lock()

//Parse the response
Expand Down
5 changes: 4 additions & 1 deletion backend/constants/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ const (
NumWorkersMax = 12

//duration of a subscription blocktime is ~20sec
SubscriptionDuration = 180
SubscriptionDuration = 4000

//official surge wallets
TeamAddressA = "7a48870a43d1512e467e8df103b1dee8d908f297ffe1fb45e81317965597bc7c"
TeamAddressB = "44734f736b31e522e9be64a812cf42d0822c765f4bc13404d3169ff8e3d54c9e"
TeamAddressC = "68a10e26288b9e97fc97362eb935574dd3db74004f0081918ee121d15ed1d29b"

//Meta payload for onchain transactions
TransactionMeta = "Surge 2.0 Client"
)
4 changes: 2 additions & 2 deletions backend/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func dbGetAllFiles() []models.File {

return nil
}); err != nil {
log.Println(err)
log.Println("Get all db files error:", err)
} else {
return files
}
Expand Down Expand Up @@ -137,7 +137,7 @@ func dbDeleteFile(Hash string) error {
}
return nil
}); err != nil {
log.Println(err)
log.Println("Db delete file", err)
return err
}
return nil
Expand Down
Loading

0 comments on commit 52e02ed

Please sign in to comment.