G

pyrogram dl link generator

public
Guest Mar 23, 2025 Never 39
Clone
1
import os
2
import hmac
3
from fastapi import FastAPI, HTTPException, Request
4
from fastapi.responses import StreamingResponse
5
from pyrogram import Client, filters
6
from pyrogram.types import Message
7
import socket
8
import logging
9
import asyncio
10
11
# Root logging setup: records from DEBUG upwards are written to app.log
# (relative to the working directory) as "<time> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
    filename="app.log",
)

# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
17
18
# NOTE(review): the Telegram credentials and the signing key below are
# literal values in source; presumably placeholders — confirm, and keep the
# real values out of version control.
NAME = "https://website.ru"  # base URL used in the links the bot sends out
API_ID = 1234
API_HASH = "yuuuu"
BOT_TOKEN = "88888888:hahahaja"
SECRET_KEY = "huuijvftjjj6900jjj"  # HMAC key used by generate_signature()

# HTTP application that exposes the /dl and /file endpoints.
app = FastAPI()

# Single Pyrogram client shared by the HTTP handlers and the bot command.
bot = Client(
    "my_account",
    api_id=API_ID,
    api_hash=API_HASH,
    bot_token=BOT_TOKEN,
)
27
28
def find_free_port():
    """Return a TCP port number that was free at the moment of the call.

    A throwaway socket is bound to port 0 on all interfaces, the port number
    assigned to it is read back, and the socket is closed again. The port is
    therefore not reserved for the caller once this function returns.
    """
    probe = socket.socket()
    try:
        probe.bind(("", 0))
        _host, port = probe.getsockname()
        return port
    finally:
        probe.close()
32
33
def generate_signature(file_id: str, size: int, name: str, mime: str) -> str:
    """Return the hex HMAC-SHA256 signature over a file's download parameters.

    The signature is keyed with SECRET_KEY and covers all four values, so a
    URL carrying them cannot be altered without invalidating it.

    Args:
        file_id: Telegram file identifier.
        size: File size in bytes.
        name: File name offered to the downloader.
        mime: MIME type sent with the file.

    Returns:
        The signature as a lowercase hexadecimal string.

    Raises:
        Exception: any error raised while encoding or signing is logged and
            re-raised unchanged.
    """
    try:
        # Each field is prefixed with its length. Plain concatenation is
        # ambiguous: size=1234, name="file" and size=123, name="4file" would
        # sign the same bytes, letting characters be shifted between fields
        # of a signed URL. Signatures produced by plain concatenation do not
        # verify against this scheme.
        parts = [str(value) for value in (file_id, size, name, mime)]
        message = "".join(f"{len(part)}:{part}" for part in parts).encode()
        return hmac.new(SECRET_KEY.encode(), message, "sha256").hexdigest()
    except Exception as e:
        logger.error(f"Err {e}", exc_info=True)
        raise
40
41
def extract_media_info(message: Message) -> dict:
    """Collect the download metadata of the media attached to *message*.

    The first non-empty attribute among document, video, audio, voice,
    video_note, photo and sticker is used as the media object.

    Args:
        message: The message to inspect.

    Returns:
        A dict with the keys "file_id", "size", "mime" and "name".

    Raises:
        ValueError: if the message carries none of the supported media types.
        Exception: any other error is logged and re-raised unchanged.
    """
    supported = (
        "document", "video", "audio", "voice",
        "video_note", "photo", "sticker",
    )
    try:
        media = None
        for attr in supported:
            candidate = getattr(message, attr, None)
            if candidate:
                media = candidate
                break

        if not media:
            raise ValueError("Unsupported media type")

        # A media object with no mime_type attribute at all is reported as
        # "image/jpeg". One whose mime_type exists but is empty gets a
        # generic binary type instead of leaking None into the URL.
        declared_mime = getattr(media, "mime_type", None)
        if declared_mime:
            mime = declared_mime
        elif hasattr(media, "mime_type"):
            mime = "application/octet-stream"
        else:
            mime = "image/jpeg"

        # The fallback name is built only when it is needed, and only from a
        # MIME type that is actually present. Building it eagerly crashed on
        # a None mime_type even when a file name was available.
        name = getattr(media, "file_name", None)
        if not name:
            name = f"file_{message.id}"
            if declared_mime:
                name = f"{name}.{declared_mime.split('/')[-1]}"

        return {
            "file_id": media.file_id,
            "size": media.file_size,
            "mime": mime,
            "name": name,
        }
    except Exception as e:
        logger.error(f"Err {e}", exc_info=True)
        raise
64
65
@app.get("/dl")
async def generate_direct_link(request: Request, chat_id: int, message_id: int):
    """Resolve a Telegram message to a signed ``/file`` download URL.

    Args:
        request: Incoming request; its base URL prefixes the returned link.
        chat_id: ID of the chat that holds the message.
        message_id: ID of the message carrying the media.

    Returns:
        A dict ``{"url": ...}`` whose URL carries the file id, size, name,
        MIME type and their signature as query parameters.

    Raises:
        HTTPException: status 400 with the error text if the message cannot
            be fetched or has no supported media.
    """
    # Function-scope import: only this handler needs it.
    from urllib.parse import urlencode

    try:
        # main() starts `bot` before the HTTP server runs, so it is used
        # directly. Pyrogram's `async with bot:` calls start() on entry and
        # stop() on exit, which would fail on the running client and shut
        # it down.
        message = await bot.get_messages(chat_id, message_id)
        media_info = extract_media_info(message)

        signature = generate_signature(
            media_info["file_id"],
            media_info["size"],
            media_info["name"],
            media_info["mime"],
        )

        # URL-encode every value. A raw file name containing spaces, "&" or
        # "#" would otherwise be cut or split into extra parameters, and the
        # signature check on /file would fail.
        query = urlencode({
            "id": media_info["file_id"],
            "size": media_info["size"],
            "name": media_info["name"],
            "mime": media_info["mime"],
            "sig": signature,
        })
        return {"url": f"{request.base_url}file?{query}"}
    except Exception as e:
        logger.error(f"Err: {e}", exc_info=True)
        raise HTTPException(status_code=400, detail=str(e)) from e
91
92
93
@app.get("/file")
async def serve_file(
    request: Request,
    id: str,
    size: int,
    name: str,
    mime: str,
    sig: str
):
    """Stream a Telegram file to the client after checking the URL signature.

    Args:
        request: Incoming request (not read by this handler).
        id: Telegram file identifier to stream.
        size: File size in bytes, sent as Content-Length.
        name: File name offered to the downloader.
        mime: MIME type sent as Content-Type.
        sig: Signature that must match generate_signature(id, size, name, mime).

    Returns:
        A StreamingResponse that yields the file content chunk by chunk.

    Raises:
        HTTPException: status 403 if the signature does not match; status 500
            for any other error raised while preparing the response.
    """
    # Function-scope import: only this handler needs it.
    from urllib.parse import quote

    try:
        expected_sig = generate_signature(id, size, name, mime)
        # Compare as bytes: compare_digest() raises TypeError for str
        # arguments containing non-ASCII characters, which a caller could
        # trigger through the `sig` parameter.
        if not hmac.compare_digest(sig.encode(), expected_sig.encode()):
            raise HTTPException(status_code=403, detail="Invalid signature")

        async def media_stream():
            # `bot` is already running (started in main()). Wrapping this in
            # `async with bot:` would call start()/stop() on the shared
            # client.
            # NOTE(review): confirm the installed Pyrogram build accepts the
            # chunk_size keyword for stream_media.
            async for chunk in bot.stream_media(id, chunk_size=1024*1024):
                yield chunk

        # ASCII-only fallback plus an RFC 5987 encoded name. The raw name may
        # contain quotes or characters that cannot be sent in an HTTP header.
        ascii_name = "".join(
            ch if ch.isascii() and ch.isprintable() and ch not in '"\\' else "_"
            for ch in name
        )
        encoded_name = quote(name, safe="")
        disposition = (
            f'attachment; filename="{ascii_name}"; '
            f"filename*=UTF-8''{encoded_name}"
        )

        # Accept-Ranges is not advertised: the Range request header is never
        # read here, so partial content cannot be served.
        return StreamingResponse(
            media_stream(),
            headers={
                "Content-Disposition": disposition,
                "Content-Type": mime,
                "Content-Length": str(size),
            },
            media_type=mime,
        )
    except HTTPException:
        # Let deliberate HTTP errors (the 403 above) through instead of
        # turning them into a 500.
        raise
    except Exception as e:
        logger.error(f"Error serving file: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")
121
122
@bot.on_message(filters.command("getlink"))
async def handle_getlink(client: Client, message: Message):
    """Answer the "getlink" command with a /dl URL for the replied-to message.

    When the command is not sent as a reply, the user is asked to reply to a
    media message. Any error is logged and its text is sent back to the chat.
    """
    try:
        replied = message.reply_to_message
        if replied:
            # The link points at the chat that actually holds the replied-to
            # message, together with that message's own ID.
            link = f"{NAME}/dl?chat_id={replied.chat.id}&message_id={replied.id}"
            await message.reply(f"Permanent Download Link:\n{link}")
        else:
            await message.reply("Please reply to a media message.")
    except Exception as e:
        logger.error(f"Error handling getlink command: {e}", exc_info=True)
        await message.reply(f"Error: {e}")
138
139
140
async def main():
    """Start the Telegram bot, then serve the FastAPI app until it exits.

    The HTTP server listens on all interfaces on a port chosen by
    find_free_port(). Errors raised during startup or serving are logged at
    CRITICAL level and not re-raised. The bot is stopped on the way out if it
    was started.
    """
    bot_started = False
    try:
        await bot.start()
        bot_started = True
        logger.info("Bot started successfully")
        print(await bot.get_me())

        import uvicorn
        config = uvicorn.Config(
            app,
            host="0.0.0.0",
            port=find_free_port(),
            log_level="info",
        )
        server = uvicorn.Server(config)
        await server.serve()

    except Exception as e:
        logger.critical(f"Critical error during startup: {e}", exc_info=True)
    finally:
        # Only stop a client that actually started. Stopping one whose
        # start() failed raises a second error out of this finally block.
        if bot_started:
            await bot.stop()
160
161
162
# Script entry point: run main() to completion on the current event loop.
# NOTE(review): presumably get_event_loop() is used instead of asyncio.run()
# so that main() runs on the loop that was current when `bot` was created —
# confirm before changing.
Loop = asyncio.get_event_loop()
_main_coro = main()
Loop.run_until_complete(_main_coro)