# telegram-ai-bot/transcription_dict.py
#
# 38 lines
# 1.1 KiB
# Python

import time
import threading
class TranscriptionDict:
    """Dictionary-like store whose entries expire after a fixed lifetime.

    Every stored value must be indexable and carry its creation time
    (seconds since the epoch, comparable with ``time.time()``) at index 1;
    that timestamp is the only part of a value this class inspects.

    Expired entries are removed by a daemon thread that wakes up every
    ``clean_interval`` seconds, and additionally whenever a ``set`` pushes
    the number of entries above ``max_size``.  ``max_size`` is a soft
    threshold: exceeding it only triggers an early purge of expired
    entries; entries that have not expired yet are never evicted.

    Every access to the underlying dict is serialised by a lock, because
    the cleaner thread runs concurrently with the callers.
    """

    def __init__(self, max_size: int = 1000, *, ttl: float = 180,
                 clean_interval: float = 180) -> None:
        """Create an empty store and start the background cleaner thread.

        Args:
            max_size: Entry count above which ``set`` purges expired
                entries immediately instead of waiting for the cleaner.
            ttl: Lifetime of an entry in seconds (default: 3 minutes).
            clean_interval: Seconds the cleaner thread sleeps between two
                purges (default: 3 minutes).

        Raises:
            ValueError: If ``clean_interval`` is not positive.
        """
        if clean_interval <= 0:
            # Zero would make the cleaner spin; a negative value would make
            # time.sleep() raise inside the thread and kill it.
            raise ValueError("clean_interval must be positive")
        self.max_size = max_size
        self.ttl = ttl
        self.clean_interval = clean_interval
        self._dict = {}
        # Re-entrant because set() purges while it already holds the lock.
        self._lock = threading.RLock()
        # Daemon thread, so it never keeps the interpreter alive at exit.
        self._clean_thread = threading.Thread(
            target=self._clean_expired_items_periodically
        )
        self._clean_thread.daemon = True
        self._clean_thread.start()

    def __len__(self) -> int:
        """Return the number of entries currently stored."""
        with self._lock:
            return len(self._dict)

    def get(self, key):
        """Return the value stored under ``key``.

        Raises:
            KeyError: If ``key`` is not present.
        """
        with self._lock:
            return self._dict[key]

    def set(self, key, value) -> None:
        """Store ``value`` under ``key``, replacing any previous value.

        When the store grows beyond ``max_size`` afterwards, expired
        entries are purged right away.
        """
        with self._lock:
            self._dict[key] = value
            if len(self._dict) > self.max_size:
                self._clean_expired_items()

    def __contains__(self, key) -> bool:
        """Return True if ``key`` is currently stored."""
        with self._lock:
            return key in self._dict

    def _clean_expired_items(self) -> None:
        """Delete every entry whose timestamp is at least ``ttl`` seconds old."""
        now = time.time()
        # Without the lock, a concurrent set() during this scan raises
        # "dictionary changed size during iteration" and kills the cleaner.
        with self._lock:
            expired_keys = [
                key for key, value in self._dict.items()
                if now - value[1] >= self.ttl
            ]
            for key in expired_keys:
                del self._dict[key]

    def _clean_expired_items_periodically(self) -> None:
        """Cleaner thread body: sleep ``clean_interval`` seconds, purge, repeat."""
        while True:
            time.sleep(self.clean_interval)
            self._clean_expired_items()