pyrogram dl link generator
public
Mar 23, 2025
Never
39
1 import os 2 import hmac 3 from fastapi import FastAPI, HTTPException, Request 4 from fastapi.responses import StreamingResponse 5 from pyrogram import Client, filters 6 from pyrogram.types import Message 7 import socket 8 import logging 9 import asyncio 10 11 logging.basicConfig( 12 filename='app.log', 13 level=logging.DEBUG, 14 format='%(asctime)s - %(levelname)s - %(message)s' 15 ) 16 logger = logging.getLogger(__name__) 17 18 app = FastAPI() 19 NAME = 'https://website.ru' 20 API_ID = 1234 21 API_HASH = 'yuuuu' 22 BOT_TOKEN = "88888888:hahahaja" 23 SECRET_KEY = "huuijvftjjj6900jjj" 24 25 26 bot = Client("my_account", api_id=API_ID, api_hash=API_HASH, bot_token=BOT_TOKEN) 27 28 def find_free_port(): 29 with socket.socket() as s: 30 s.bind(('', 0)) 31 return s.getsockname()[1] 32 33 def generate_signature(file_id: str, size: int, name: str, mime: str) -> str: 34 try: 35 message = f"{file_id}{size}{name}{mime}".encode() 36 return hmac.new(SECRET_KEY.encode(), message, "sha256").hexdigest() 37 except Exception as e: 38 logger.error(f"Err {e}", exc_info=True) 39 raise 40 41 def extract_media_info(message: Message) -> dict: 42 try: 43 media = next(( 44 msg_obj for msg_type in [ 45 'document', 'video', 'audio', 'voice', 46 'video_note', 'photo', 'sticker' 47 ] if (msg_obj := getattr(message, msg_type, None)) 48 ), None) 49 50 if not media: 51 raise ValueError("Unsupported media type") 52 53 return { 54 "file_id": media.file_id, 55 "size": media.file_size, 56 "mime": getattr(media, "mime_type", "image/jpeg"), 57 "name": getattr(media, "file_name", 58 f"file_{message.id}.{media.mime_type.split('/')[-1]}" 59 if hasattr(media, "mime_type") else f"file_{message.id}") 60 } 61 except Exception as e: 62 logger.error(f"Err {e}", exc_info=True) 63 raise 64 65 @app.get("/dl") 66 async def generate_direct_link(request: Request, chat_id: int, message_id: int): 67 async with bot: 68 try: 69 70 message = await bot.get_messages(chat_id, message_id) 71 media_info = extract_media_info(message) 72 73 signature = generate_signature( 74 media_info["file_id"], 75 media_info["size"], 76 media_info["name"], 77 media_info["mime"] 78 ) 79 80 return { 81 "url": f"{request.base_url}file?" 82 f"id={media_info['file_id']}" 83 f"&size={media_info['size']}" 84 f"&name={media_info['name']}" 85 f"&mime={media_info['mime']}" 86 f"&sig={signature}" 87 } 88 except Exception as e: 89 logger.error(f"Err: {e}", exc_info=True) 90 raise HTTPException(status_code=400, detail=str(e)) 91 92 93 @app.get("/file") 94 async def serve_file( 95 request: Request, 96 id: str, 97 size: int, 98 name: str, 99 mime: str, 100 sig: str 101 ): 102 try: 103 expected_sig = generate_signature(id, size, name, mime) 104 if not hmac.compare_digest(sig, expected_sig): 105 raise HTTPException(status_code=403, detail="Invalid signature") 106 107 async def media_stream(): 108 async with bot: 109 async for chunk in bot.stream_media(id, chunk_size=1024*1024): 110 yield chunk 111 112 return StreamingResponse(media_stream(), headers={ 113 "Content-Disposition": f'attachment; filename="{name}"', 114 "Content-Type": mime, 115 "Content-Length": str(size), 116 "Accept-Ranges": "bytes" 117 }, media_type=mime) 118 except Exception as e: 119 logger.error(f"Error serving file: {e}", exc_info=True) 120 raise HTTPException(status_code=500, detail="Internal server error") 121 122 @bot.on_message(filters.command("getlink")) 123 async def handle_getlink(client: Client, message: Message): 124 try: 125 if not message.reply_to_message: 126 await message.reply("Please reply to a media message.") 127 return 128 129 # Get actual chat ID where the message exists 130 chat_id = message.reply_to_message.chat.id 131 message_id = message.reply_to_message.id 132 133 link = f"{NAME}/dl?chat_id={chat_id}&message_id={message_id}" 134 await message.reply(f"Permanent Download Link:\n{link}") 135 except Exception as e: 136 logger.error(f"Error handling getlink command: {e}", exc_info=True) 137 await message.reply(f"Error: {str(e)}") 138 139 140 async def main(): 141 try: 142 await bot.start() 143 logger.info("Bot started successfully") 144 print(await bot.get_me()) 145 146 import uvicorn 147 config = uvicorn.Config( 148 app, 149 host="0.0.0.0", 150 port=find_free_port(), 151 log_level="info" 152 ) 153 server = uvicorn.Server(config) 154 await server.serve() 155 156 except Exception as e: 157 logger.critical(f"Critical error during startup: {e}", exc_info=True) 158 finally: 159 await bot.stop() 160 161 162 Loop = asyncio.get_event_loop() 163 Loop.run_until_complete(main())