Skip to content

Commit

Permalink
fix::> Changed the implementation Remove to broadcast the message
Browse files Browse the repository at this point in the history
  • Loading branch information
PsychoPunkSage committed Jul 10, 2024
1 parent 660ca29 commit ee5c970
Showing 1 changed file with 62 additions and 1 deletion.
63 changes: 62 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ type MessageGetFile struct {
Key string
}

type MessageDeleteFile struct {
ID string
Key string
}

type FileServerOpts struct {
ID string
EncKey []byte
Expand Down Expand Up @@ -116,6 +121,37 @@ func (s *FileServer) Get(key string) (io.Reader, error) {
}

func (s *FileServer) Remove(key string) error {
if !s.store.Has(s.ID, key) {
fmt.Printf("[%s] The file (%s) is not present in the disk.\n", s.Transport.ListenAddress(), key)
return fmt.Errorf("file is not present")
}

fmt.Printf("[%s] File (%s) found locally, deleting it...\n", s.Transport.ListenAddress(), key)

// Message to be broadcasted.
msg := Message{
Payload: MessageDeleteFile{
ID: s.ID,
Key: cryptography.HashKey(key),
},
}

// Broadcasting the message
if err := s.broadcast(&msg); err != nil {
return err
}

time.Sleep(time.Millisecond * 500)

for _, peer := range s.peers {
fmt.Printf("[%s] receiving DELETE stream from peer: [%s]", s.Transport.ListenAddress(), peer.RemoteAddr())

if err := s.store.Delete(s.ID, key); err != nil {
return err
}
}

fmt.Printf("[%s] File (%s) DELETED.\n", s.Transport.ListenAddress(), key)
return s.store.Delete(s.ID, key)
}

Expand Down Expand Up @@ -304,6 +340,10 @@ func (s *FileServer) handleMessage(from string, msg *Message) error {
case *MessageGetFile:
fmt.Println("Received MessageGetFile")
return s.handleMessageGetFile(from, t)

case *MessageDeleteFile:
fmt.Println("Received MessageDeleteFile")
return s.handleMessageDeleteFile(from, t)
}
return nil
}
Expand All @@ -320,7 +360,7 @@ func (s *FileServer) handleMessageStoreFile(from string, msg *MessageStoreFile)
return err
}

log.Printf("[%s] Written (%d) bytes to disk\n", s.Transport.ListenAddress(), n)
fmt.Printf("[%s] Written (%d) bytes to disk\n", s.Transport.ListenAddress(), n)

peer.CloseStream() // Streaming is OVER!!

Expand Down Expand Up @@ -364,7 +404,28 @@ func (s *FileServer) handleMessageGetFile(from string, msg *MessageGetFile) erro
return nil
}

func (s *FileServer) handleMessageDeleteFile(from string, msg *MessageDeleteFile) error {
fmt.Printf("Received Message: %v\n", msg)

if !s.store.Has(msg.ID, msg.Key) {
return fmt.Errorf("[%s] file (%s) not found", s.Transport.ListenAddress(), msg.Key)
}

_, ok := s.peers[from]
if !ok {
return fmt.Errorf("peer {%s} not found", from)
}

if err := s.store.Delete(msg.ID, msg.Key); err != nil {
return err
}

fmt.Printf("[%s] Deleted file (%s) from disk\n", s.Transport.ListenAddress(), msg.Key)
return nil
}

func init() {
gob.Register(&MessageStoreFile{})
gob.Register(&MessageGetFile{})
gob.Register(&MessageDeleteFile{})
}

0 comments on commit ee5c970

Please sign in to comment.