Финальный код статьи доступен на Github! В статье используется Python 3.12.7.
¶ Предварительные требования
В первую очередь, вам понадобится пользовательская сессия в Telethon через свое приложение Telegram. Как создать свое приложение Telegram, я писал в другой статье, не буду заострять внимание. После получения параметров вы сможете открыть произвольную сессию и через этот скрипт.
Далее, вам понадобится ID канала, откуда вы хотите получить посты. Несколько вариантов описаны ниже.
Самое простое - скопируйте ссылку на пост. Получите ссылку вида https://t.me/c/<ID канала>/<номер поста>
. ID-канала получен!
Альтернативный способ - переслать из канала сообщение боту @ShowJsonBot. Далее, найти поле forward_origin
и взять оттуда значение id
. В примере ниже - это значение -1001112223333
. Далее, удалите -100
в начале, то есть оставьте значение 1112223333
...
"forward_origin": {
"type": "channel",
"chat": {
"id": -1001112223333,
"title": "TestChannel",
"type": "channel"
},
"message_id": 145151243,
"date": 1735919220
}
...
¶ Установка зависимостей
В руководстве я буду использовать библиотеку telethon
версии 1.38.0
. Обратите внимание на совместимость версий, если используете отличную.
pip install telethon==1.38.0
¶ Скрипт
Данный скрипт обрабатывает только текстовые сообщения и посты с фото. Из-за особенностей telethon посты с фото “разбиваются” на несколько.
Полученные посты сохраняются в файл messages.json
, в нем содержится:
- ID поста
- Дата публикации в UNIX-timestamp
- Текст поста
- Изображение поста (если есть)
Фото к сообщениям сохраняются в папку images
. Предварительно создайте ее.
Код также доступен на Github!
from telethon import TelegramClient
from telethon.helpers import TotalList
import asyncio
import json
from telethon.types import Message, Photo, MessageMediaPhoto
from telethon.tl.types import PeerChannel
import logging
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
logging.basicConfig(level=logging.INFO)
app = TelegramClient(
session="userbot",
api_id=0, # INSERT API_ID
api_hash="YOUR_HASH",
)
CHANNEL_ID = 0 # YOUR CHANNEL
payload: list[dict] = []
async def main(client: TelegramClient):
min_id = 0
max_id = 100
while True:
logging.info("Getting messages from {} to {}".format(min_id, max_id))
res: TotalList | None | Message = await client.get_messages(
PeerChannel(CHANNEL_ID),
limit=(max_id - min_id),
max_id=max_id,
min_id=min_id,
)
assert isinstance(res, TotalList)
logging.info("Got {} messages".format(len(res)))
if len(res) == 0:
break
message: Message
for message in reversed(res):
media: str | None = None
date: float | None = None
message_text = ""
if message.date is not None:
date = message.date.timestamp()
if isinstance(message.message, str) and len(message.message) > 0:
message_text += message.message
if (
isinstance(message.media, MessageMediaPhoto)
and message.media.photo is not None
and isinstance(message.media.photo, Photo)
):
path = "./images/{}.jpg".format(message.id)
await client.download_media(message.media, file=path) # type: ignore
media = path
elif message.media is not None:
pass
payload.append(
{"id": message.id, "text": message_text, "media": media, "date": date}
)
with open("messages.json", "w", encoding="utf-8") as file:
file.write(json.dumps(payload))
max_id += 100
min_id += 100
if __name__ == "__main__":
with app as client:
loop.run_until_complete(main(client))
¶ Объяснение работы скрипта
¶ Запуск функции
Создаем event loop для обработки нашей асинхронной функции и устанавливаем его.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
Выставляем уровень логгирования чтобы видеть прогресс выполнения и создаем объект клиента, через который будем работать. Вставьте свои параметры приложения Telegram, имя сессии менять опционально.
logging.basicConfig(level=logging.INFO)
app = TelegramClient(
session="userbot",
api_id=0, # INSERT API_ID
api_hash="YOUR_HASH",
)
Создаем переменную, где хранится ID канала, который мы “читаем” и переменную, куда будут записываться все сообщения
CHANNEL_ID = 0 # YOUR CHANNEL
payload: list[dict] = []
В конце скрипта добавляем открытие подключения клиентом при помощи with
и запускаем нашу функцию с переданным клиентом в ранее созданном event loop’e.
if __name__ == "__main__":
with app as client:
loop.run_until_complete(main(client))
¶ Функция обработки
Для соблюдения rate-лимитов и большей гибкости скрипт получает сообщения по 100 сообщений максимум (если посты не были удалены ранее, в таком случае меньше). Стандартная редакция скрипта начинает получения от постов с ID от 0 до 100 (т.е первые 100 сообщений в канале).
ID поста можно получить, скопировав ссылку на него.
24 async def main(client: TelegramClient):
25 min_id = 0
26 max_id = 100
27
28 while True:
29 logging.info("Getting messages from {} to {}".format(min_id, max_id))
30 res: TotalList | None | Message = await client.get_messages(
31 PeerChannel(CHANNEL_ID),
32 limit=(max_id - min_id),
33 max_id=max_id,
34 min_id=min_id,
35 )
36 assert isinstance(res, TotalList)
37 logging.info("Got {} messages".format(len(res)))
Если список сообщений пуст, скрипт завершает свою работу. Если 100 постов были удалены подряд, это также может сказаться на их количестве и вызвать ложное завершение скрипта, обратите внимание и отредактируйте поведение скрипта по необходимости.
38 if len(res) == 0:
39 break
Посты идут в обратном хронолическом порядке по умолчанию. Если у объекта сообщения есть текст и дата, они записываются в переменные для дальнейшей записи в payload
.
42 message: Message
43 for message in reversed(res):
44 media: str | None = None
45 date: float | None = None
46 message_text = ""
47 if message.date is not None:
48 date = message.date.timestamp()
49
50 if isinstance(message.message, str) and len(message.message) > 0:
51 message_text += message.message
Далее идет обработка медиавложений, с которой все сложнее. Поле media
объекта Message
может быть None
, а также иметь следующий тип:
TypeMessageMedia = Union[
MessageMediaEmpty,
MessageMediaPhoto,
MessageMediaGeo,
MessageMediaContact,
MessageMediaUnsupported,
MessageMediaDocument,
MessageMediaWebPage,
MessageMediaVenue,
MessageMediaGame,
MessageMediaInvoice,
MessageMediaGeoLive,
MessageMediaPoll,
MessageMediaDice,
MessageMediaStory,
MessageMediaGiveaway,
MessageMediaGiveawayResults,
MessageMediaPaidMedia
]
Поэтому, если вы хотите обработать отличные от изображений вложения, требуется проверка на то, чем является поле и дальнейшая обработка класса.
Обработка изображений также усложняется из-за системы типов, поскольку тип MessageMediaPhoto
имеет опциональное поле photo
, содержащее в себе тип TypePhoto
выглядящий следующим образом:
TypePhoto = Union[PhotoEmpty, Photo]
Таким образом, проверка ожидает что у сообщения есть медиавложение в виде фото, в котором информация о фото присутствует. В таком случае клиент скачивает медиафайл в папку images
с именем в виде ID сообщения. После, вся полученная информация добавляется в массив payload
.
53 if (
54 isinstance(message.media, MessageMediaPhoto)
55 and message.media.photo is not None
56 and isinstance(message.media.photo, Photo)
57 ):
58 path = "./images/{}.jpg".format(message.id)
59 await client.download_media(message.media, file=path) # type: ignore
60 media = path
61 elif message.media is not None:
62 pass
63
64 payload.append(
65 {"id": message.id, "text": message_text, "media": media, "date": date}
66 )
В конце, мы сдвигаем порог ID постов на 100 и записываем имеющиеся данные в файл, чтобы они не “потерялись” в случае ошибки.
67 with open("messages.json", "w", encoding="utf-8") as file:
68 file.write(json.dumps(payload))
69 max_id += 100
70 min_id += 100
¶ Заключение
Данный скрипт может развиваться под различные применения, поэтому я надеюсь что он полслужит отправной точкой для вашей задачи и поможет совладать с тяжелой структурой Telethon'а
. Удачи!