import functools
import hashlib
import io
import os
import time

import openai
import requests
import telebot
from dotenv import load_dotenv
from pydub import AudioSegment

from custom_openai import CustomAudio
from transcription_dict import TranscriptionDict

# Load the environment variables from the .env file
load_dotenv()

# Load the OpenAI API key from the environment variable
openai.api_key = os.environ.get("OPENAI_API_KEY")

# Load the Telegram bot token from the environment variable
bot = telebot.TeleBot(os.environ.get("TELEGRAM_BOT_TOKEN"))

# Store that maps a short hash to the transcribed text of a voice message.
transcription_dict = TranscriptionDict()

# Telegram user ids that are allowed to use the protected handlers.
# Built once at import time so every check is a constant-time set lookup.
ALLOWED_USER_IDS = frozenset({40196122, 210098655, 158846071})


def check_user(func):
    """Restrict a message handler to the users listed in ``ALLOWED_USER_IDS``.

    The returned wrapper prints the sender's id, names, username and the
    message's text/voice fields, then either calls ``func`` or answers the
    chat with an "access denied" message.

    Args:
        func: Handler taking the incoming message as its first argument.

    Returns:
        A wrapper with the same call signature as ``func``. It returns
        ``func``'s result for allowed users and ``None`` otherwise.
    """

    # functools.wraps keeps the handler's __name__/__doc__ on the wrapper.
    @functools.wraps(func)
    def wrapper(message, *args, **kwargs):
        sender = message.from_user
        print(sender.id, sender.first_name, sender.last_name,
              sender.username, message.text, message.voice)

        if sender.id not in ALLOWED_USER_IDS:
            print("Доступ запрещен для данного пользователя.")
            bot.send_message(
                message.chat.id, "Доступ запрещен для данного пользователя.")
            return None

        return func(message, *args, **kwargs)

    return wrapper


def generate_image_response(message_text):
    """Request one 512x512 image for ``message_text`` from the OpenAI image API.

    Args:
        message_text: Prompt passed to ``openai.Image.create``.

    Returns:
        The URL string of the generated image on success. On any exception
        the error is printed and a ``(message, exception)`` tuple is returned
        instead, so callers must check the type before using it as a URL.
    """
    try:
        response = openai.Image.create(
            prompt=message_text,
            n=1,
            size="512x512",
            response_format="url",
        )
        print(response)
        # Only one image was requested, so take the first entry's URL.
        return response['data'][0]['url']
    except Exception as e:
        print(e)
        return ("Что-то пошло не так c Image...", e)


def generate_completion_response(message_text):
    """Complete ``message_text`` with the ``text-davinci-003`` engine.

    Args:
        message_text: Prompt passed to ``openai.Completion.create``.

    Returns:
        The stripped text of the first choice, or a fixed error message
        string if the request raised any exception.
    """
    try:
        response = openai.Completion.create(
            engine="text-davinci-003",
            prompt=message_text,
            max_tokens=500,
            n=1,
            stop=None,
            temperature=0.7,
        )
        print(response)
        return response.choices[0].text.strip()
    except Exception as e:
        print(e)
        return "Что-то пошло не так c OpenAI..."
def generate_chat_response(message_text):
    """Ask the ``gpt-3.5-turbo`` chat model to answer ``message_text``.

    Args:
        message_text: The user's message, sent as the single user turn.

    Returns:
        A ``(content, finish_reason, total_tokens)`` tuple taken from the
        first choice and the usage block of the response. If the request
        raises, the error is printed and ``(error_message, None, None)`` is
        returned.
    """
    system_prompt = (
        "You are nerd and scientific friend. "
        f"Current date: {time.strftime('%d/%m/%Y %H:%M:%S')}. "
        "Use Markdown formatting answers. "
        "Use russian language for answers"
    )
    try:
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": message_text},
            ],
            max_tokens=2500,
            temperature=0.7,
            frequency_penalty=0,
            presence_penalty=0,
        )
        print(response)
        first_choice = response.choices[0]
        return (first_choice['message']['content'],
                first_choice['finish_reason'],
                response.usage.total_tokens)
    except Exception as e:
        print(e)
        return "Что-то пошло не так c OpenAI...", None, None


@bot.message_handler(commands=['start'])
def start(message):
    """Greet the user in response to the /start command."""
    bot.send_message(
        message.chat.id,
        'Привет! Я бот. Отправьте мне сообщение, а я отвечу вам, используя OpenAI GPT!')


@bot.message_handler(commands=['imagine'])
@check_user
def handle_image_massage(message):
    """Generate an image for the text that follows the /imagine command.

    The command token itself is removed before the text is used as the
    prompt. Any exception raised while talking to Telegram is printed and
    swallowed, as before.
    """
    try:
        # message.text is "/imagine <prompt>"; keep only the part after the
        # command so the command name does not leak into the image prompt.
        parts = (message.text or "").split(maxsplit=1)
        prompt = parts[1].strip() if len(parts) > 1 else ""
        if not prompt:
            bot.reply_to(
                message, "Укажите описание изображения после команды /imagine.")
            return

        bot.send_chat_action(message.chat.id, 'upload_photo')
        image_url = generate_image_response(prompt)

        # generate_image_response returns a non-string value on failure;
        # never forward that to send_photo as if it were a URL.
        if not isinstance(image_url, str):
            bot.reply_to(message, "Что-то пошло не так c Image...")
            return

        bot.send_photo(message.chat.id, image_url)
    except Exception as e:
        print(e)


@bot.message_handler(content_types=['voice'])
@check_user
def handle_voice_message(message):
    """Transcribe a voice message and reply with the text plus a button.

    The OGG file is downloaded from Telegram, converted to MP3 in memory and
    passed to ``CustomAudio.transcribe``. The transcription is stored in
    ``transcription_dict`` under the first 8 hex digits of its MD5 digest,
    and that key is embedded in the inline button's callback data.
    Failures are printed and reported to the user instead of propagating
    out of the handler.
    """
    try:
        # Download the voice message file
        file_path = bot.get_file(message.voice.file_id).file_path
        file_url = f'https://api.telegram.org/file/bot{bot.token}/{file_path}'
        # A timeout keeps a stalled download from blocking the handler forever.
        r = requests.get(file_url, timeout=30)
        if r.status_code != 200:
            bot.reply_to(message, "Не удалось получить голосовое сообщение.")
            return

        # Convert OGG -> MP3 entirely in memory.
        ogg_audio = AudioSegment.from_ogg(io.BytesIO(r.content))
        mp3_data = io.BytesIO()
        ogg_audio.export(mp3_data, format="mp3")
        mp3_data.seek(0)
        audio_bytes = bytearray(mp3_data.read())

        transcript = CustomAudio.transcribe(
            "whisper-1", file=audio_bytes, filename="voice_message.mp3")
        transcription = transcript['text']

        # The digest is only a short lookup key for the callback data.
        transcription_hash = hashlib.md5(
            transcription.encode('utf-8')).hexdigest()[:8]
        transcription_dict.set(transcription_hash, transcription)

        markup = telebot.types.InlineKeyboardMarkup()
        markup.add(telebot.types.InlineKeyboardButton(
            "Отправить текст",
            callback_data=f"voice_text:{transcription_hash}"))
        bot.reply_to(message, transcription, reply_markup=markup)
    except Exception as e:
        print(e)
        bot.reply_to(message, "Не удалось обработать голосовое сообщение.")


# Define a callback function to handle the voice text button click
@bot.callback_query_handler(func=lambda call: call.data.startswith("voice_text:"))
def send_voice_text(call):
    """Send a previously transcribed voice message to the chat model.

    The callback data has the form ``voice_text:<hash>``; the hash is looked
    up in ``transcription_dict`` to recover the transcription.
    """
    # Acknowledge the button press so the client stops showing its progress
    # indicator. Best effort: a failed acknowledgement must not block the reply.
    try:
        bot.answer_callback_query(call.id)
    except Exception as e:
        print(e)

    transcription_hash = call.data.split(':', 1)[1]
    voice_text = transcription_dict.get(transcription_hash)

    # Check before posting the placeholder so no stale "thinking" message is
    # left behind when the transcription is gone.
    if not voice_text:
        bot.send_message(chat_id=call.message.chat.id,
                         text="Не удалось получить текст голосового сообщения.")
        return

    answer = bot.reply_to(call.message, "Подождите, я думаю...")
    # Generate a response using the ChatGPT model
    (response_text, reason, cost) = generate_chat_response(voice_text)
    print(response_text)
    send_response(call.message, answer, response_text, reason, cost)


# Define a handler function for incoming messages
@bot.message_handler(func=lambda message: True)
@check_user
def handle_message(message):
    """Answer any other message with a chat-model response."""
    # Post a placeholder and a typing indicator while the model is queried.
    answer = bot.reply_to(message, "Подождите, я думаю...")
    bot.send_chat_action(message.chat.id, 'typing')

    # Generate a response using the ChatGPT model
    (response_text, reason, cost) = generate_chat_response(message.text)
    print(response_text)

    # Send the response back to the user with markdown formatting
    send_response(message, answer, response_text, reason, cost)


def send_response(message, answer, response_text, reason, cost):
    """Deliver ``response_text`` and clean up or update the placeholder.

    Args:
        message: The message being answered.
        answer: The placeholder message previously sent to the chat.
        response_text: Text to send, first tried with Markdown parsing.
        reason: Finish reason from the model, or ``None`` when generation
            failed and ``response_text`` is an error message.
        cost: Total token count reported for the request, or ``None``.

    If the Markdown attempt raises, the user is told so and the text is
    sent again without a parse mode.
    """
    try:
        # reason is None on the error path; treat it like a finished answer
        # so the placeholder is removed instead of raising on the `in` test.
        if reason is None or "stop" in reason:
            bot.delete_message(chat_id=message.chat.id,
                               message_id=answer.message_id)
            bot.reply_to(message, text=response_text, parse_mode='Markdown')
        else:
            # The answer ended for another reason: show why and the token
            # count in the placeholder, then reply to it with the text.
            ans = bot.edit_message_text(
                chat_id=message.chat.id,
                message_id=answer.message_id,
                text=f"Finish reason: {reason} \nCost: {cost}")
            bot.reply_to(ans, response_text, parse_mode='Markdown')
    except Exception as e:
        print(e)
        bot.reply_to(
            message, "Что-то пошло не так c ответом. Пробую другой метод...")
        try:
            # Retry as plain text, without Markdown parsing.
            bot.reply_to(message, text=response_text)
        except Exception as e:
            print(e)
            bot.reply_to(message, "Что-то пошло не так c ответом...")


# Start the bot and listen for incoming messages
bot.polling()