@@ -104,6 +104,52 @@ def _run(self, report: pl.DataFrame) -> pl.DataFrame:
104104 )
105105
106106
class DeduplicateResponsesProcessor(ReportProcessor):
    """Deduplicate responses, keeping latest by "activity_end_time".

    This processor removes duplicate item responses for the same
    user/activity/submission combination, keeping only the most recent entry.
    """

    NAME = "DeduplicateResponses"
    PRIORITY = 9  # after datetime handling
    ENABLE = True

    def _run(self, report: pl.DataFrame) -> pl.DataFrame:
        """Deduplicate report by keeping latest activity_end_time.

        Args:
            report: Item-response rows. Expected to carry some of the key
                columns and an ``activity_end_time`` column used for
                ordering (presumably a datetime — TODO confirm upstream).

        Returns:
            The report with at most one row per
            user/activity/submission/item combination.
        """
        # Define the columns that should be unique
        unique_cols = ["user_id", "activity_id", "activity_submission_id", "item_id"]

        # Check which columns actually exist in the report
        existing_unique_cols = [col for col in unique_cols if col in report.columns]
        if not existing_unique_cols:
            # No key columns present: group_by([]) would raise, and there
            # is nothing sensible to deduplicate on — pass through as-is.
            return report

        # Count rows per key to detect duplicates.
        # NOTE(review): pl.count() is deprecated in recent polars in favor
        # of pl.len() — switch once the pinned polars version is confirmed.
        duplicate_check = report.group_by(existing_unique_cols).agg(
            pl.count().alias("count")
        )
        duplicates = duplicate_check.filter(pl.col("count") > 1)

        if duplicates.height == 0:
            # Fast path: nothing to remove. Skips the sort/unique pass and
            # avoids logging a misleading "Removed 0 duplicate rows".
            return report

        LOG.warning(
            "Found %d duplicate item responses. "
            "Keeping the latest entry by activity_end_time.",
            duplicates.height,
        )
        LOG.debug("Duplicate details:\n %s", duplicates)

        # Sort newest-first so unique(keep="first") retains the row with
        # the latest activity_end_time for each key.
        report = report.sort("activity_end_time", descending=True).unique(
            subset=existing_unique_cols, keep="first"
        )

        # Total surplus rows = all rows in duplicated groups minus one
        # kept row per group.
        LOG.info(
            "Removed %d duplicate rows",
            duplicates["count"].sum() - duplicates.height,
        )

        return report
152+
107153class ResponseStructProcessor (ReportProcessor ):
108154 """Convert response to struct using Lark.
109155
0 commit comments