telegram-ai-bot/bot.py
2023-04-01 22:52:04 +01:00

223 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import io
import time
import telebot
import requests
import openai
from dotenv import load_dotenv
from pydub import AudioSegment
import hashlib
from transcription_dict import TranscriptionDict
from custom_openai import CustomAudio
# Pull settings from a local .env file into the process environment; this has
# to happen before the two environment lookups below.
load_dotenv()

# The OpenAI client is configured from OPENAI_API_KEY.
openai.api_key = os.getenv("OPENAI_API_KEY")

# The Telegram client is created from TELEGRAM_BOT_TOKEN.
bot = telebot.TeleBot(os.getenv("TELEGRAM_BOT_TOKEN"))

# Shared TranscriptionDict instance; the voice handlers call .set()/.get() on
# it with a short hash of the transcription as the key.
transcription_dict = TranscriptionDict()
def check_user(func):
    """Restrict a message handler to the hard-coded allowlist of Telegram users.

    Every incoming message is logged to stdout. When the sender's id is not in
    the allowlist, a refusal is printed and sent to the chat and the wrapped
    handler is not called.

    Args:
        func: Handler that takes the incoming message as its first argument.

    Returns:
        A wrapper with the same call signature as ``func``. It returns whatever
        ``func`` returns for allowed users and ``None`` for everyone else.
    """
    # Imported locally so this block does not depend on the file-level imports.
    from functools import wraps

    # Built once per decorated handler instead of on every incoming message.
    allowed_user_ids = frozenset({40196122, 210098655, 158846071})

    # wraps() keeps the handler's own __name__/__doc__ instead of "wrapper",
    # which keeps logs and tracebacks readable.
    @wraps(func)
    def wrapper(message, *args, **kwargs):
        sender = message.from_user
        print(sender.id, sender.first_name,
              sender.last_name, sender.username, message.text, message.voice)
        if sender.id not in allowed_user_ids:
            print("Доступ запрещен для данного пользователя.")
            bot.send_message(
                message.chat.id, "Доступ запрещен для данного пользователя.")
            return None
        return func(message, *args, **kwargs)

    return wrapper
def generate_image_response(message_text):
    """Request one 512x512 image for *message_text* from the OpenAI image API.

    Args:
        message_text: Prompt text passed to the API unchanged.

    Returns:
        The URL string of the first generated image on success. On any
        exception a 2-tuple ``(error_text, exception)`` is returned instead,
        so callers must check the result type before treating it as a URL.
    """
    request_options = {
        "prompt": message_text,
        "n": 1,
        "size": "512x512",
        "response_format": "url",
    }
    try:
        api_result = openai.Image.create(**request_options)
        # Dump the raw API reply to stdout for debugging.
        print(api_result)
        first_image = api_result['data'][0]
        return first_image['url']
    except Exception as error:
        print(error)
        return ("Что-то пошло не так c Image...", error)
def generate_completion_response(message_text):
    """Complete *message_text* with the ``text-davinci-003`` engine.

    Args:
        message_text: Prompt text passed to the API unchanged.

    Returns:
        The text of the first choice with surrounding whitespace stripped, or
        a fixed error string when the API call raises.
    """
    completion_options = dict(
        engine="text-davinci-003",
        prompt=message_text,
        max_tokens=500,
        n=1,
        stop=None,
        temperature=0.7,
    )
    try:
        api_result = openai.Completion.create(**completion_options)
        # Dump the raw API reply to stdout for debugging.
        print(api_result)
        first_choice = api_result.choices[0]
        return first_choice.text.strip()
    except Exception as error:
        print(error)
        return "Что-то пошло не так c OpenAI..."
def generate_chat_response(message_text):
    """Answer *message_text* with ``gpt-3.5-turbo`` using a fixed system prompt.

    The system prompt embeds the current local date and time and asks for
    Markdown-formatted answers in Russian.

    Args:
        message_text: The user's message, sent as the single user turn.

    Returns:
        A 3-tuple ``(content, finish_reason, total_tokens)`` taken from the
        first choice and the usage block of the API reply. When the call
        raises, ``(error_text, None, None)`` is returned instead.
    """
    try:
        system_prompt = f"You are nerd and scientific friend. Current date: {time.strftime('%d/%m/%Y %H:%M:%S')}. Use Markdown formatting answers. Use russian language for answers"
        conversation = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": message_text},
        ]
        reply = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=conversation,
            max_tokens=2500,
            temperature=0.7,
            frequency_penalty=0,
            presence_penalty=0,
        )
        # Dump the raw API reply to stdout for debugging.
        print(reply)
        content = reply.choices[0]['message']['content']
        finish_reason = reply.choices[0]['finish_reason']
        return (content, finish_reason, reply.usage.total_tokens)
    except Exception as error:
        print(error)
        return "Что-то пошло не так c OpenAI...", None, None
@bot.message_handler(commands=['start'])
def start(message):
    """Reply to the /start command with a fixed greeting in the same chat."""
    greeting = 'Привет! Я бот. Отправьте мне сообщение, а я отвечу вам, используя OpenAI GPT!'
    chat_id = message.chat.id
    bot.send_message(chat_id, greeting)
@bot.message_handler(commands=['imagine'])
@check_user
def handle_image_massage(message):
    """Generate an image for the text that follows /imagine and send it.

    The command token itself is removed before the prompt goes to the image
    API. An empty prompt or a failed generation is reported to the user
    instead of being passed on to ``send_photo``.

    Args:
        message: Incoming Telegram message carrying the /imagine command.
    """
    try:
        # message.text still starts with the command ("/imagine ..."); only
        # the remainder is the actual image description.
        pieces = (message.text or "").split(maxsplit=1)
        prompt = pieces[1].strip() if len(pieces) > 1 else ""
        if not prompt:
            bot.reply_to(
                message, "Укажите описание изображения после команды /imagine.")
            return

        bot.send_chat_action(message.chat.id, 'upload_photo')
        image_url = generate_image_response(prompt)
        # generate_image_response reports failure with a non-string value,
        # which must not be handed to send_photo as if it were a URL.
        if not isinstance(image_url, str):
            bot.reply_to(message, "Не удалось сгенерировать изображение.")
            return
        bot.send_photo(message.chat.id, image_url)
    except Exception as e:
        # Best-effort handler: log and drop so one failure does not propagate.
        print(e)
@bot.message_handler(content_types=['voice'])
@check_user
def handle_voice_message(message):
    """Transcribe a voice message and reply with the text plus a follow-up button.

    The voice file is downloaded from Telegram, converted from OGG to MP3 and
    sent to the ``whisper-1`` model. The transcription is stored under the
    first 8 hex digits of its MD5 hash, and that key is placed in the
    button's callback data so the callback handler can look the text up again.

    Download, conversion and transcription failures are reported to the user
    instead of escaping from the handler.

    Args:
        message: Incoming Telegram message whose ``voice`` field is set.
    """
    try:
        # Download the voice message file
        file_id = message.voice.file_id
        file_path = bot.get_file(file_id).file_path
        file_url = f'https://api.telegram.org/file/bot{bot.token}/{file_path}'
        # Bounded wait: without a timeout a stalled connection would block
        # this handler indefinitely.
        r = requests.get(file_url, timeout=30)
        if r.status_code != 200:
            bot.reply_to(message, "Не удалось получить голосовое сообщение.")
            return

        # Convert OGG -> MP3 entirely in memory.
        voice_data = io.BytesIO(r.content)
        ogg_audio = AudioSegment.from_ogg(voice_data)
        mp3_data = io.BytesIO()
        ogg_audio.export(mp3_data, format="mp3")
        mp3_data.seek(0)
        audio_bytes = bytearray(mp3_data.read())

        transcript = CustomAudio.transcribe(
            "whisper-1", file=audio_bytes, filename="voice_message.mp3")
        transcription = transcript['text']
    except requests.RequestException as e:
        # Log only the exception type: the message may embed the request URL,
        # and that URL contains the bot token.
        print(f"Voice download failed: {type(e).__name__}")
        bot.reply_to(message, "Не удалось получить голосовое сообщение.")
        return
    except Exception as e:
        print(e)
        bot.reply_to(message, "Не удалось распознать голосовое сообщение.")
        return

    # An empty transcription cannot be sent as a message text.
    if not transcription or not transcription.strip():
        bot.reply_to(message, "Не удалось распознать речь в голосовом сообщении.")
        return

    # Short key for the callback data; the full text is kept in the store.
    transcription_hash = hashlib.md5(transcription.encode('utf-8')).hexdigest()[:8]
    transcription_dict.set(transcription_hash, transcription)
    markup = telebot.types.InlineKeyboardMarkup()
    markup.add(telebot.types.InlineKeyboardButton(
        "Отправить текст", callback_data=f"voice_text:{transcription_hash}"))
    bot.reply_to(message, transcription, reply_markup=markup)
# Callback handler for the "voice_text:<hash>" button attached to transcriptions
@bot.callback_query_handler(func=lambda call: bool(call.data) and call.data.startswith("voice_text:"))
def send_voice_text(call):
    """Send a stored transcription to the chat model and post the answer.

    The callback data has the form ``voice_text:<hash>``; the hash is the key
    under which the transcription was stored. If nothing is stored under that
    key, the user is told so and no "thinking" placeholder is created.

    Args:
        call: Telegram callback query produced by the inline button.
    """
    # Acknowledge the button press so the client stops showing its progress
    # indicator. A failure here must not prevent the actual reply.
    try:
        bot.answer_callback_query(call.id)
    except Exception as e:
        print(e)

    # Extract the storage key from the callback data
    transcription_hash = call.data.split(':', 1)[1]
    voice_text = transcription_dict.get(transcription_hash)
    if not voice_text:
        bot.send_message(chat_id=call.message.chat.id,
                         text="Не удалось получить текст голосового сообщения.")
        return

    # Placeholder is only posted once we know there is something to answer.
    answer = bot.reply_to(call.message, "Подождите, я думаю...")
    bot.send_chat_action(call.message.chat.id, 'typing')
    # Generate a response using the ChatGPT model
    (response_text, reason, cost) = generate_chat_response(voice_text)
    print(response_text)
    send_response(call.message, answer, response_text, reason, cost)
# Handler registered with a filter that accepts every message
@bot.message_handler(func=lambda _incoming: True)
@check_user
def handle_message(message):
    """Answer a text message with the chat model.

    A "thinking" placeholder reply is posted first and a typing indicator is
    shown. The model's answer is then delivered through ``send_response``
    together with the placeholder, so it can be edited or removed.

    Args:
        message: Incoming Telegram message; its ``text`` is used as the prompt.
    """
    chat_id = message.chat.id
    placeholder = bot.reply_to(message, "Подождите, я думаю...")
    bot.send_chat_action(chat_id, 'typing')

    # Ask the chat model; the result is (text, finish reason, token count).
    response_text, reason, cost = generate_chat_response(message.text)
    print(response_text)

    # Deliver the answer (Markdown first, see send_response).
    send_response(message, placeholder, response_text, reason, cost)
def send_response(message, answer, response_text, reason, cost):
    """Deliver a generated answer and resolve the "thinking" placeholder.

    Three cases are handled:

    * ``reason`` is ``None``: this is what ``generate_chat_response`` returns
      when the API call failed, so ``response_text`` is an error notice. The
      placeholder is deleted and the notice is sent as plain text.
    * ``reason`` does not contain ``"stop"``: the placeholder is edited to
      show the finish reason and token count, and the answer is sent as a
      Markdown reply to it.
    * otherwise: the placeholder is deleted and the answer is sent as a
      Markdown reply to the original message.

    If any of that raises (for example because Telegram rejects the Markdown),
    the answer is retried as plain text.

    Args:
        message: Message being answered.
        answer: Placeholder message previously sent by the bot.
        response_text: Text to deliver.
        reason: Finish reason reported by the model, or ``None`` on failure.
        cost: Total token count reported by the model, or ``None`` on failure.
    """
    try:
        if reason is None:
            # Previously this case raised TypeError on `"stop" not in None`
            # and only reached the user through the fallback below, leaving
            # the placeholder behind.
            bot.delete_message(chat_id=message.chat.id, message_id=answer.message_id)
            bot.reply_to(message, text=response_text)
        elif "stop" not in reason:
            # Unusual finish reason: keep it and the token count visible.
            ans = bot.edit_message_text(
                chat_id=message.chat.id, message_id=answer.message_id,
                text=f"Finish reason: {reason} \nCost: {cost}")
            bot.reply_to(ans, response_text, parse_mode='Markdown')
        else:
            bot.delete_message(chat_id=message.chat.id, message_id=answer.message_id)
            bot.reply_to(message, text=response_text, parse_mode='Markdown')
    except Exception as e:
        print(e)
        bot.reply_to(message, "Что-то пошло не так c ответом. Пробую другой метод...")
        try:
            # Fallback: same text without Markdown parsing.
            bot.reply_to(message, text=response_text)
        except Exception as retry_error:
            print(retry_error)
            bot.reply_to(message, "Что-то пошло не так c ответом...")
# Start the bot and listen for incoming messages.
# NOTE(review): this call sits at module top level with no
# `if __name__ == "__main__":` guard, so importing this module also starts
# polling — confirm that is intended.
bot.polling()