qbittorrent-downloader/main.py

203 lines
7.4 KiB
Python

import configparser
import requests
import logging
import json
import os
import subprocess
class QBittorrentAPICaller():
def __init__(self, url, username, password):
self.url = self.clean_url(url)
self.username = username
self.password = password
self.session = requests.Session()
#def __del__(self):
#self.logout()
def clean_url(self, url):
to_ret = url
# Prepend protocol (https by default)
if not to_ret.startswith(('http://', 'https://')):
to_ret = 'https://' + to_ret
# Remove trailing /
if url.endswith('/'):
to_ret = to_ret[:-1]
return to_ret
def truncate_string(self, s, max_length=25):
return s[:max_length] + '...' if len(s) > max_length else s
def login(self):
the_url = self.url + "/" + "api/v2/auth/login"
data={"username":self.username, "password":self.password}
resp = self.session.post(the_url, data=data)
return resp;
def logout(self):
the_url = self.url + "/" + "api/v2/auth/logout"
resp = self.session.post(the_url)
def version(self):
the_url = self.url + "/" + "api/v2/app/version"
resp = self.session.get(the_url)
if resp.status_code != 200:
return "Got status" + str(resp.status_code)
return resp.text
def webapiVersion(self):
the_url = self.url + "/" + "api/v2/app/webapiVersion"
resp = self.session.get(the_url)
if resp.status_code != 200:
return "Got status" + str(resp.status_code)
return resp.text
def buildInfo(self):
the_url = self.url + "/" + "api/v2/app/buildInfo"
resp = self.session.get(the_url)
if resp.status_code != 200:
return "Got status" + str(resp.status_code)
return json_to_key_value_string(resp.text)
def torrentList(self, modifier=""):
the_url = self.url + "/" + "api/v2/torrents/info"
to_ret = "```\n"
for f in ["downloading", "completed"]:
resp = self.session.post(the_url, data={"filter":f})
if resp.status_code != 200:
return "Got status" + str(resp.status_code)
parsed = json.loads(resp.text)
to_ret += f.upper() + "\n"
if f == "downloading":
for p in parsed:
to_ret += (self.truncate_string(p["name"]) if modifier == "full" else p["name"]) + " | " + "{:.2f}".format(float(p["progress"])*100) + "% | " + "{:.2f}".format(float(p["dlspeed"])/1000000) + " MB/s" + " | " + str(datetime.timedelta(seconds=int(p["eta"]))) + "\n"
else:
for p in parsed:
to_ret += (self.truncate_string(p["name"]) if modifier == "full" else p["name"]) + "\n"
to_ret += "--\n"
to_ret += "```"
return to_ret
def torrentListComplete(self):
the_url = self.url + "/" + "api/v2/torrents/info"
resp = self.session.post(the_url, data={"filter":"completed"})
return resp
def add(self, url, username, category="unknown"):
if not url.startswith("magnet:"):
return "Please supply a magnet link (begins with \"magnet:\")"
the_url = self.url + "/" + "api/v2/torrents/add"
resp = self.session.post(the_url, data={"urls":url,"category":category,"tags":username})
if (resp.status_code == 415):
return "Torrent file not valid"
if (resp.status_code == 200):
# Get the hash
magnet_hash = url.split(":")[3].split("&")[0].lower()
# Make request to torrentlist with particular hash
the_url2 = self.url + "/" + "api/v2/torrents/info"
time.sleep(5)
resp2 = self.session.post(the_url2)
if (resp2.status_code != 200):
return "Could not verify if torrent was added"
parsed = json.loads(resp2.text)
for p in parsed:
if p["hash"] == magnet_hash:
if p["num_seeds"] == 0:
return "Successfully added " + p["name"] + "\nWarning: torrent has 0 seeds after 5 seconds"
else:
return "Successfully added " + p["name"]
return "Could not add torrent, please double check the magnet link (hash=" + magnet_hash + ")"
def get_search_plugins(self):
the_url = self.url + "/" + "api/v2/search/plugins"
return self.session.post(the_url)
def search_start(self, searchstring, category="all"):
the_url = self.url + "/" + "api/v2/search/start"
return self.session.post(the_url, data={"pattern":searchstring, "plugins":"enabled", "category":category})
def search_status(self, search_id):
the_url = self.url + "/" + "api/v2/search/status"
return self.session.post(the_url, data={"id":search_id})
def search_stop(self, search_id):
the_url = self.url + "/" + "api/v2/search/stop"
return self.session.post(the_url, data={"id":search_id})
def search_results(self, search_id):
the_url = self.url + "/" + "api/v2/search/results"
return self.session.post(the_url, data={"id":search_id})
def search_delete(self, search_id):
the_url = self.url + "/" + "api/v2/search/delete"
return self.session.post(the_url, data={"id":search_id})
def torrents_delete(self, the_hash, deleteFiles):
the_url = self.url + "/" + "api/v2/torrents/delete"
return self.session.post(the_url, data={"hashes":the_hash, "deleteFiles":deleteFiles})
if __name__ == '__main__':
logging.basicConfig(filename="./log.log", level=logging.INFO,
format='%(levelname)-8s %(message)s')
config = configparser.ConfigParser()
if not os.path.exists("./config.conf"):
logging.error("Could not find ./config.conf, please ensure it exists")
exit
config.read('config.conf')
config_default = config['API']
config_url = "https://" + config_default['api_url'].strip("\"") if config_default['api_url'] is not None else ""
config_user = config_default['api_user'].strip("\"") if config_default['api_url'] is not None else ""
config_pass = config_default['api_pass'].strip("\"") if config_default['api_url'] is not None else ""
caller = QBittorrentAPICaller(config_url, config_user, config_pass)
resp = caller.login()
if resp.status_code != 200:
logging.error("Could not login to API")
exit
resp = caller.torrentListComplete()
if (resp.status_code != 200):
logging.error("Could not query torrent list")
exit
config_cat = config['CATEGORIES']
torrents = json.loads(resp.text)
for t in torrents:
if config.has_option("CATEGORIES", t["category"]):
logging.info("Downloading " + t["name"] + " to " + config_cat[t["category"]])
the_hash = t["hash"]
subprocess.call(["scp",
"-r",
config_user + "@" + config_default['ssh_url'].strip("\"") + ":" + t["content_path"],
config_cat[t["category"]]])
# Delete torrent
caller.torrents_delete(the_hash, "true")
else:
logging.warning("Warning: no path configured for torrent with category " + t["category"] + " (" + t["name"] + ")")
caller.logout()
exit