225 lines
7.8 KiB
Python
225 lines
7.8 KiB
Python
import functools
import hashlib
import io
import json
import os
import threading
import time
from datetime import datetime, timedelta

import openai
import requests
import telebot
from dotenv import load_dotenv
from pydub import AudioSegment

from custom_openai import CustomAudio
from transcription_dict import TranscriptionDict
|
||
|
||
# Populate the process environment from the local .env file.
load_dotenv()

# The OpenAI client reads its key from OPENAI_API_KEY (None when unset).
openai.api_key = os.getenv("OPENAI_API_KEY")

# The Telegram client is built from TELEGRAM_BOT_TOKEN (None when unset).
bot = telebot.TeleBot(os.getenv("TELEGRAM_BOT_TOKEN"))

# Store shared by the voice handlers: short hash -> transcription text.
transcription_dict = TranscriptionDict()
|
||
|
||
def check_user(func):
    """Restrict a Telegram message handler to a fixed allow-list of user ids.

    Every incoming message is logged to stdout (sender id, first/last name,
    username, text and voice payload). When the sender's id is on the
    allow-list the wrapped handler is called and its result is returned.
    Otherwise a denial notice is printed, sent to the originating chat, and
    ``None`` is returned without calling the handler.

    Args:
        func: Handler that takes the message as its first positional argument.

    Returns:
        A wrapper with the same call signature as ``func``.
    """
    # Built once per decorated handler instead of on every message.
    allowed_user_ids = frozenset({40196122, 210098655, 158846071})

    # Preserve the handler's __name__/__doc__ so logs and debuggers show the
    # real handler instead of "wrapper".
    @functools.wraps(func)
    def wrapper(message, *args, **kwargs):
        sender = message.from_user
        print(sender.id, sender.first_name,
              sender.last_name, sender.username, message.text, message.voice)

        if sender.id not in allowed_user_ids:
            print("Доступ запрещен для данного пользователя.")
            bot.send_message(
                message.chat.id, "Доступ запрещен для данного пользователя.")
            return None

        return func(message, *args, **kwargs)

    return wrapper
|
||
|
||
|
||
def generate_image_response(message_text):
    """Request a single 512x512 image for a prompt and return its URL.

    Args:
        message_text: Prompt passed unchanged to ``openai.Image.create``.

    Returns:
        On success, the value found at ``response['data'][0]['url']``.
        If anything raises, the exception is printed and the tuple
        ``(error text, exception)`` is returned instead, so callers must
        be prepared for either shape.
    """
    request_options = {
        "prompt": message_text,
        "n": 1,
        "size": "512x512",
        "response_format": "url",
    }

    try:
        api_result = openai.Image.create(**request_options)
        # Dump the raw API reply to stdout for debugging.
        print(api_result)

        first_image = api_result['data'][0]
        return first_image['url']
    except Exception as error:
        print(error)
        return ("Что-то пошло не так c Image...", error)
|
||
|
||
|
||
def generate_completion_response(message_text):
    """Complete a prompt with the ``text-davinci-003`` engine.

    Args:
        message_text: Prompt passed unchanged to ``openai.Completion.create``.

    Returns:
        The first choice's text with surrounding whitespace stripped, or a
        fixed error string if anything raises (the exception is printed).
    """
    completion_options = {
        "engine": "text-davinci-003",
        "prompt": message_text,
        "max_tokens": 500,
        "n": 1,
        "stop": None,
        "temperature": 0.7,
    }

    try:
        completion = openai.Completion.create(**completion_options)
        # Dump the raw API reply to stdout for debugging.
        print(completion)

        generated = completion.choices[0].text
        return generated.strip()
    except Exception as error:
        print(error)
        return "Что-то пошло не так c OpenAI..."
|
||
|
||
|
||
def generate_chat_response(message_text):
    """Ask ``gpt-3.5-turbo`` to answer a single user message.

    The system prompt embeds the current local date and time and asks for
    Markdown-formatted answers in Russian.

    Args:
        message_text: The user's message, sent as the only ``user`` turn.

    Returns:
        A 3-tuple ``(content, finish_reason, total_tokens)`` taken from the
        first choice and the usage block of the API response. If anything
        raises, the exception is printed and
        ``(error text, None, None)`` is returned.
    """
    try:
        system_turn = {
            "role": "system",
            "content": f"You are nerd and scientific friend. Current date: {time.strftime('%d/%m/%Y %H:%M:%S')}. Use Markdown formatting answers. Use russian language for answers",
        }
        user_turn = {"role": "user", "content": message_text}

        chat_result = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[system_turn, user_turn],
            max_tokens=2500,
            temperature=0.7,
            frequency_penalty=0,
            presence_penalty=0,
        )
        # Dump the raw API reply to stdout for debugging.
        print(chat_result)

        content = chat_result.choices[0]['message']['content']
        finish_reason = chat_result.choices[0]['finish_reason']
        return (content, finish_reason, chat_result.usage.total_tokens)
    except Exception as error:
        print(error)
        return "Что-то пошло не так c OpenAI...", None, None
|
||
|
||
|
||
@bot.message_handler(commands=['start'])
def start(message):
    """Answer the /start command with the bot's greeting."""
    greeting = 'Привет! Я бот. Отправьте мне сообщение, а я отвечу вам, используя OpenAI GPT!'
    bot.send_message(message.chat.id, greeting)
|
||
|
||
|
||
@bot.message_handler(commands=['imagine'])
@check_user
def handle_image_massage(message):
    """Generate an image for the text that follows the /imagine command.

    The leading command token is removed so that only the user's description
    is used as the prompt. An empty description, or a failure reported by
    ``generate_image_response``, is answered with a text message instead of
    being passed on to ``send_photo``.

    Args:
        message: Incoming Telegram message whose text starts with /imagine.
    """
    # Drop the first whitespace-delimited token ("/imagine" or
    # "/imagine@botname"); whatever remains is the prompt.
    parts = (message.text or "").split(maxsplit=1)
    prompt = parts[1].strip() if len(parts) > 1 else ""

    try:
        if not prompt:
            bot.send_message(
                message.chat.id,
                "Укажите описание изображения после команды /imagine.")
            return

        bot.send_chat_action(message.chat.id, 'upload_photo')

        result = generate_image_response(prompt)

        if not isinstance(result, str):
            # generate_image_response reports failure as (error text, exception);
            # show the text to the user rather than sending it as a photo.
            bot.send_message(message.chat.id, result[0])
            return

        bot.send_photo(message.chat.id, result)
    except Exception as e:
        print(e)
|
||
|
||
@bot.message_handler(content_types=['voice'])
@check_user
def handle_voice_message(message):
    """Transcribe a voice message and reply with the text and a follow-up button.

    The voice file is downloaded from Telegram, re-encoded from OGG to MP3 in
    memory, and sent to ``CustomAudio.transcribe``. The transcription is
    stored in ``transcription_dict`` under a short hash, and that hash is
    embedded in the inline button's callback data so the button handler can
    look the text up again.

    Any failure during download, conversion or transcription is printed and
    reported to the user instead of propagating out of the handler.

    Args:
        message: Incoming Telegram message carrying a ``voice`` attachment.
    """
    try:
        # Download the voice message file
        file_id = message.voice.file_id
        file_path = bot.get_file(file_id).file_path
        file_url = f'https://api.telegram.org/file/bot{bot.token}/{file_path}'
        # Bounded wait so a stalled download cannot block the handler forever.
        r = requests.get(file_url, timeout=30)

        if r.status_code != 200:
            bot.reply_to(message, "Не удалось получить голосовое сообщение.")
            return

        # Convert OGG -> MP3 entirely in memory.
        ogg_audio = AudioSegment.from_ogg(io.BytesIO(r.content))
        mp3_data = io.BytesIO()
        ogg_audio.export(mp3_data, format="mp3")
        mp3_data.seek(0)

        audio_bytes = bytearray(mp3_data.read())

        transcript = CustomAudio.transcribe("whisper-1", file=audio_bytes, filename="voice_message.mp3")

        transcription = transcript['text']

        if not transcription or not transcription.strip():
            # Nothing to send or to store; an empty reply text would fail anyway.
            bot.reply_to(message, "Не удалось распознать речь в голосовом сообщении.")
            return

        # Short, non-cryptographic key: the full text is kept server-side and
        # only this 8-character hash travels in the button's callback data.
        transcription_hash = hashlib.md5(transcription.encode('utf-8')).hexdigest()[:8]
        transcription_dict.set(transcription_hash, transcription)

        markup = telebot.types.InlineKeyboardMarkup()
        markup.add(telebot.types.InlineKeyboardButton("Отправить текст", callback_data=f"voice_text:{transcription_hash}"))
        bot.reply_to(message, transcription, reply_markup=markup)
    except Exception as e:
        print(e)
        # Best-effort notification; a second failure is only logged.
        try:
            bot.reply_to(message, "Не удалось обработать голосовое сообщение.")
        except Exception as notify_error:
            print(notify_error)
|
||
|
||
# Handle presses of the "voice_text:<hash>" inline button
@bot.callback_query_handler(func=lambda call: call.data is not None and call.data.startswith("voice_text:"))
def send_voice_text(call):
    """Send a ChatGPT answer for a previously transcribed voice message.

    The callback data carries a short hash; the transcription is looked up in
    ``transcription_dict`` and passed to ``generate_chat_response``. The
    answer is sent with Markdown formatting first and, if that fails, again
    as plain text.

    Args:
        call: Callback query whose ``data`` has the form ``voice_text:<hash>``.
    """
    # Acknowledge the press first: per the Telegram Bot API, clients keep
    # showing a progress indicator on the button until the query is answered.
    # Best-effort, so a failed acknowledgement does not block the reply.
    try:
        bot.answer_callback_query(call.id)
    except Exception as e:
        print(e)

    chat_id = call.message.chat.id

    # Everything after the first ':' is the lookup key.
    transcription_hash = call.data.split(':', 1)[1]
    voice_text = transcription_dict.get(transcription_hash)

    if not voice_text:
        bot.send_message(chat_id=chat_id, text="Не удалось получить текст голосового сообщения.")
        return

    # Generate a response using the ChatGPT model
    (response_text, reason, cost) = generate_chat_response(voice_text)

    print(response_text)

    # Send the response back to the user with markdown formatting
    try:
        ans = bot.send_message(chat_id=chat_id, text=response_text, parse_mode='Markdown')
        bot.reply_to(ans, f"Finish reason: {reason} \nCost: {cost}")
    except Exception as e:
        print(e)
        bot.send_message(chat_id=chat_id, text="Что-то пошло не так c ответом. Пробую другой метод...")

        # Retry as plain text in case the Markdown could not be parsed.
        try:
            bot.send_message(chat_id=chat_id, text=response_text)
        except Exception as e:
            print(e)
            bot.send_message(chat_id=chat_id, text="Что-то пошло не так c ответом")
|
||
|
||
# Text handler whose filter accepts every message
@bot.message_handler(func=lambda message: True)
@check_user
def handle_message(message):
    """Answer a chat message with a ChatGPT reply, editing a placeholder in place.

    A "please wait" reply is posted first and then overwritten with the
    generated answer (Markdown first, plain text as a fallback). After a
    successful Markdown edit, a follow-up reply reports the finish reason and
    the token count.

    Args:
        message: Incoming Telegram message; its ``text`` is the prompt.
    """
    chat_id = message.chat.id

    # Post the placeholder, then show the "typing" status while waiting.
    placeholder = bot.reply_to(message, "Подождите, я думаю...")
    bot.send_chat_action(chat_id, 'typing')

    reply_text, finish_reason, total_tokens = generate_chat_response(message.text)
    print(reply_text)

    # First attempt: replace the placeholder using Markdown formatting.
    try:
        edited = bot.edit_message_text(
            chat_id=chat_id,
            message_id=placeholder.message_id,
            text=reply_text,
            parse_mode='Markdown',
        )
        bot.reply_to(edited, f"Finish reason: {finish_reason} \nCost: {total_tokens}")
        return
    except Exception as markdown_error:
        print(markdown_error)
        bot.reply_to(message, "Что-то пошло не так c ответом. Пробую другой метод...")

    # Second attempt: same edit without a parse mode.
    try:
        bot.edit_message_text(
            chat_id=chat_id,
            message_id=placeholder.message_id,
            text=reply_text,
        )
    except Exception as plain_error:
        print(plain_error)
        bot.reply_to(message, "Что-то пошло не так c ответом...")
|
||
|
||
|
||
|
||
# Start the bot and listen for incoming messages
|
||
# No arguments are passed, so telebot's default polling settings apply.
bot.polling()
|