import configparser import requests import logging import json import os import subprocess class QBittorrentAPICaller(): def __init__(self, url, username, password): self.url = self.clean_url(url) self.username = username self.password = password self.session = requests.Session() #def __del__(self): #self.logout() def clean_url(self, url): to_ret = url # Prepend protocol (https by default) if not to_ret.startswith(('http://', 'https://')): to_ret = 'https://' + to_ret # Remove trailing / if url.endswith('/'): to_ret = to_ret[:-1] return to_ret def truncate_string(self, s, max_length=25): return s[:max_length] + '...' if len(s) > max_length else s def login(self): the_url = self.url + "/" + "api/v2/auth/login" data={"username":self.username, "password":self.password} resp = self.session.post(the_url, data=data) return resp; def logout(self): the_url = self.url + "/" + "api/v2/auth/logout" resp = self.session.post(the_url) def version(self): the_url = self.url + "/" + "api/v2/app/version" resp = self.session.get(the_url) if resp.status_code != 200: return "Got status" + str(resp.status_code) return resp.text def webapiVersion(self): the_url = self.url + "/" + "api/v2/app/webapiVersion" resp = self.session.get(the_url) if resp.status_code != 200: return "Got status" + str(resp.status_code) return resp.text def buildInfo(self): the_url = self.url + "/" + "api/v2/app/buildInfo" resp = self.session.get(the_url) if resp.status_code != 200: return "Got status" + str(resp.status_code) return json_to_key_value_string(resp.text) def torrentList(self, modifier=""): the_url = self.url + "/" + "api/v2/torrents/info" to_ret = "```\n" for f in ["downloading", "completed"]: resp = self.session.post(the_url, data={"filter":f}) if resp.status_code != 200: return "Got status" + str(resp.status_code) parsed = json.loads(resp.text) to_ret += f.upper() + "\n" if f == "downloading": for p in parsed: to_ret += (self.truncate_string(p["name"]) if modifier == "full" else p["name"]) + " | " + "{:.2f}".format(float(p["progress"])*100) + "% | " + "{:.2f}".format(float(p["dlspeed"])/1000000) + " MB/s" + " | " + str(datetime.timedelta(seconds=int(p["eta"]))) + "\n" else: for p in parsed: to_ret += (self.truncate_string(p["name"]) if modifier == "full" else p["name"]) + "\n" to_ret += "--\n" to_ret += "```" return to_ret def torrentListComplete(self): the_url = self.url + "/" + "api/v2/torrents/info" resp = self.session.post(the_url, data={"filter":"completed"}) return resp def add(self, url, username, category="unknown"): if not url.startswith("magnet:"): return "Please supply a magnet link (begins with \"magnet:\")" the_url = self.url + "/" + "api/v2/torrents/add" resp = self.session.post(the_url, data={"urls":url,"category":category,"tags":username}) if (resp.status_code == 415): return "Torrent file not valid" if (resp.status_code == 200): # Get the hash magnet_hash = url.split(":")[3].split("&")[0].lower() # Make request to torrentlist with particular hash the_url2 = self.url + "/" + "api/v2/torrents/info" time.sleep(5) resp2 = self.session.post(the_url2) if (resp2.status_code != 200): return "Could not verify if torrent was added" parsed = json.loads(resp2.text) for p in parsed: if p["hash"] == magnet_hash: if p["num_seeds"] == 0: return "Successfully added " + p["name"] + "\nWarning: torrent has 0 seeds after 5 seconds" else: return "Successfully added " + p["name"] return "Could not add torrent, please double check the magnet link (hash=" + magnet_hash + ")" def get_search_plugins(self): the_url = self.url + "/" + "api/v2/search/plugins" return self.session.post(the_url) def search_start(self, searchstring, category="all"): the_url = self.url + "/" + "api/v2/search/start" return self.session.post(the_url, data={"pattern":searchstring, "plugins":"enabled", "category":category}) def search_status(self, search_id): the_url = self.url + "/" + "api/v2/search/status" return self.session.post(the_url, data={"id":search_id}) def search_stop(self, search_id): the_url = self.url + "/" + "api/v2/search/stop" return self.session.post(the_url, data={"id":search_id}) def search_results(self, search_id): the_url = self.url + "/" + "api/v2/search/results" return self.session.post(the_url, data={"id":search_id}) def search_delete(self, search_id): the_url = self.url + "/" + "api/v2/search/delete" return self.session.post(the_url, data={"id":search_id}) def torrents_delete(self, the_hash, deleteFiles): the_url = self.url + "/" + "api/v2/torrents/delete" return self.session.post(the_url, data={"hashes":the_hash, "deleteFiles":deleteFiles}) if __name__ == '__main__': logging.basicConfig(filename="./log.log", level=logging.INFO, format='%(levelname)-8s %(message)s') config = configparser.ConfigParser() if not os.path.exists("./config.conf"): logging.error("Could not find ./config.conf, please ensure it exists") exit config.read('config.conf') config_default = config['API'] config_url = "https://" + config_default['api_url'].strip("\"") if config_default['api_url'] is not None else "" config_user = config_default['api_user'].strip("\"") if config_default['api_url'] is not None else "" config_pass = config_default['api_pass'].strip("\"") if config_default['api_url'] is not None else "" caller = QBittorrentAPICaller(config_url, config_user, config_pass) resp = caller.login() if resp.status_code != 200: logging.error("Could not login to API") exit resp = caller.torrentListComplete() if (resp.status_code != 200): logging.error("Could not query torrent list") exit config_cat = config['CATEGORIES'] torrents = json.loads(resp.text) for t in torrents: if config.has_option("CATEGORIES", t["category"]): logging.info("Downloading " + t["name"] + " to " + config_cat[t["category"]]) the_hash = t["hash"] subprocess.call(["scp", "-r", config_user + "@" + config_default['ssh_url'].strip("\"") + ":" + t["content_path"], config_cat[t["category"]]]) # Delete torrent caller.torrents_delete(the_hash, "true") else: logging.warning("Warning: no path configured for torrent with category " + t["category"] + " (" + t["name"] + ")") caller.logout() exit