Skip to content

Commit

Permalink
feat: adding sync functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
daithihearn committed Nov 4, 2023
1 parent 7ff8912 commit b4b07a6
Show file tree
Hide file tree
Showing 9 changed files with 309 additions and 7 deletions.
13 changes: 8 additions & 5 deletions DockerfileApi → Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ RUN go mod download
# Copy the rest of the application source code
COPY . .

# Build the application
# Assuming your main.go is inside cmd/api/
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o /app/bin/app ./cmd/api
# Build the api
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o /app/bin/api ./cmd/api

# Building the sync job
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o /app/bin/sync ./cmd/sync

# Now, start a new stage with a smaller, minimal image
FROM alpine:latest
Expand All @@ -23,8 +25,9 @@ RUN apk --no-cache add ca-certificates tzdata

# Set the working directory and copy the binary from the previous stage
WORKDIR /root/
COPY --from=builder /app/bin/app .
COPY --from=builder /app/bin/api .
COPY --from=builder /app/bin/sync .
COPY --from=builder /app/pkg/i18n/*.toml ./pkg/i18n/

# Command to run when the container starts
CMD ["./app"]
CMD ["./api"]
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ lint: fmt #@ Run the linter
golint ./...
run: test docs vet #@ Start locally
go run cmd/api/main.go
sync: test vet #@ Sync local data with API
go run cmd/sync/main.go
update: #@ Update dependencies
go mod tidy
clear-build: #@ Clear build folder
Expand All @@ -24,4 +26,4 @@ build: test docs vet clear-build copy-translations #@ Build the binary
go build -o build/api/main cmd/api/main.go
.PHONY:build
image: docs vet #@ Build docker image
docker build -f DockerfileApi -t electricity-prices-api . --load
docker build -t electricity-prices . --load
29 changes: 29 additions & 0 deletions cmd/sync/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package main

import (
"electricity-prices/pkg/db"
"electricity-prices/pkg/service"
"github.com/joho/godotenv"
"os"
"os/signal"
"syscall"
)

func init() {
// Load .env file if it exists
_ = godotenv.Load()
}

func main() {
// Catch SIGINT or SIGTERM and gracefully close the database connection.
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
db.CloseMongoConnection()
os.Exit(0)
}()

// Sync with the API.
service.SyncWithAPI()
}
75 changes: 75 additions & 0 deletions pkg/client/ree-client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package client

import (
"electricity-prices/pkg/model"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
)

const urlTemplate = "https://apidatos.ree.es/en/datos/mercados/precios-mercados-tiempo-real?time_trunc=hour&start_date=%sT00:00&end_date=%sT23:59"

// GetPricesFromRee returns the prices for the given date from the REE API
func GetPricesFromRee(date time.Time) ([]model.Price, bool, error) {
// Parse date to day string
day := date.Format("2006-01-02")

// Call to endpoint
resp, err := http.Get(fmt.Sprintf(urlTemplate, day, day))
if err != nil {
log.Fatalf("Error occurred while sending request to the server: %s", err)
}
defer resp.Body.Close()

// Read the response body
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Fatalf("Error occurred while reading response body: %s", err)
}

// Check if the status code indicates success
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
// Initialize an instance of PVPC
var res model.ReeResponse

// Parse the JSON response body into the PVPC struct
err := json.Unmarshal(body, &res)
if err != nil {
log.Fatalf("Error occurred while unmarshaling the response body: %s", err)
}
if len(res.Errors) > 0 {
if res.Errors[0].Detail == "There are no data for the selected filters." {
return nil, true, nil
}
return nil, false, fmt.Errorf("error returned from API: %v", res.Errors[0])
}

var included model.ReeIncluded
for _, inc := range res.Included {
if inc.ID == "600" {
included = inc
continue
}
}
if len(included.Attributes.Values) == 0 {
return nil, false, fmt.Errorf("no prices returned from API")
}

prices := make([]model.Price, len(included.Attributes.Values))

for i, p := range included.Attributes.Values {
prices[i] = model.Price{
DateTime: p.DateTime,
Price: p.Price / 1000,
}
}

return prices, false, nil
}

return nil, false, fmt.Errorf("server responded with a non-successful status code: %d", resp.StatusCode)

}
3 changes: 2 additions & 1 deletion pkg/db/conection.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func GetCollection() *mongo.Collection {
log.Fatal(err)
}

collection := client.Database("electricity-prices").Collection("prices")
db := client.Database("electricity-prices")
collection := db.Collection("prices")

return collection
}
Expand Down
81 changes: 81 additions & 0 deletions pkg/db/prices-collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,55 @@ func GetPrice(now time.Time) (model.Price, error) {

return price, err
}

func SavePrices(prices []model.Price) error {
collection := GetCollection()

var documents []interface{}
for _, price := range prices {
documents = append(documents, price)
}

client, err := GetMongoClient()
if err != nil {
log.Fatalf("Error getting mongo client: %v", err)
}

// Insert the documents
// Start a session for the transaction.
session, err := client.StartSession()
if err != nil {
log.Fatalf("Error starting session: %v", err)
}
defer session.EndSession(context.Background())

// Define the work to be done in the transaction.
txnErr := mongo.WithSession(context.Background(), session, func(sc mongo.SessionContext) error {
// Start the transaction
err := session.StartTransaction()
if err != nil {
return err
}

_, err = collection.InsertMany(context.Background(), documents)
if err != nil {
// If there's an error, abort the transaction and return the error.
session.AbortTransaction(sc)
return err
}

// If everything went well, commit the transaction.
err = session.CommitTransaction(sc)
return err
})

if txnErr != nil {
log.Fatalf("Transaction failed: %v", txnErr)
}

return nil
}

func GetPrices(start time.Time, end time.Time) ([]model.Price, error) {

collection := GetCollection()
Expand Down Expand Up @@ -116,3 +165,35 @@ func GetThirtyDayAverage(date time.Time) (float64, error) {

return 0, fmt.Errorf("no results found")
}

func GetLatestPrice() (model.Price, error) {
// Define the aggregation pipeline
pipeline := mongo.Pipeline{
{{Key: "$sort", Value: bson.M{
"dateTime": -1,
}}},
{{Key: "$limit", Value: 1}},
}

cursor, err := ExecutePipeline(pipeline)
if err != nil {
return model.Price{}, err
}

defer func(cursor *mongo.Cursor, ctx context.Context) {
err := cursor.Close(ctx)
if err != nil {
log.Fatal(err)
}
}(cursor, context.Background())

var result model.Price
if cursor.Next(context.Background()) {
if err = cursor.Decode(&result); err != nil {
return model.Price{}, err
}
return result, nil
}

return model.Price{}, fmt.Errorf("no results found")
}
29 changes: 29 additions & 0 deletions pkg/model/ree.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package model

import "time"

type ReePrices struct {
Price float64 `json:"value"`
DateTime time.Time `json:"datetime"`
}
type ReeAttributes struct {
Values []ReePrices `json:"values"`
}

type ReeIncluded struct {
Type string `json:"type"`
ID string `json:"id"`
GroupID string `json:"groupId"`
Attributes ReeAttributes `json:"attributes"`
}

type ReeError struct {
Code string `json:"code"`
Status string `json:"status"`
Title string `json:"title"`
Detail string `json:"detail"`
}
type ReeResponse struct {
Included []ReeIncluded `json:"included"`
Errors []ReeError `json:"errors"`
}
55 changes: 55 additions & 0 deletions pkg/service/sync-service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package service

import (
"electricity-prices/pkg/client"
"electricity-prices/pkg/db"
"electricity-prices/pkg/model"
"electricity-prices/pkg/utils"
"log"
"time"
)

func SyncWithAPI() {
log.Println("Starting to sync with API...")

// Get last day that was synced from database.
p, err := db.GetLatestPrice()
if err != nil {
p = model.Price{DateTime: utils.StartOfDay(time.Date(2014, 3, 31, 0, 0, 0, 0, time.Local))}
}
currentDate := utils.StartOfDay(p.DateTime).AddDate(0, 0, 1)
log.Println("Last day synced: ", currentDate)

// If last day is after tomorrow then exit
today := time.Now()
tomorrow := utils.StartOfDay(today.AddDate(0, 0, 1))

// Keep processing until we reach tomorrow
for {
if currentDate.After(tomorrow) {
log.Println("Fully synced. Exiting...")
break
}

// Get the prices from the API
prices, synced, err := client.GetPricesFromRee(currentDate)

if synced {
log.Println("Fully synced. Exiting...")
break
}

if err != nil {
panic(err)
}

log.Printf("Syncing prices for %s", currentDate.Format("January 2 2006"))

// Save the prices in the database
err = db.SavePrices(prices)
if err != nil {
panic(err)
}
currentDate = currentDate.AddDate(0, 0, 1)
}
}
27 changes: 27 additions & 0 deletions pkg/utils/date-utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,30 @@ func SameHour(date1 time.Time, date2 time.Time) bool {
locDate2 := date2.In(location)
return locDate1.Hour() == locDate2.Hour() && locDate1.Day() == locDate2.Day() && locDate1.Month() == locDate2.Month() && locDate1.Year() == locDate2.Year()
}

func StartOfDay(date time.Time) time.Time {
location, err := time.LoadLocation("Europe/Madrid")
if err != nil {
log.Fatal(err)
}
localisedDate := date.In(location)
return time.Date(localisedDate.Year(), localisedDate.Month(), localisedDate.Day(), 0, 0, 0, 0, localisedDate.Location())
}

// ParseDateFromEsios
// day is in format 02/01/2006
// hour is in format 00-01 to represent a 1-hour period (should be parsed to the beginning of the period)
func ParseDateFromEsios(day, hour string) time.Time {
location, err := time.LoadLocation("Europe/Madrid")
if err != nil {
log.Fatal(err)
}

// Parse the date string
date, err := time.ParseInLocation("02/01/2006 15:04", day+" "+hour[:2]+":00", location)
if err != nil {
log.Fatal(err)
}

return date
}

0 comments on commit b4b07a6

Please sign in to comment.