@@ -6,12 +6,12 @@ use axum::{
66 response:: { IntoResponse , Json } ,
77} ;
88use axum_auto_routes:: route;
9+ use dashmap:: DashMap ;
910use futures:: StreamExt ;
1011use mongodb:: bson:: doc;
12+ use once_cell:: sync:: Lazy ;
1113use serde:: Deserialize ;
1214use std:: sync:: Arc ;
13- use dashmap:: DashMap ;
14- use once_cell:: sync:: Lazy ;
1515use std:: time:: { Duration , Instant } ;
1616
1717#[ derive( Deserialize ) ]
@@ -20,94 +20,69 @@ pub struct GetQuestsQuery {
2020}
2121
2222// In-memory cache for endpoint results
23- static QUEST_PARTICIPATION_CACHE : Lazy < DashMap < u32 , ( Instant , Vec < serde_json:: Value > ) > > = Lazy :: new ( DashMap :: new) ;
23+ static QUEST_PARTICIPATION_CACHE : Lazy < DashMap < u32 , ( Instant , Vec < serde_json:: Value > ) > > =
24+ Lazy :: new ( DashMap :: new) ;
2425const QUEST_PARTICIPATION_CACHE_TTL : Duration = Duration :: from_secs ( 60 ) ; // 1 minute cache
2526
26- // IMPORTANT: Ensure the following indexes exist in MongoDB for optimal performance:
27- // db.tasks.createIndex({ quest_id: 1 })
28- // db.completed_tasks.createIndex({ task_id: 1, timestamp: 1 })
29- // db.quests.createIndex({ id: 1 })
30- // These indexes will significantly speed up the aggregation pipeline.
31-
3227#[ route( get, "/analytics/get_quest_participation" ) ]
3328pub async fn handler (
3429 State ( state) : State < Arc < AppState > > ,
3530 Query ( query) : Query < GetQuestsQuery > ,
3631) -> impl IntoResponse {
3732 // Check cache first
38- if let Some ( ( cached_at, cached_result) ) = QUEST_PARTICIPATION_CACHE . get ( & query. id ) . map ( |v| v. value ( ) . clone ( ) ) {
33+ if let Some ( ( cached_at, cached_result) ) = QUEST_PARTICIPATION_CACHE
34+ . get ( & query. id )
35+ . map ( |v| v. value ( ) . clone ( ) )
36+ {
3937 if cached_at. elapsed ( ) < QUEST_PARTICIPATION_CACHE_TTL {
4038 return ( StatusCode :: OK , Json ( cached_result) ) . into_response ( ) ;
4139 }
4240 }
41+
4342 let current_time = chrono:: Utc :: now ( ) . timestamp_millis ( ) ;
43+
4444 let quest_id = query. id ;
45- let day_wise_distribution = vec ! [
46- doc! {
47- "$match" : doc! {
48- "quest_id" : quest_id
49- }
50- } ,
45+ let pipeline = vec ! [
46+ doc! { "$match" : { "quest_id" : quest_id } } ,
5147 doc! {
52- "$lookup" : doc! {
48+ "$lookup" : {
5349 "from" : "quests" ,
5450 "localField" : "quest_id" ,
5551 "foreignField" : "id" ,
5652 "as" : "questDetails"
5753 }
5854 } ,
5955 doc! {
60- "$set" : doc! {
61- "expiry" : doc! {
62- "$arrayElemAt" : [
63- "$questDetails.expiry" ,
64- 0
65- ]
66- }
56+ "$set" : {
57+ "expiry" : { "$arrayElemAt" : [ "$questDetails.expiry" , 0 ] }
6758 }
6859 } ,
6960 doc! {
70- "$group" : doc! {
71- "_id" : doc! {
72- "expiry" : "$expiry"
73- } ,
74- "ids" : doc! {
75- "$push" : "$id"
76- } ,
77- "otherDetails" : doc! {
78- "$push" : "$$ROOT"
79- }
61+ "$group" : {
62+ "_id" : { "expiry" : "$expiry" } ,
63+ "ids" : { "$push" : "$id" } ,
64+ "otherDetails" : { "$push" : "$$ROOT" }
8065 }
8166 } ,
8267 doc! {
83- "$lookup" : doc! {
68+ "$lookup" : {
8469 "from" : "completed_tasks" ,
85- "let" : doc! {
70+ "let" : {
8671 "localIds" : "$ids" ,
8772 "expiry" : "$_id.expiry"
8873 } ,
8974 "pipeline" : [
90- doc! {
91- "$match" : doc! {
92- "$expr" : doc! {
75+ {
76+ "$match" : {
77+ "$expr" : {
9378 "$and" : [
94- doc! {
95- "$in" : [
96- "$task_id" ,
97- "$$localIds"
79+ { "$in" : [ "$task_id" , "$$localIds" ] } ,
80+ {
81+ "$lte" : [
82+ "$timestamp" ,
83+ { "$ifNull" : [ "$$expiry" , current_time] }
9884 ]
99- } ,
100- doc! {
101- "$lte" : [
102- "$timestamp" ,
103- doc! {
104- "$ifNull" : [
105- "$$expiry" ,
106- current_time
107- ]
108- }
109- ]
110- }
85+ }
11186 ]
11287 }
11388 }
@@ -116,95 +91,87 @@ pub async fn handler(
11691 "as" : "matching_documents"
11792 }
11893 } ,
94+ doc! { "$unwind" : "$matching_documents" } ,
11995 doc! {
120- "$unwind" : "$matching_documents"
121- } ,
122- doc! {
123- "$group" : doc! {
96+ "$group" : {
12497 "_id" : "$matching_documents.task_id" ,
125- "count" : doc! {
126- "$sum" : 1
127- } ,
128- "details" : doc! {
129- "$first" : "$otherDetails"
130- }
98+ "count" : { "$sum" : 1 } ,
99+ "details" : { "$first" : "$otherDetails" }
131100 }
132101 } ,
133102 doc! {
134- "$project" : doc! {
103+ "$project" : {
135104 "_id" : 1 ,
136105 "count" : 1 ,
137- "otherDetails" : doc! {
138- "$filter" : doc! {
106+ "otherDetails" : {
107+ "$filter" : {
139108 "input" : "$details" ,
140109 "as" : "detail" ,
141- "cond" : doc! {
142- "$eq" : [
143- "$$detail.id" ,
144- "$_id"
145- ]
146- }
110+ "cond" : { "$eq" : [ "$$detail.id" , "$_id" ] }
147111 }
148112 }
149113 }
150114 } ,
115+ doc! { "$unwind" : "$otherDetails" } ,
151116 doc! {
152- "$unwind" : "$otherDetails"
153- } ,
154- doc! {
155- "$replaceRoot" : doc! {
156- "newRoot" : doc! {
117+ "$replaceRoot" : {
118+ "newRoot" : {
157119 "$mergeObjects" : [
158120 "$matching_documents" ,
159121 "$otherDetails" ,
160- doc! {
161- "count" : "$count"
162- }
122+ { "count" : "$count" }
163123 ]
164124 }
165125 }
166126 } ,
167127 doc! {
168- "$project" : doc! {
169- "otherDetails" : 0 ,
170- "_id" : 0 ,
171- "verify_endpoint" : 0 ,
172- "verify_endpoint_type" : 0 ,
173- "verify_redirect" : 0 ,
174- "href" : 0 ,
175- "cta" : 0 ,
176- "id" : 0 ,
177- "quest_id" : 0 ,
178- "questDetails" : 0 ,
179- "expiry" : 0
128+ "$project" : {
129+ "otherDetails" : 0 ,
130+ "_id" : 0 ,
131+ "verify_endpoint" : 0 ,
132+ "verify_endpoint_type" : 0 ,
133+ "verify_redirect" : 0 ,
134+ "href" : 0 ,
135+ "cta" : 0 ,
136+ "id" : 0 ,
137+ "quest_id" : 0 ,
138+ "questDetails" : 0 ,
139+ "expiry" : 0
180140 }
181141 } ,
182142 ] ;
183143
184144 match state
185145 . db
186146 . collection :: < QuestTaskDocument > ( "tasks" )
187- . aggregate ( day_wise_distribution , None )
147+ . aggregate ( pipeline , None )
188148 . await
189149 {
190150 Ok ( mut cursor) => {
191151 let mut task_activity = Vec :: new ( ) ;
192152 while let Some ( result) = cursor. next ( ) . await {
193153 match result {
194- Ok ( document) => {
195- // Convert Document to serde_json::Value
196- let value: serde_json:: Value = match serde_json:: to_value ( & document) {
197- Ok ( val) => val,
198- Err ( _) => continue ,
199- } ;
200- task_activity. push ( value) ;
154+ Ok ( document) => match serde_json:: to_value ( & document) {
155+ Ok ( json) => task_activity. push ( json) ,
156+ Err ( e) => {
157+ state. logger . warning ( format ! (
158+ "[WARN] Quest ID {} - Skipping doc due to serialization error: {:?}" ,
159+ query. id, e
160+ ) ) ;
161+ }
162+ } ,
163+ Err ( e) => {
164+ state. logger . warning ( format ! (
165+ "[WARN] Quest ID {} - Cursor read error: {:?}" ,
166+ query. id, e
167+ ) ) ;
201168 }
202- _ => continue ,
203169 }
204170 }
205- // Store in cache
171+
206172 QUEST_PARTICIPATION_CACHE . insert ( query. id , ( Instant :: now ( ) , task_activity. clone ( ) ) ) ;
207- return ( StatusCode :: OK , Json ( task_activity) ) . into_response ( ) ;
173+
174+ ( StatusCode :: OK , Json ( task_activity) ) . into_response ( )
208175 }
209176 Err ( _) => get_error ( "Error querying tasks" . to_string ( ) ) ,
210177 }
0 commit comments