import asyncio import logging import slixmpp import requests import json import datetime import time import configparser import os import threading def json_to_key_value_string(json_data): # Parse JSON data data = json.loads(json_data) # Initialize an empty list to store key-value pairs result_lines = [] # Recursively process the JSON data def process_item(item, prefix=''): if isinstance(item, dict): for key, value in item.items(): if isinstance(value, (dict, list)): process_item(value, f"{prefix}{key}.") else: result_lines.append(f"{prefix}{key}: {value}") elif isinstance(item, list): for i, value in enumerate(item): process_item(value, f"{prefix}[{i}].") process_item(data) return "\n".join(result_lines) class QBittorrentAPICaller(): def __init__(self, url, username, password): self.url = self.clean_url(url) self.username = username self.password = password self.session = requests.Session() # self.reestablish_session() self.run_session_maintainer() def __del__(self): self.logout() def clean_url(self, url): to_ret = url # Prepend protocol (https by default) if not to_ret.startswith(('http://', 'https://')): to_ret = 'https://' + to_ret # Remove trailing / if url.endswith('/'): to_ret = to_ret[:-1] return to_ret # We need to reestablish a connection to the server periodically, this does that #def reestablish_session(self): # Every 10 minutes, run this #threading.Timer(600, self.reestablish_session).start() # self.logout() # self.login() # logging.info("Reestablished connection to QBittorrent server") def run_session_maintainer(self): def loop(): while True: try: self.logout() self.login() logging.info("Reestablished connection to QBittorrent server") except Exception as e: logging.error(f"Session maintainer error: {e}", exc_info=True) time.sleep(600) # 10 minutes threading.Thread(target=loop, daemon=True).start() def truncate_string(self, s, max_length=25): return s[:max_length] + '...' if len(s) > max_length else s def safe_get(self, path, params=None): try: # resp = self.session.get(self.url + path, params=params, timeout=5) resp = self.session.get(path, params=params, timeout=5) resp.raise_for_status() return resp except requests.exceptions.RequestException as e: logging.warning(f"GET {path} failed: {e}") return None def safe_post(self, path, data=None): try: # resp = self.session.post(self.url + path, data=data, timeout=5) resp = self.session.post(path, data=data, timeout=5) resp.raise_for_status() return resp except requests.exceptions.RequestException as e: logging.warning(f"POST {path} failed: {e}") return None def login(self): the_url = self.url + "/" + "api/v2/auth/login" data={"username":self.username, "password":self.password} try: resp = self.session.post(the_url, data=data, timeout=5) resp.raise_for_status() except requests.exceptions.RequestException as e: logging.warning(f"Login failed: {e}") #resp = self.session.post(the_url, data=data) def logout(self): the_url = self.url + "/" + "api/v2/auth/logout" # resp = self.session.post(the_url) try: resp = self.session.post(the_url, timeout=5) resp.raise_for_status() except requests.exceptions.RequestException as e: logging.warning(f"Logout failed: {e}") couldnt_connect_text = "Could not connect to qbittorrent server" def version(self): the_url = self.url + "/" + "api/v2/app/version" # resp = self.session.get(the_url) # if resp.status_code != 200: # return "Got status" + str(resp.status_code) # return resp.text resp = self.safe_get(the_url) if not resp: return self.couldnt_connect_text else: return resp.text def webapiVersion(self): the_url = self.url + "/" + "api/v2/app/webapiVersion" resp = self.safe_get(the_url) if not resp: return self.couldnt_connect_text else: return resp.text # try: # resp = self.session.get(the_url) # resp.raise_for_status() # return resp.text # except requests.exceptions.RequestException as e: # logging.warning(f"Couldn't get version") # if resp.status_code != 200: # return "Got status" + str(resp.status_code) def buildInfo(self): the_url = self.url + "/" + "api/v2/app/buildInfo" resp = self.safe_get(the_url) if not resp: return self.couldnt_connect_text else: return json_to_key_value_string(resp.text) # resp = self.session.get(the_url) # if resp.status_code != 200: # return "Got status" + str(resp.status_code) # return json_to_key_value_string(resp.text) def torrentList(self, modifier=""): the_url = self.url + "/" + "api/v2/torrents/info" to_ret = "```\n" for f in ["downloading", "completed"]: # resp = self.session.post(the_url, data={"filter":f}) # if resp.status_code != 200: # return "Got status" + str(resp.status_code) resp = self.safe_post(the_url, data={"filter":f}) if not resp: return self.couldnt_connect_text parsed = json.loads(resp.text) to_ret += f.upper() + "\n" if f == "downloading": for p in parsed: to_ret += (self.truncate_string(p["name"]) if modifier == "full" else p["name"]) + " | " + "{:.2f}".format(float(p["progress"])*100) + "% | " + "{:.2f}".format(float(p["dlspeed"])/1000000) + " MB/s" + " | " + str(datetime.timedelta(seconds=int(p["eta"]))) + "\n" else: for p in parsed: to_ret += (self.truncate_string(p["name"]) if modifier == "full" else p["name"]) + "\n" to_ret += "--\n" to_ret += "```" return to_ret # Too lazy to make this safe, hopefully will be fine idk def add(self, url, username, category="unknown"): if not url.startswith("magnet:"): return "Please supply a magnet link (begins with \"magnet:\")" the_url = self.url + "/" + "api/v2/torrents/add" # resp = self.session.post(the_url, data={"urls":url,"category":category,"tags":username}) resp = self.safe_post(the_url, data={"urls":url,"category":category,"tags":username}) if not resp: return self.couldnt_connect_text if (resp.status_code == 415): return "Torrent file not valid" if (resp.status_code == 200): # Get the hash magnet_hash = url.split(":")[3].split("&")[0].lower() # Make request to torrentlist with particular hash the_url2 = self.url + "/" + "api/v2/torrents/info" time.sleep(5) # resp2 = self.session.post(the_url2) resp2 = self.safe_post(the_url2) if not resp2: return self.couldnt_connect_text if (resp2.status_code != 200): return "Could not verify if torrent was added" parsed = json.loads(resp2.text) for p in parsed: if p["hash"] == magnet_hash: if p["num_seeds"] == 0: return "Successfully added " + p["name"] + "\nWarning: torrent has 0 seeds after 5 seconds" else: return "Successfully added " + p["name"] return "Could not add torrent, please double check the magnet link (hash=" + magnet_hash + ")" def get_search_plugins(self): the_url = self.url + "/" + "api/v2/search/plugins" # return self.session.post(the_url) resp = self.safe_post(the_url) return resp if resp else self.couldnt_connect_text def search_start(self, searchstring, category="all"): the_url = self.url + "/" + "api/v2/search/start" resp = self.safe_post(the_url, data={"pattern":searchstring, "plugins":"enabled", "category":category}) return resp if resp else self.couldnt_connect_text # return self.session.post(the_url, data={"pattern":searchstring, "plugins":"enabled", "category":category}) def search_status(self, search_id): the_url = self.url + "/" + "api/v2/search/status" resp = self.safe_post(the_url, data={"id":search_id}) return resp if resp else self.couldnt_connect_text # return self.session.post(the_url, data={"id":search_id}) def search_stop(self, search_id): the_url = self.url + "/" + "api/v2/search/stop" resp = self.safe_post(the_url, data={"id":search_id}) return resp if resp else self.couldnt_connect_text # return self.session.post(the_url, data={"id":search_id}) def search_results(self, search_id): the_url = self.url + "/" + "api/v2/search/results" resp = self.safe_post(the_url, data={"id":search_id}) return resp if resp else self.couldnt_connect_text # return self.session.post(the_url, data={"id":search_id}) def search_delete(self, search_id): the_url = self.url + "/" + "api/v2/search/delete" resp = self.safe_post(the_url, data={"id":search_id}) return resp if resp else self.couldnt_connect_text # return self.session.post(the_url, data={"id":search_id}) class QBBot(slixmpp.ClientXMPP): def __init__(self, jid, password, room, nick, api_url, api_username, api_password): slixmpp.ClientXMPP.__init__(self, jid, password) self.room = room self.nick = nick self.api_caller = QBittorrentAPICaller(api_url, api_username, api_password) self.searchselects = {} self.add_event_handler("session_start", self.session_start) self.add_event_handler("message", self.message) # The groupchat_message event is triggered whenever a message # stanza is received from any chat room. If you also also # register a handler for the 'message' event, MUC messages # will be processed by both handlers. self.add_event_handler("groupchat_message", self.muc_message) # If you wanted more functionality, here's how to register plugins: # self.register_plugin('xep_0030') # Service Discovery self.register_plugin('xep_0199') # XMPP Ping self['xep_0199'].enable_keepalive(interval=60, timeout=10) # Here's how to access plugins once you've registered them: # self['xep_0030'].add_feature('echo_demo') async def session_start(self, event): await self.get_roster() self.send_presence() self.plugin['xep_0045'].join_muc(self.room, self.nick) avatar_file = "./qbittorrent_logo.png" try: avatar_file = open(avatar_file, 'rb') except IOError: logging.error("Could not find avatar file") return self.disconnect() avatar = avatar_file.read() avatar_type = 'image/png' avatar_id = self['xep_0084'].generate_id(avatar) avatar_bytes = len(avatar) avatar_file.close() used_xep84 = False logging.info('Publish XEP-0084 avatar data') result = await self['xep_0084'].publish_avatar(avatar) if isinstance(result, slixmpp.exceptions.XMPPError): logging.warning('Could not publish XEP-0084 avatar') else: used_xep84 = True logging.info('Update vCard with avatar') result = await self['xep_0153'].set_avatar(avatar=avatar, mtype=avatar_type) if isinstance(result, slixmpp.exceptions.XMPPError): print('Could not set vCard avatar') if used_xep84: logging.info('Advertise XEP-0084 avatar metadata') result = await self['xep_0084'].publish_avatar_metadata([ {'id': avatar_id, 'type': avatar_type, 'bytes': avatar_bytes} # We could advertise multiple avatars to provide # options in image type, source (HTTP vs pubsub), # size, etc. # {'id': ....} ]) if isinstance(result, slixmpp.exceptions.XMPPError): logging.warning('Could not publish XEP-0084 metadata') logging.info('Wait for presence updates to propagate...') #self.schedule('end', 5, self.disconnect, kwargs={'wait': True}) # Most get_*/set_* methods from plugins use Iq stanzas, which # are sent asynchronously. You can almost always provide a # callback that will be executed when the reply is received. def message(self, msg): if msg['type'] in ('chat', 'normal'): msg.reply("I was not designed to take direct messages, please use me in a MUC").send() def muc_message(self, msg): """ Process incoming message stanzas from any chat room. Be aware that if you also have any handlers for the 'message' event, message stanzas may be processed by both handlers, so check the 'type' attribute when using a 'message' event handler. Whenever the bot's nickname is mentioned, respond to the message. IMPORTANT: Always check that a message is not from yourself, otherwise you will create an infinite loop responding to your own messages. This handler will reply to messages that mention the bot's nickname. Arguments: msg -- The received message stanza. See the documentation for stanza objects and the Message stanza to see how it may be used. """ #if msg['mucnick'] != self.nick and self.nick in msg['body']: if msg['mucnick'] != self.nick and msg['body'].startswith(self.nick): message = "" tokens = msg['body'].split() match tokens[1]: case "info": #message += "URL: " + self.api_caller.url + "\n" message += "QBittorrent Version: " + self.api_caller.version() + "\n" message += "Web API Version: " + self.api_caller.webapiVersion() + "\n" message += self.api_caller.buildInfo() # don't know why, but this behavior is reversed. Too lazy to figure it out case "fulllist": message += self.api_caller.torrentList("") case "list": message += self.api_caller.torrentList("full") case "add": category = tokens[2] if category not in ["all", "anime", "games", "movies", "music", "software", "tv", "unknown"]: message = "Please specify a category" else: string_start = 3 message += self.api_caller.add(tokens[string_start], msg['mucnick'], category) case "searchplugins": resp = self.api_caller.get_search_plugins() if (resp.status_code != 200): message += "get_search_plugins() returned " + str(resp.status_code) logging.warning("get_search_plugins() returned" + str(resp.status_code)) else: parsed = json.loads(resp.text) message += "```\n" message += "\n".join([p["fullName"] + " (" + p["url"] + ")" + " v" + p["version"] + " | " + ("enabled" if p["enabled"] else "disabled") + " | " + ", ".join([c["id"] for c in p["supportedCategories"]]) for p in parsed]) message += "\n```" case "search": if len(tokens) < 4: message = "Please specify a category and search term" else: category = tokens[2] if category not in ["all", "anime", "games", "movies", "music", "software", "tv"]: message = "Invalid category: " + category else: search_token_start = 3 # qb search CATEGORY ... search_string = " ".join(tokens[search_token_start:]) resp = self.api_caller.search_start(search_string, category) if resp.status_code == 409: message += "Server reported too many searches!" else: search_id = json.loads(resp.text)["id"] # 30 second timeout count = 6 while (count >= 0 and json.loads(self.api_caller.search_status(search_id).text)[0]["status"] != "Stopped"): time.sleep(5) count -= 1 if count == 0: self.api_caller.search_stop(search_id) message += "Search took longer than 30 seconds!" res = self.api_caller.search_results(search_id) # Delete the search, we already have the data self.api_caller.search_delete(search_id) parsed_res = json.loads(res.text) message += "```\n" the_list = [[r["fileName"], r["fileUrl"], str(r["nbSeeders"]), r["fileSize"]] for r in parsed_res["results"]] # Remove torrents with no seeds from the search results the_list = [i for i in the_list if int(i[2]) != 0] # Report the number of torrents in the search results message += "Total results for \"" + search_string + "\" under category " + category + ": " + str(len(the_list)) + "\n" message += "\n".join([str(i) + ". " + l[0] + ", " "seeds: " + str(l[2]) + ", " "size: " + '{0:.2f}'.format(int(l[3])/1024/1024/1024) + " GB" for i, l in enumerate(the_list)]) message += "\n```" # Register the users's latest search in the searchselects structure user = msg['mucnick'] self.searchselects[user] = [category if category != "all" else "unknown", the_list] case "searchselect": user = msg['mucnick'] if self.searchselects[user] is None: message += "Please initiate a search first." else: selection = self.searchselects[user][1] index = int(tokens[2]) if index < len(selection): link = selection[index][1] self.api_caller.add(link, user, self.searchselects[user][0]) message += "Successfully added " + selection[index][0] else: message += "Error: index out of range" case "searchhelp": message += "Conducting a search\n" message += "-------------------\n" message += "The whole search process is done with 2 commands.\n First, `search` is used to obtain a list of results.\n The required CATEGORY selects a category.\n The full list of categories can be obtained from `searchplugins`, 3rd column.\n Note that only enabled plugins are utilized by this feature.\n Once the search process concludes (it takes at most 30 seconds), you will receive a list of indices, along with names, number of seeders, and file size.\n Once you choose the torrent you want, note the index and invoke `searchselect INDEX`.\n This will add the torrent to the queue.\n The following is an example usage of the search functionality.\n" message += "```\n" message += self.nick + " search software ubuntu 16.04\n\n" message += "Total results for \"ubuntu 16.04\" in category software: 18\n" message += "0. Ubuntu MATE 16.04.2 [MATE][armhf][img.xz][Uzerus], seeds: 260, size: 1.10 GB\n" message += "1. Ubuntu 16.04.1 LTS Desktop 64-bit, seeds: 55, size: 1.40 GB\n" message += "2. Ubuntu 16.04.5 LTS [Xenial Xerus][Unity][x64 x86_64 amd64][Server][ISO][Uzerus], seeds: 8, size: 0.60 GB\n" message += "...\n\n" message += self.nick + " searchselect 0\n\n" message += "Successfully added Ubuntu MATE 16.04.2 [MATE][armhf][img.xz][Uzerus]\n" message += "```\n" message += "Note: .torrent files are not supported right now, some results may return .torrent file links rather than magnet links\n" case "help"|_: message += "```\n" message += "Commands\n" message += "info: Displays information about QBittorrent server" + "\n" message += "help: Displays this help" + "\n" message += "list: Lists torrents, names truncated to 25 characters\n" message += "fulllist: Lists torrents, no name truncation\n" message += "add CATEGORY MAGNET_URL: Adds torrent corresponding to MAGNET_URL to the download list with category CATEGORY.\n\tNote that this will take about 5 seconds, as there's a check for 0 seeds after 5 seconds as a warning.\n\tValid CATEGORY values can be found with the searchplugins command.\n" message += "searchplugins: List the installed search plugins\n" message += "search CATEGORY SEARCH_STRING: Search from all enabled plugins for SEARCH_STRING in CATEGORY. Valid CATEGORY values can be found from the searchplugins command.\n" message += "searchselect: Selects a torrent from a previous search to download\n" message += "searchhelp: Shows the in-depth process of utilizing search" message += "\n```" self.send_message(mto=msg['from'].bare, mbody=message, mtype='groupchat') if __name__ == '__main__': # Ideally use optparse or argparse to get JID, # password, and log level. logging.basicConfig(level=logging.INFO, format='%(levelname)-8s %(message)s') config = configparser.ConfigParser() if not os.path.exists("./config.conf"): logging.error("Could not find ./config.conf, please ensure it exists") exit config.read('config.conf') config_default = config['DEFAULT'] xmpp = QBBot(config_default['qb_jid'].strip('\"'), config_default['qb_pass'].strip('\"'), config_default['qb_muc'].strip('\"'), config_default['qb_nick'].strip('\"'), config_default['api_url'].strip('\"'), config_default['api_user'].strip('\"'), config_default['api_pass'].strip('\"')) xmpp.register_plugin('xep_0030') # Service Discovery xmpp.register_plugin('xep_0045') # Multi-User Chat xmpp.register_plugin('xep_0199') # XMPP Ping xmpp.register_plugin('xep_0084') # vCard avatar xmpp.register_plugin('xep_0153') # Something required for vCard xmpp.connect() xmpp.process()