Skip to content

Commit

Permalink
Merge branch 'BEDS-1120/make-healthz-quite-again' into staging-2
Browse files (browse the repository at this point in the history)
  • Loading branch information
invis-bitfly committed Jan 27, 2025
2 parents 5345f13 + bfd64a5 commit 47b766b
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 26 deletions.
22 changes: 5 additions & 17 deletions backend/pkg/api/data_access/healthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ func (d *DataAccessService) GetHealthz(ctx context.Context, showAll bool) types.
metadata
FROM status_reports
LEFT JOIN clean_shutdown_events cse ON status_reports.emitter = clean_shutdown_events.emitter
WHERE expires_at > now() and deployment_type = {deployment_type:String} and (status_reports.inserted_at < cse.inserted_at or cse.inserted_at is null)
WHERE expires_at > now() and deployment_type = {deployment_type:String} and (status_reports.inserted_at < cse.inserted_at or cse.inserted_at is null) AND event_id != {running_event_id:String}
ORDER BY
event_id ASC,
emitter ASC,
run_id ASC,
insert_id DESC
), latest_report_per_run as (
), latest_report_per_emitter as (
SELECT
event_id,
emitter,
Expand All @@ -64,20 +64,8 @@ func (d *DataAccessService) GetHealthz(ctx context.Context, showAll bool) types.
active_reports
GROUP BY
event_id,
emitter,
run_id
emitter
order by insert_id desc
), latest_report_per_status as (
select
event_id,
emitter,
status,
any(inserted_at) as inserted_at,
any(expires_at) as expires_at,
any(timeouts_at) as timeouts_at,
any(metadata) AS metadata
from latest_report_per_run
group by event_id, emitter, status
)
SELECT
event_id,
Expand All @@ -97,7 +85,7 @@ func (d *DataAccessService) GetHealthz(ctx context.Context, showAll bool) types.
)
) as result
FROM
latest_report_per_status
latest_report_per_emitter
GROUP BY
event_id,
status
Expand All @@ -112,7 +100,7 @@ func (d *DataAccessService) GetHealthz(ctx context.Context, showAll bool) types.
response.Reports = make(map[string][]types.HealthzResult)
response.ReportingUUID = utils.GetUUID()
response.DeploymentType = utils.Config.DeploymentType
err := db.ClickHouseReader.SelectContext(ctx, &results, query, ch.Named("deployment_type", utils.Config.DeploymentType), ch.Named("clean_shutdown_event_id", string(constants.Event_MonitoringCleanShutdown)))
err := db.ClickHouseReader.SelectContext(ctx, &results, query, ch.Named("deployment_type", utils.Config.DeploymentType), ch.Named("clean_shutdown_event_id", string(constants.Event_MonitoringCleanShutdown)), ch.Named("running_event_id", string(constants.Running)))
if err != nil {
response.Reports["response_error"] = []types.HealthzResult{
{
Expand Down
13 changes: 6 additions & 7 deletions backend/pkg/exporter/modules/rocketpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
"github.com/gobitfly/beaconchain/pkg/commons/services"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
monitoringServices "github.com/gobitfly/beaconchain/pkg/monitoring/services"
"github.com/pkg/errors"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -191,28 +189,29 @@ func (rp *RocketpoolExporter) Run() error {

for {
t0 := time.Now()
r := monitoringServices.NewStatusReport(constants.Event_ExporterLegacyRocketPool, time.Hour*4, rp.UpdateInterval) // currently takes 2h40m on mainnet...
r(constants.Running, nil)
// TODO: re-enable status reports once the rocketpool exporter is more stable
//r := monitoringServices.NewStatusReport(constants.Event_ExporterLegacyRocketPool, time.Hour*4, rp.UpdateInterval) // currently takes 2h40m on mainnet...
//r(constants.Running, nil)
var err error
err = rp.Update(count)
if err != nil {
log.Error(err, "error updating rocketpool-data", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
//r(constants.Failure, map[string]string{"error": err.Error()})
time.Sleep(errorInterval)
continue
}
err = rp.Save(count)
if err != nil {
log.Error(err, "error saving rocketpool-data", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
//r(constants.Failure, map[string]string{"error": err.Error()})
time.Sleep(errorInterval)
continue
}

services.ReportStatus("rocketpoolExporter", "Running", nil)

metrics.TaskDuration.WithLabelValues("exporter_rocketpoolExporter").Observe(time.Since(t0).Seconds())
r(constants.Success, map[string]string{"took": time.Since(t0).String(), "took_raw": fmt.Sprintf("%v", time.Since(t0).Milliseconds())})
//r(constants.Success, map[string]string{"took": time.Since(t0).String(), "took_raw": fmt.Sprintf("%v", time.Since(t0).Milliseconds())})

log.InfoWithFields(log.Fields{"duration": time.Since(t0)}, "exported rocketpool-data")
count++
Expand Down
7 changes: 5 additions & 2 deletions backend/pkg/exporter/modules/slot_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ var latestEpoch, latestSlot, finalizedEpoch, latestProposed uint64

var processSlotMutex = &sync.Mutex{}

func (d *slotExporterData) OnHead(event *constypes.StandardEventHeadResponse) (err error) {
processSlotMutex.Lock() // only process one slot at a time
func (d *slotExporterData) OnHead(_ *constypes.StandardEventHeadResponse) (err error) {
if !processSlotMutex.TryLock() {
log.Infof("slotExporter is still running, skipping this run")
return nil
}
defer processSlotMutex.Unlock()

latestEpoch, latestSlot, finalizedEpoch, latestProposed = 0, 0, 0, 0
Expand Down

0 comments on commit 47b766b

Please sign in to comment.