This commit is contained in:
Anton Palgunov 2023-04-01 18:33:23 +01:00
parent f919d10c56
commit 7b99cda947
3 changed files with 157 additions and 32 deletions

2
.gitignore vendored
View File

@ -171,4 +171,4 @@ docs
# jupyter
.ipyn
.ipynb
*.ipynb

183
bot.py
View File

@ -1,7 +1,12 @@
import os
import io
import json
import time
import telebot
import requests
import openai
from dotenv import load_dotenv
from pydub import AudioSegment
# Load the environment variables from the .env file
load_dotenv()
@ -12,49 +17,167 @@ openai.api_key = os.environ.get("OPENAI_API_KEY")
# Load the Telegram bot token from the environment variable
bot = telebot.TeleBot(os.environ.get("TELEGRAM_BOT_TOKEN"))
# Define a function to generate a response using the ChatGPT model
def generate_response(message_text):
# Call the OpenAI API to generate a response
response = openai.Completion.create(
engine="text-davinci-003",
prompt=message_text,
max_tokens=1024,
n=1,
stop=None,
temperature=0.7,
)
# print all the response
# print(response)
def check_user(func):
def check_user_int(message):
if message.from_user.id in [40196122, 210098655, 158846071]:
return False
return True
# Extract the generated text from the response and return it
return response.choices[0].text.strip()
def wrapper(message, *args, **kwargs):
print(message.from_user.id, message.from_user.first_name,
message.from_user.last_name, message.from_user.username, message.text, message.voice)
if check_user_int(message):
print("Доступ запрещен для данного пользователя.")
bot.send_message(
message.chat.id, "Доступ запрещен для данного пользователя.")
return None
else:
return func(message, *args, **kwargs)
return wrapper
def generate_image_response(message_text):
try:
response = openai.Image.create(
prompt=message_text,
n=1,
size="512x512",
response_format="url"
)
# Extract the generated text from the response and return it
print(response)
return response['data'][0]['url']
except Exception as e:
print(e)
return ("Что-то пошло не так c Image...", e)
def generate_completion_response(message_text):
# wrap to try catch
try:
response = openai.Completion.create(
engine="text-davinci-003",
prompt=message_text,
max_tokens=500,
n=1,
stop=None,
temperature=0.7,
)
# Extract the generated text from the response and return it
print(response)
return response.choices[0].text.strip()
except Exception as e:
print(e)
return "Что-то пошло не так c OpenAI..."
def generate_chat_response(message_text):
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system",
"content": f"You are nerd and scientific friend. Current date: {time.strftime('%d/%m/%Y %H:%M:%S')}. Use Markdown formatting answers. Use russian language for answers"},
{"role": "user", "content": message_text}
],
max_tokens=2500,
temperature=0.7,
frequency_penalty=0,
presence_penalty=0,
# user="user"
)
# Extract the generated text from the response and return it
print(response)
return (response.choices[0]['message']['content'], response.choices[0]['finish_reason'], response.usage.total_tokens)
except Exception as e:
print(e)
return "Что-то пошло не так c OpenAI..."
@bot.message_handler(commands=['start'])
def start(message):
bot.send_message(
message.chat.id, 'Привет! Я бот. Отправьте мне сообщение, а я отвечу вам, используя OpenAI GPT!')
@bot.message_handler(commands=['imagine'])
@check_user
def handle_image_massage(message):
try:
bot.send_chat_action(message.chat.id, 'upload_photo')
image_url = generate_image_response(message.text)
bot.send_photo(message.chat.id, image_url)
except Exception as e:
print(e)
@bot.message_handler(content_types=['voice'])
@check_user
def handle_voice_message(message):
# Download the voice message file
file_id = message.voice.file_id
file_path = bot.get_file(file_id).file_path
file_url = f'https://api.telegram.org/file/bot{bot.token}/{file_path}'
r = requests.get(file_url)
if r.status_code == 200:
voice_data = io.BytesIO(r.content)
ogg_audio = AudioSegment.from_ogg(voice_data)
mp3_data = io.BytesIO()
ogg_audio.export(mp3_data, format="mp3")
mp3_data.seek(0)
audio_bytes = bytearray(mp3_data.read())
transcript = openai.Audio.transcribe("whisper-1", file=audio_bytes, filename="voice_message.mp3")
transcription = transcript['text']
# Return the transcribed text back to the user
bot.reply_to(message, transcription)
else:
bot.reply_to(message, "Не удалось получить голосовое сообщение.")
# Define a handler function for incoming messages
@bot.message_handler(func=lambda message: True)
@check_user
def handle_message(message):
print(message.from_user.id, message.from_user.first_name, message.from_user.last_name, message.from_user.username, message.text)
# check if the message comming from user id 210098655 Коля or 40196122 Я
if message.from_user.id != 210098655 and message.from_user.id != 40196122:
# Send a message to the user
bot.reply_to(message, "Я не узнаю тебя, ты кто?")
return
# send confirm message to user
# bot.reply_to(message, "Опять работать...")
# send typing message to user
answer = bot.reply_to(message, "Подождите, я думаю...")
bot.send_chat_action(message.chat.id, 'typing')
# Generate a response using the ChatGPT model
response_text = generate_response(message.text)
(response_text, reason, cost) = generate_chat_response(message.text)
print(response_text)
# Send the response back to the user
bot.reply_to(message, response_text)
# Send the response back to the user with markdown formatting
try:
ans = bot.edit_message_text(chat_id=message.chat.id, message_id=answer.message_id, text=response_text , parse_mode='Markdown')
# Send response text with reason and cost formated
bot.reply_to(ans, f"Finish reason: {reason} \nCost: {cost}")
except Exception as e:
print(e)
bot.reply_to(message, "Что-то пошло не так c ответом. Пробую другой метод...")
try:
bot.edit_message_text(chat_id=message.chat.id, message_id=answer.message_id, text=response_text)
except Exception as e:
print(e)
bot.reply_to(message, "Что-то пошло не так c ответом...")
# Start the bot and listen for incoming messages
bot.polling()

View File

@ -1,3 +1,5 @@
pyTelegramBotAPI
openai
python-dotenv
python-dotenv
requests
pydub