import asyncio import logging import slixmpp import requests import json import datetime import time import configparser import os import threading def json_to_key_value_string(json_data): # Parse JSON data data = json.loads(json_data) # Initialize an empty list to store key-value pairs result_lines = [] # Recursively process the JSON data def process_item(item, prefix=''): if isinstance(item, dict): for key, value in item.items(): if isinstance(value, (dict, list)): process_item(value, f"{prefix}{key}.") else: result_lines.append(f"{prefix}{key}: {value}") elif isinstance(item, list): for i, value in enumerate(item): process_item(value, f"{prefix}[{i}].") process_item(data) return "\n".join(result_lines) class QBittorrentAPICaller(): def __init__(self, url, username, password): self.url = self.clean_url(url) self.username = username self.password = password self.session = requests.Session() self.reestablish_session() def __del__(self): self.logout() def clean_url(self, url): to_ret = url # Prepend protocol (https by default) if not to_ret.startswith(('http://', 'https://')): to_ret = 'https://' + to_ret # Remove trailing / if url.endswith('/'): to_ret = to_ret[:-1] return to_ret # We need to reestablish a connection to the server periodically, this does that def reestablish_session(self): # Every 10 minutes, run this threading.Timer(600, self.reestablish_session).start() self.logout() self.login() logging.info("Reestablished connection to QBittorrent server") def truncate_string(self, s, max_length=25): return s[:max_length] + '...' if len(s) > max_length else s def login(self): the_url = self.url + "/" + "api/v2/auth/login" data={"username":self.username, "password":self.password} resp = self.session.post(the_url, data=data) def logout(self): the_url = self.url + "/" + "api/v2/auth/logout" resp = self.session.post(the_url) def version(self): the_url = self.url + "/" + "api/v2/app/version" resp = self.session.get(the_url) if resp.status_code != 200: return "Got status" + str(resp.status_code) return resp.text def webapiVersion(self): the_url = self.url + "/" + "api/v2/app/webapiVersion" resp = self.session.get(the_url) if resp.status_code != 200: return "Got status" + str(resp.status_code) return resp.text def buildInfo(self): the_url = self.url + "/" + "api/v2/app/buildInfo" resp = self.session.get(the_url) if resp.status_code != 200: return "Got status" + str(resp.status_code) return json_to_key_value_string(resp.text) def torrentList(self, modifier=""): the_url = self.url + "/" + "api/v2/torrents/info" to_ret = "```\n" for f in ["downloading", "completed"]: resp = self.session.post(the_url, data={"filter":f}) if resp.status_code != 200: return "Got status" + str(resp.status_code) parsed = json.loads(resp.text) to_ret += f.upper() + "\n" if f == "downloading": for p in parsed: to_ret += (self.truncate_string(p["name"]) if modifier == "full" else p["name"]) + " | " + "{:.2f}".format(float(p["progress"])*100) + "% | " + "{:.2f}".format(float(p["dlspeed"])/1000000) + " MB/s" + " | " + str(datetime.timedelta(seconds=int(p["eta"]))) + "\n" else: for p in parsed: to_ret += (self.truncate_string(p["name"]) if modifier == "full" else p["name"]) + "\n" to_ret += "--\n" to_ret += "```" return to_ret def add(self, url, username): if not url.startswith("magnet:"): return "Please supply a magnet link (begins with \"magnet:\")" the_url = self.url + "/" + "api/v2/torrents/add" resp = self.session.post(the_url, data={"urls":url,"category":username}) if (resp.status_code == 415): return "Torrent file not valid" if (resp.status_code == 200): # Get the hash magnet_hash = url.split(":")[3].split("&")[0].lower() # Make request to torrentlist with particular hash the_url2 = self.url + "/" + "api/v2/torrents/info" time.sleep(5) resp2 = self.session.post(the_url2) if (resp2.status_code != 200): return "Could not verify if torrent was added" parsed = json.loads(resp2.text) for p in parsed: if p["hash"] == magnet_hash: if p["num_seeds"] == 0: return "Successfully added " + p["name"] + "\nWarning: torrent has 0 seeds after 5 seconds" else: return "Successfully added " + p["name"] return "Could not add torrent, please double check the magnet link (hash=" + magnet_hash + ")" def get_search_plugins(self): the_url = self.url + "/" + "api/v2/search/plugins" return self.session.post(the_url) def search_start(self, searchstring, category="all"): the_url = self.url + "/" + "api/v2/search/start" return self.session.post(the_url, data={"pattern":searchstring, "plugins":"enabled", "category":category}) def search_status(self, search_id): the_url = self.url + "/" + "api/v2/search/status" return self.session.post(the_url, data={"id":search_id}) def search_stop(self, search_id): the_url = self.url + "/" + "api/v2/search/stop" return self.session.post(the_url, data={"id":search_id}) def search_results(self, search_id): the_url = self.url + "/" + "api/v2/search/results" return self.session.post(the_url, data={"id":search_id}) def search_delete(self, search_id): the_url = self.url + "/" + "api/v2/search/delete" return self.session.post(the_url, data={"id":search_id}) class QBBot(slixmpp.ClientXMPP): def __init__(self, jid, password, room, nick, api_url, api_username, api_password): slixmpp.ClientXMPP.__init__(self, jid, password) self.room = room self.nick = nick self.api_caller = QBittorrentAPICaller(api_url, api_username, api_password) self.searchselects = {} self.add_event_handler("session_start", self.session_start) self.add_event_handler("message", self.message) # The groupchat_message event is triggered whenever a message # stanza is received from any chat room. If you also also # register a handler for the 'message' event, MUC messages # will be processed by both handlers. self.add_event_handler("groupchat_message", self.muc_message) # If you wanted more functionality, here's how to register plugins: # self.register_plugin('xep_0030') # Service Discovery # self.register_plugin('xep_0199') # XMPP Ping # Here's how to access plugins once you've registered them: # self['xep_0030'].add_feature('echo_demo') async def session_start(self, event): await self.get_roster() self.send_presence() self.plugin['xep_0045'].join_muc(self.room, self.nick) avatar_file = "./qbittorrent_logo.png" try: avatar_file = open(avatar_file, 'rb') except IOError: logging.error("Could not find avatar file") return self.disconnect() avatar = avatar_file.read() avatar_type = 'image/png' avatar_id = self['xep_0084'].generate_id(avatar) avatar_bytes = len(avatar) avatar_file.close() used_xep84 = False logging.info('Publish XEP-0084 avatar data') result = await self['xep_0084'].publish_avatar(avatar) if isinstance(result, slixmpp.exceptions.XMPPError): logging.warning('Could not publish XEP-0084 avatar') else: used_xep84 = True logging.info('Update vCard with avatar') result = await self['xep_0153'].set_avatar(avatar=avatar, mtype=avatar_type) if isinstance(result, slixmpp.exceptions.XMPPError): print('Could not set vCard avatar') if used_xep84: logging.info('Advertise XEP-0084 avatar metadata') result = await self['xep_0084'].publish_avatar_metadata([ {'id': avatar_id, 'type': avatar_type, 'bytes': avatar_bytes} # We could advertise multiple avatars to provide # options in image type, source (HTTP vs pubsub), # size, etc. # {'id': ....} ]) if isinstance(result, slixmpp.exceptions.XMPPError): logging.warning('Could not publish XEP-0084 metadata') logging.info('Wait for presence updates to propagate...') #self.schedule('end', 5, self.disconnect, kwargs={'wait': True}) # Most get_*/set_* methods from plugins use Iq stanzas, which # are sent asynchronously. You can almost always provide a # callback that will be executed when the reply is received. def message(self, msg): if msg['type'] in ('chat', 'normal'): msg.reply("I was not designed to take direct messages, please use me in a MUC").send() def muc_message(self, msg): """ Process incoming message stanzas from any chat room. Be aware that if you also have any handlers for the 'message' event, message stanzas may be processed by both handlers, so check the 'type' attribute when using a 'message' event handler. Whenever the bot's nickname is mentioned, respond to the message. IMPORTANT: Always check that a message is not from yourself, otherwise you will create an infinite loop responding to your own messages. This handler will reply to messages that mention the bot's nickname. Arguments: msg -- The received message stanza. See the documentation for stanza objects and the Message stanza to see how it may be used. """ #if msg['mucnick'] != self.nick and self.nick in msg['body']: if msg['mucnick'] != self.nick and msg['body'].startswith(self.nick): message = "" tokens = msg['body'].split() match tokens[1]: case "info": #message += "URL: " + self.api_caller.url + "\n" message += "QBittorrent Version: " + self.api_caller.version() + "\n" message += "Web API Version: " + self.api_caller.webapiVersion() + "\n" message += self.api_caller.buildInfo() # don't know why, but this behavior is reversed. Too lazy to figure it out case "fulllist": message += self.api_caller.torrentList("") case "list": message += self.api_caller.torrentList("full") case "add": message += self.api_caller.add(tokens[2], msg['mucnick']) case "searchplugins": resp = self.api_caller.get_search_plugins() if (resp.status_code != 200): message += "get_search_plugins() returned " + str(resp.status_code) logging.warning("get_search_plugins() returned" + str(resp.status_code)) else: parsed = json.loads(resp.text) message += "```\n" message += "\n".join([p["fullName"] + " (" + p["url"] + ")" + " v" + p["version"] + " | " + ("enabled" if p["enabled"] else "disabled") + " | " + ", ".join([c["id"] for c in p["supportedCategories"]]) for p in parsed]) message += "\n```" case "search": category = "all" search_token_start=2 if tokens[2].startswith("c="): category = tokens[2].removeprefix("c=") search_token_start=3 search_string = " ".join(tokens[search_token_start:]) resp = self.api_caller.search_start(search_string, category) if resp.status_code == 409: message += "Server reported too many searches!" else: search_id = json.loads(resp.text)["id"] # 30 second timeout count = 6 while (count >= 0 and json.loads(self.api_caller.search_status(search_id).text)[0]["status"] != "Stopped"): time.sleep(5) count -= 1 if count == 0: self.api_caller.search_stop(search_id) message += "Search took longer than 30 seconds!" res = self.api_caller.search_results(search_id) # Delete the search, we already have the data self.api_caller.search_delete(search_id) parsed_res = json.loads(res.text) message += "```\n" message += "Total results for \"" + search_string + "\": " + str(parsed_res["total"]) + "\n" the_list = [[r["fileName"], r["fileUrl"], str(r["nbSeeders"]), r["fileSize"]] for r in parsed_res["results"]] # Remove torrents with no seeds from the search results the_list = [i for i in the_list if int(i[2]) != 0] message += "\n".join([str(i) + ". " + l[0] + ", " "seeds: " + str(l[2]) + ", " "size: " + '{0:.2f}'.format(int(l[3])/1024/1024/1024) + " GB" for i, l in enumerate(the_list)]) message += "\n```" # Register the users's latest search in the searchselects structure user = msg['mucnick'] self.searchselects[user] = the_list case "searchselect": user = msg['mucnick'] if self.searchselects[user] is None: message += "Please initiate a search first." else: selection = self.searchselects[user] index = int(tokens[2]) if index < len(selection): link = selection[index][1] self.api_caller.add(link, user) message += "Successfully added " + selection[index][0] else: message += "Error: index out of range" case "searchhelp": message += "Conducting a search\n" message += "-------------------\n" message += "The whole search process is done with 2 commands.\n First, `search` is used to obtain a list of results.\n The optional `c=CATEGORY` selects a category, by default it is \"all\".\n The full list of categories can be obtained from `searchplugins`, 3rd column.\n Note that only enabled plugins are utilized by this feature.\n I highly recommend using a category, or the search can take a really long time.\n Once the search process concludes (it takes at most 30 seconds), you will receive a list of indices, along with names, number of seeders, and file size.\n Once you choose the torrent you want, note the index and invoke `searchselect INDEX`.\n This will add the torrent to the queue.\n The following is an example usage of the search functionality.\n" message += "```\n" message += self.nick + " search c=software ubuntu 16.04\n\n" message += "Total results for \"ubuntu 16.04\": 18\n" message += "0. Ubuntu MATE 16.04.2 [MATE][armhf][img.xz][Uzerus], seeds: 260, size: 1.10 GB\n" message += "1. Ubuntu 16.04.1 LTS Desktop 64-bit, seeds: 55, size: 1.40 GB\n" message += "2. Ubuntu 16.04.5 LTS [Xenial Xerus][Unity][x64 x86_64 amd64][Server][ISO][Uzerus], seeds: 8, size: 0.60 GB\n" message += "...\n\n" message += self.nick + " searchselect 0\n\n" message += "Successfully added Ubuntu MATE 16.04.2 [MATE][armhf][img.xz][Uzerus]\n" message += "```\n" message += "Note: .torrent files are not supported right now, some results may return .torrent file links rather than magnet links\n" case "help"|_: message += "```\n" message += "Commands\n" message += "info: Displays information about QBittorrent server" + "\n" message += "help: Displays this help" + "\n" message += "list: Lists torrents, names truncated to 25 characters\n" message += "fulllist: Lists torrents, no name truncation\n" message += "add MAGNET_URL: Adds torrent corresponding to MAGNET_URL to the download list. Note that this will take about 5 seconds, as there's a check for 0 seeds after 5 seconds as a warning\n" message += "searchplugins: List the installed search plugins\n" message += "search [c=CATEGORY] search_string: Search from all enabled plugins for search_string, with optional category CATEGORY. Valid categories can be found from the searchplugins command.\n" message += "searchselect: Selects a torrent from a previous search to download\n" message += "searchhelp: Shows the in-depth process of utilizing search" message += "\n```" self.send_message(mto=msg['from'].bare, mbody=message, mtype='groupchat') if __name__ == '__main__': # Ideally use optparse or argparse to get JID, # password, and log level. logging.basicConfig(level=logging.INFO, format='%(levelname)-8s %(message)s') config = configparser.ConfigParser() if not os.path.exists("./config.conf"): logging.error("Could not find ./config.conf, please ensure it exists") exit config.read('config.conf') config_default = config['DEFAULT'] xmpp = QBBot(config_default['qb_jid'].strip('\"'), config_default['qb_pass'].strip('\"'), config_default['qb_muc'].strip('\"'), config_default['qb_nick'].strip('\"'), config_default['api_url'].strip('\"'), config_default['api_user'].strip('\"'), config_default['api_pass'].strip('\"')) xmpp.register_plugin('xep_0030') # Service Discovery xmpp.register_plugin('xep_0045') # Multi-User Chat xmpp.register_plugin('xep_0199') # XMPP Ping xmpp.register_plugin('xep_0084') # vCard avatar xmpp.register_plugin('xep_0153') # Something required for vCard xmpp.connect() xmpp.process()