-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Ybr8u speaker indentification #3966
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces functionality for speaker identification by extracting and storing speech samples. It includes new database functions to manage speech samples, a complex background processing pipeline in pusher.py to extract samples from audio chunks, and significant refactoring in transcribe.py to improve timestamp handling for conversation segments. My review focuses on potential race conditions, performance, error handling, and a critical bug in timestamp logic. I've identified a critical race condition in the database logic that could violate sample limits, and a critical bug in timestamp handling that could lead to data corruption. I've also pointed out opportunities for performance improvement and more robust error logging.
| person_ref = db.collection('users').document(uid).collection('people').document(person_id) | ||
| person_doc = person_ref.get() | ||
|
|
||
| if not person_doc.exists: | ||
| return False | ||
|
|
||
| person_data = person_doc.to_dict() | ||
| current_samples = person_data.get('speech_samples', []) | ||
|
|
||
| # Check if we've hit the limit | ||
| if len(current_samples) >= max_samples: | ||
| return False | ||
|
|
||
| person_ref.update({ | ||
| 'speech_samples': firestore.ArrayUnion([sample_path]), | ||
| 'updated_at': datetime.now(timezone.utc), | ||
| }) | ||
| return True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a race condition in this function. If two requests to add a sample arrive concurrently when the sample count is near the maximum, both could pass the len(current_samples) >= max_samples check before either one updates the database. This would result in more than max_samples being stored, violating the function's constraint. This read-check-update logic must be atomic. You can achieve this by using a Firestore transaction. You will need to import transactional from google.cloud.firestore_v1.
person_ref = db.collection('users').document(uid).collection('people').document(person_id)
@transactional
def _update_in_transaction(transaction):
person_doc = person_ref.get(transaction=transaction)
if not person_doc.exists:
return False
person_data = person_doc.to_dict()
current_samples = person_data.get('speech_samples', [])
if len(current_samples) >= max_samples:
return False
transaction.update(person_ref, {
'speech_samples': firestore.ArrayUnion([sample_path]),
'updated_at': datetime.now(timezone.utc),
})
return True
transaction = db.transaction()
return _update_in_transaction(transaction)| { | ||
| 'data': bytes(private_cloud_sync_buffer), | ||
| 'conversation_id': current_conversation_id, | ||
| 'timestamp': private_cloud_chunk_start_time or time.time(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fallback or time.time() for the timestamp is incorrect. It uses the time of flushing, not a time related to when the audio was recorded. This can lead to incorrect timestamps for the last audio chunk, potentially causing audio ordering issues. Based on the logic, private_cloud_chunk_start_time should never be None if private_cloud_sync_buffer has data. Using time.time() hides a potential logic error with an incorrect value. The fallback should be removed. If this state is possible, the root cause should be investigated.
| 'timestamp': private_cloud_chunk_start_time or time.time(), | |
| 'timestamp': private_cloud_chunk_start_time, |
| try: | ||
| # Check current sample count once | ||
| sample_count = users_db.get_person_speech_samples_count(uid, person_id) | ||
| if sample_count >= 5: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The maximum number of samples 5 is hardcoded. This value is also used on line 92 and is the default in users_db.add_person_speech_sample. To prevent inconsistencies and improve maintainability, this magic number should be defined as a constant at the module level (e.g., MAX_SPEAKER_SAMPLES = 5 with the other constants at the top of the file) and used in all these places.
| abs_end = started_at_ts + segment_end | ||
|
|
||
| # Find relevant chunks | ||
| sorted_chunks = sorted(chunks, key=lambda c: c['timestamp']) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| except Exception as e: | ||
| print(f"Error extracting speaker samples: {e}", uid, conversation_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Catching a broad Exception and only printing it to stdout is not ideal for background tasks, as it can hide important errors and makes debugging difficult. It's better to use the logging module to log the full exception with a stack trace. This will provide better observability. This pattern of except Exception as e: print(...) is repeated in process_private_cloud_queue (lines 312, 318) and process_speaker_sample_queue (line 365) and should be improved in all locations. You would need to import logging at the top of the file.
| except Exception as e: | |
| print(f"Error extracting speaker samples: {e}", uid, conversation_id) | |
| except Exception: | |
| logging.exception(f"Error extracting speaker samples for uid={uid} conversation_id={conversation_id}") |
No description provided.