import asyncio import logging import slixmpp import requests import json import datetime import time import configparser import os def json_to_key_value_string(json_data): # Parse JSON data data = json.loads(json_data) # Initialize an empty list to store key-value pairs result_lines = [] # Recursively process the JSON data def process_item(item, prefix=''): if isinstance(item, dict): for key, value in item.items(): if isinstance(value, (dict, list)): process_item(value, f"{prefix}{key}.") else: result_lines.append(f"{prefix}{key}: {value}") elif isinstance(item, list): for i, value in enumerate(item): process_item(value, f"{prefix}[{i}].") process_item(data) return "\n".join(result_lines) class QBittorrentAPICaller(): def __init__(self, url, username, password): self.url = self.clean_url(url) self.username = username self.password = password self.session = requests.Session() self.login() def __del__(self): self.logout() def clean_url(self, url): to_ret = url # Prepend protocol (https by default) if not to_ret.startswith(('http://', 'https://')): to_ret = 'https://' + to_ret # Remove trailing / if url.endswith('/'): to_ret = to_ret[:-1] return to_ret def truncate_string(self, s, max_length=25): return s[:max_length] + '...' if len(s) > max_length else s def login(self): the_url = self.url + "/" + "api/v2/auth/login" data={"username":self.username, "password":self.password} resp = self.session.post(the_url, data=data) def logout(self): the_url = self.url + "/" + "api/v2/auth/logout" resp = self.session.post(the_url) def version(self): the_url = self.url + "/" + "api/v2/app/version" resp = self.session.get(the_url) if resp.status_code != 200: return "Got status" + str(resp.status_code) return resp.text def webapiVersion(self): the_url = self.url + "/" + "api/v2/app/webapiVersion" resp = self.session.get(the_url) if resp.status_code != 200: return "Got status" + str(resp.status_code) return resp.text def buildInfo(self): the_url = self.url + "/" + "api/v2/app/buildInfo" resp = self.session.get(the_url) if resp.status_code != 200: return "Got status" + str(resp.status_code) return json_to_key_value_string(resp.text) def torrentList(self, modifier=""): the_url = self.url + "/" + "api/v2/torrents/info" to_ret = "```\n" for f in ["downloading", "completed"]: resp = self.session.post(the_url, data={"filter":f}) if resp.status_code != 200: return "Got status" + str(resp.status_code) parsed = json.loads(resp.text) to_ret += f.upper() + "\n" if f == "downloading": for p in parsed: to_ret += (self.truncate_string(p["name"]) if modifier == "full" else p["name"]) + " | " + "{:.2f}".format(float(p["progress"])*100) + "% | " + "{:.2f}".format(float(p["dlspeed"])/1000000) + " MB/s" + " | " + str(datetime.timedelta(seconds=int(p["eta"]))) + "\n" else: for p in parsed: to_ret += (self.truncate_string(p["name"]) if modifier == "full" else p["name"]) + "\n" to_ret += "--\n" to_ret += "```" return to_ret def add(self, url, username): if not url.startswith("magnet:"): return "Please supply a magnet link (begins with \"magnet:\")" the_url = self.url + "/" + "api/v2/torrents/add" resp = self.session.post(the_url, data={"urls":url,"category":username}) if (resp.status_code == 415): return "Torrent file not valid" if (resp.status_code == 200): # Get the hash magnet_hash = url.split(":")[3].split("&")[0].lower() # Make request to torrentlist with particular hash the_url2 = self.url + "/" + "api/v2/torrents/info" time.sleep(5) resp2 = self.session.post(the_url2) if (resp2.status_code != 200): return "Could not verify if torrent was added" parsed = json.loads(resp2.text) for p in parsed: if p["hash"] == magnet_hash: if p["num_seeds"] == 0: return "Successfully added " + p["name"] + "\nWarning: torrent has 0 seeds after 5 seconds" else: return "Successfully added " + p["name"] return "Could not add torrent, please double check the magnet link (hash=" + magnet_hash + ")" class QBBot(slixmpp.ClientXMPP): def __init__(self, jid, password, room, nick, api_url, api_username, api_password): slixmpp.ClientXMPP.__init__(self, jid, password) self.room = room self.nick = nick self.api_caller = QBittorrentAPICaller(api_url, api_username, api_password) self.add_event_handler("session_start", self.session_start) self.add_event_handler("message", self.message) # The groupchat_message event is triggered whenever a message # stanza is received from any chat room. If you also also # register a handler for the 'message' event, MUC messages # will be processed by both handlers. self.add_event_handler("groupchat_message", self.muc_message) # If you wanted more functionality, here's how to register plugins: # self.register_plugin('xep_0030') # Service Discovery # self.register_plugin('xep_0199') # XMPP Ping # Here's how to access plugins once you've registered them: # self['xep_0030'].add_feature('echo_demo') async def session_start(self, event): await self.get_roster() self.send_presence() self.plugin['xep_0045'].join_muc(self.room, self.nick) # Most get_*/set_* methods from plugins use Iq stanzas, which # are sent asynchronously. You can almost always provide a # callback that will be executed when the reply is received. def message(self, msg): if msg['type'] in ('chat', 'normal'): msg.reply("I was not designed to take direct messages, please use me in a MUC").send() def muc_message(self, msg): """ Process incoming message stanzas from any chat room. Be aware that if you also have any handlers for the 'message' event, message stanzas may be processed by both handlers, so check the 'type' attribute when using a 'message' event handler. Whenever the bot's nickname is mentioned, respond to the message. IMPORTANT: Always check that a message is not from yourself, otherwise you will create an infinite loop responding to your own messages. This handler will reply to messages that mention the bot's nickname. Arguments: msg -- The received message stanza. See the documentation for stanza objects and the Message stanza to see how it may be used. """ #if msg['mucnick'] != self.nick and self.nick in msg['body']: if msg['mucnick'] != self.nick and msg['body'].startswith(self.nick): message = "" tokens = msg['body'].split() match tokens[1]: case "info": #message += "URL: " + self.api_caller.url + "\n" message += "QBittorrent Version: " + self.api_caller.version() + "\n" message += "Web API Version: " + self.api_caller.webapiVersion() + "\n" message += self.api_caller.buildInfo() # don't know why, but this behavior is reversed. Too lazy to figure it out case "fulllist": message += self.api_caller.torrentList("") case "list": message += self.api_caller.torrentList("full") case "add": message += self.api_caller.add(tokens[2], msg['mucnick']) case "help"|_: message += "Commands\n" message += "info: Displays information about QBittorrent server" + "\n" message += "help: Displays this help" + "\n" message += "list: Lists torrents, downloading torrents' names truncated to 25 characters\n" message += "fulllist: Lists torrents, no name truncation\n" message += "add [MAGNET_URL]: Adds torrent corresponding to MAGNET_URL to the download list. Note that this will take about 5 seconds, as there's a check for 0 seeds after 5 seconds as a warning" self.send_message(mto=msg['from'].bare, mbody=message, mtype='groupchat') if __name__ == '__main__': # Ideally use optparse or argparse to get JID, # password, and log level. logging.basicConfig(level=logging.ERROR, format='%(levelname)-8s %(message)s') config = configparser.ConfigParser() if not os.path.exists("./config.conf"): logging.error("Could not find ./config.conf, please ensure it exists") exit config.read('config.conf') config_default = config['DEFAULT'] xmpp = QBBot(config_default['qb_jid'].strip('\"'), config_default['qb_pass'].strip('\"'), config_default['qb_muc'].strip('\"'), config_default['qb_nick'].strip('\"'), config_default['api_url'].strip('\"'), config_default['api_user'].strip('\"'), config_default['api_pass'].strip('\"')) xmpp.register_plugin('xep_0030') # Service Discovery xmpp.register_plugin('xep_0045') # Multi-User Chat xmpp.register_plugin('xep_0199') # XMPP Ping xmpp.connect() xmpp.process()