Получаем все сообщения канала на Python и Telethon

04.02.2025

Илья Замарацких

Telegram

Python

Содержание

Финальный код статьи доступен на Github! В статье используется Python 3.12.7.

Предварительные требования

В первую очередь, вам понадобится пользовательская сессия в Telethon через свое приложение Telegram. Как создать свое приложение Telegram, я писал в другой статье, не буду заострять внимание. После получения параметров вы сможете открыть произвольную сессию и через этот скрипт.

Далее, вам понадобится ID канала, откуда вы хотите получить посты. Несколько вариантов описаны ниже.

Как получить ID канала не выходя из Telegram

Самое простое - скопируйте ссылку на пост. Получите ссылку вида https://t.me/c/<ID канала>/<номер поста>. ID-канала получен!

Альтернативный способ - переслать из канала сообщение боту @ShowJsonBot. Далее, найти поле forward_origin и взять оттуда значение id. В примере ниже - это значение -1001112223333. Далее, удалите -100 в начале, то есть оставьте значение 1112223333

...
  "forward_origin": {
   "type": "channel",
   "chat": {
    "id": -1001112223333,
    "title": "TestChannel",
    "type": "channel"
   },
   "message_id": 145151243,
   "date": 1735919220
  }
...

Установка зависимостей

В руководстве я буду использовать библиотеку telethon версии 1.38.0. Обратите внимание на совместимость версий, если используете отличную.

pip install telethon==1.38.0

Скрипт

Данный скрипт обрабатывает только текстовые сообщения и посты с фото. Из-за особенностей telethon посты с фото “разбиваются” на несколько.

Полученные посты сохраняются в файл messages.json, в нем содержится:

Фото к сообщениям сохраняются в папку images. Предварительно создайте ее.

Код также доступен на Github!

from telethon import TelegramClient
from telethon.helpers import TotalList
import asyncio
import json
from telethon.types import Message, Photo, MessageMediaPhoto
from telethon.tl.types import PeerChannel
import logging

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

logging.basicConfig(level=logging.INFO)

app = TelegramClient(
    session="userbot",
    api_id=0,  # INSERT API_ID
    api_hash="YOUR_HASH",
)

CHANNEL_ID = 0  # YOUR CHANNEL
payload: list[dict] = []


async def main(client: TelegramClient):
    min_id = 0
    max_id = 100

    while True:
        logging.info("Getting messages from {} to {}".format(min_id, max_id))
        res: TotalList | None | Message = await client.get_messages(
            PeerChannel(CHANNEL_ID),
            limit=(max_id - min_id),
            max_id=max_id,
            min_id=min_id,
        )
        assert isinstance(res, TotalList)
        logging.info("Got {} messages".format(len(res)))

        if len(res) == 0:
            break

        message: Message
        for message in reversed(res):
            media: str | None = None
            date: float | None = None
            message_text = ""
            if message.date is not None:
                date = message.date.timestamp()

            if isinstance(message.message, str) and len(message.message) > 0:
                message_text += message.message

            if (
                isinstance(message.media, MessageMediaPhoto)
                and message.media.photo is not None
                and isinstance(message.media.photo, Photo)
            ):
                path = "./images/{}.jpg".format(message.id)
                await client.download_media(message.media, file=path)  # type: ignore
                media = path
            elif message.media is not None:
                pass

            payload.append(
                {"id": message.id, "text": message_text, "media": media, "date": date}
            )
        with open("messages.json", "w", encoding="utf-8") as file:
            file.write(json.dumps(payload))
        max_id += 100
        min_id += 100


if __name__ == "__main__":
    with app as client:
        loop.run_until_complete(main(client))

Объяснение работы скрипта

Запуск функции

Создаем event loop для обработки нашей асинхронной функции и устанавливаем его.

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

Выставляем уровень логгирования чтобы видеть прогресс выполнения и создаем объект клиента, через который будем работать. Вставьте свои параметры приложения Telegram, имя сессии менять опционально.

logging.basicConfig(level=logging.INFO)

app = TelegramClient(
    session="userbot",
    api_id=0,  # INSERT API_ID
    api_hash="YOUR_HASH",
)

Создаем переменную, где хранится ID канала, который мы “читаем” и переменную, куда будут записываться все сообщения

CHANNEL_ID = 0  # YOUR CHANNEL
payload: list[dict] = []

В конце скрипта добавляем открытие подключения клиентом при помощи with и запускаем нашу функцию с переданным клиентом в ранее созданном event loop’e.

if __name__ == "__main__":
    with app as client:
        loop.run_until_complete(main(client))

Функция обработки

Для соблюдения rate-лимитов и большей гибкости скрипт получает сообщения по 100 сообщений максимум (если посты не были удалены ранее, в таком случае меньше). Стандартная редакция скрипта начинает получения от постов с ID от 0 до 100 (т.е первые 100 сообщений в канале).

ID поста можно получить, скопировав ссылку на него.

24async def main(client: TelegramClient):
25 min_id = 0
26 max_id = 100
27
28 while True:
29 logging.info("Getting messages from {} to {}".format(min_id, max_id))
30 res: TotalList | None | Message = await client.get_messages(
31 PeerChannel(CHANNEL_ID),
32 limit=(max_id - min_id),
33 max_id=max_id,
34 min_id=min_id,
35 )
36 assert isinstance(res, TotalList)
37 logging.info("Got {} messages".format(len(res)))

Если список сообщений пуст, скрипт завершает свою работу. Если 100 постов были удалены подряд, это также может сказаться на их количестве и вызвать ложное завершение скрипта, обратите внимание и отредактируйте поведение скрипта по необходимости.

38if len(res) == 0:
39 break

Посты идут в обратном хронолическом порядке по умолчанию. Если у объекта сообщения есть текст и дата, они записываются в переменные для дальнейшей записи в payload.

42message: Message
43for message in reversed(res):
44 media: str | None = None
45 date: float | None = None
46 message_text = ""
47 if message.date is not None:
48 date = message.date.timestamp()
49
50 if isinstance(message.message, str) and len(message.message) > 0:
51 message_text += message.message

Далее идет обработка медиавложений, с которой все сложнее. Поле media объекта Message может быть None, а также иметь следующий тип:

TypeMessageMedia = Union[
    MessageMediaEmpty,
	MessageMediaPhoto,
	MessageMediaGeo,
	MessageMediaContact,
	MessageMediaUnsupported,
	MessageMediaDocument,
	MessageMediaWebPage,
	MessageMediaVenue,
	MessageMediaGame,
	MessageMediaInvoice,
	MessageMediaGeoLive,
	MessageMediaPoll,
	MessageMediaDice,
	MessageMediaStory,
	MessageMediaGiveaway,
	MessageMediaGiveawayResults,
	MessageMediaPaidMedia
    ]

Поэтому, если вы хотите обработать отличные от изображений вложения, требуется проверка на то, чем является поле и дальнейшая обработка класса.

Обработка изображений также усложняется из-за системы типов, поскольку тип MessageMediaPhoto имеет опциональное поле photo, содержащее в себе тип TypePhoto выглядящий следующим образом:

TypePhoto = Union[PhotoEmpty, Photo]

Таким образом, проверка ожидает что у сообщения есть медиавложение в виде фото, в котором информация о фото присутствует. В таком случае клиент скачивает медиафайл в папку images с именем в виде ID сообщения. После, вся полученная информация добавляется в массив payload.

53if (
54 isinstance(message.media, MessageMediaPhoto)
55 and message.media.photo is not None
56 and isinstance(message.media.photo, Photo)
57):
58 path = "./images/{}.jpg".format(message.id)
59 await client.download_media(message.media, file=path) # type: ignore
60 media = path
61elif message.media is not None:
62 pass
63
64payload.append(
65 {"id": message.id, "text": message_text, "media": media, "date": date}
66)

В конце, мы сдвигаем порог ID постов на 100 и записываем имеющиеся данные в файл, чтобы они не “потерялись” в случае ошибки.

67with open("messages.json", "w", encoding="utf-8") as file:
68 file.write(json.dumps(payload))
69max_id += 100
70min_id += 100

Заключение

Данный скрипт может развиваться под различные применения, поэтому я надеюсь что он полслужит отправной точкой для вашей задачи и поможет совладать с тяжелой структурой Telethon'а. Удачи!