269 lines
10 KiB
Python
269 lines
10 KiB
Python
|
import asyncio
|
||
|
import logging
|
||
|
import slixmpp
|
||
|
import requests
|
||
|
import json
|
||
|
import datetime
|
||
|
import time
|
||
|
import configparser
|
||
|
import os
|
||
|
|
||
|
def json_to_key_value_string(json_data):
    """Flatten a JSON document into newline-separated ``path: value`` lines.

    Nested dictionary keys are joined with ``.`` and list elements are
    addressed as ``[index]``, so ``{"a": {"b": 1}, "c": [2]}`` becomes::

        a.b: 1
        c.[0]: 2

    Args:
        json_data: The JSON document to flatten (anything ``json.loads``
            accepts).

    Returns:
        One line per scalar leaf, in document order, joined with newlines.
        A document that is a bare scalar is returned as its string form.

    Raises:
        json.JSONDecodeError: If ``json_data`` is not valid JSON.
    """
    data = json.loads(json_data)

    # Collected "path: value" lines, filled in by the recursive walk below.
    result_lines = []

    def process_item(item, prefix=''):
        """Append the leaves of ``item`` to ``result_lines`` under ``prefix``."""
        if isinstance(item, dict):
            for key, value in item.items():
                if isinstance(value, (dict, list)):
                    process_item(value, f"{prefix}{key}.")
                else:
                    result_lines.append(f"{prefix}{key}: {value}")
        elif isinstance(item, list):
            for i, value in enumerate(item):
                if isinstance(value, (dict, list)):
                    process_item(value, f"{prefix}[{i}].")
                else:
                    # Scalars inside a list are leaves too; without this
                    # branch they would be silently lost.
                    result_lines.append(f"{prefix}[{i}]: {value}")
        else:
            # Only reachable for a top-level scalar document.
            result_lines.append(str(item))

    process_item(data)

    return "\n".join(result_lines)
|
||
|
|
||
|
|
||
|
class QBittorrentAPICaller():
    """Small client for the qBittorrent Web API (``/api/v2``).

    All requests go through one ``requests.Session`` so that whatever
    cookies the login call sets are re-used by later calls. Every public
    method returns a human-readable string that can be sent straight to a
    chat room; HTTP failures are reported as ``"Got status <code>"`` text
    rather than raised.
    """

    def __init__(self, url, username, password):
        """Store the connection details and log in immediately.

        Args:
            url: Base URL of the qBittorrent Web UI; see ``clean_url``.
            username: Web UI user name.
            password: Web UI password.
        """
        self.url = self.clean_url(url)
        self.username = username
        self.password = password
        self.session = requests.Session()
        self.login()

    def __del__(self):
        # Log out when the object is garbage collected.
        self.logout()

    def clean_url(self, url):
        """Normalise a base URL.

        Prepends ``https://`` when no ``http://``/``https://`` scheme is
        present and removes a single trailing ``/``.
        """
        to_ret = url
        # Prepend protocol (https by default)
        if not to_ret.startswith(('http://', 'https://')):
            to_ret = 'https://' + to_ret

        # Remove trailing /
        if to_ret.endswith('/'):
            to_ret = to_ret[:-1]

        return to_ret

    def truncate_string(self, s, max_length=25):
        """Return ``s`` cut to ``max_length`` characters plus ``...`` if it was longer."""
        return s[:max_length] + '...' if len(s) > max_length else s

    def _endpoint(self, path):
        """Return the absolute URL for an API ``path`` such as ``api/v2/app/version``."""
        return self.url + "/" + path

    @staticmethod
    def _status_error(resp):
        """Return the chat-friendly text for a non-200 response."""
        return "Got status " + str(resp.status_code)

    @staticmethod
    def _magnet_hash(url):
        """Extract the BitTorrent info-hash from a magnet link as lowercase hex.

        Looks for the ``urn:btih:`` marker (case-insensitively) and takes
        everything up to the next ``&``. A 32-character value is treated as
        Base32 and converted to hex so it can be compared with hex hashes.

        Returns:
            The hash as a lowercase string, or ``None`` when the link has no
            usable ``urn:btih:`` value.
        """
        import base64

        marker = "urn:btih:"
        start = url.lower().find(marker)
        if start == -1:
            return None

        info_hash = url[start + len(marker):].split("&", 1)[0]
        if len(info_hash) == 32:
            try:
                return base64.b32decode(info_hash.upper()).hex()
            except ValueError:
                # Not valid Base32 after all; nothing we can compare against.
                return None
        return info_hash.lower() or None

    def login(self):
        """POST the stored credentials to ``auth/login`` on the shared session.

        A failed login is only logged, not raised, so construction still
        succeeds; later calls will then report their own status codes.
        """
        data = {"username": self.username, "password": self.password}
        resp = self.session.post(self._endpoint("api/v2/auth/login"), data=data)
        # NOTE(review): assumes the server answers a rejected login with the
        # body "Fails." as described in the Web API docs - confirm for the
        # deployed qBittorrent version.
        if resp.status_code != 200 or resp.text.strip() == "Fails.":
            logging.error("qBittorrent login failed (status %s)", resp.status_code)

    def logout(self):
        """POST to ``auth/logout`` on the shared session."""
        self.session.post(self._endpoint("api/v2/auth/logout"))

    def version(self):
        """Return the body of ``app/version``, or a ``Got status`` message."""
        resp = self.session.get(self._endpoint("api/v2/app/version"))
        if resp.status_code != 200:
            return self._status_error(resp)
        return resp.text

    def webapiVersion(self):
        """Return the body of ``app/webapiVersion``, or a ``Got status`` message."""
        resp = self.session.get(self._endpoint("api/v2/app/webapiVersion"))
        if resp.status_code != 200:
            return self._status_error(resp)
        return resp.text

    def buildInfo(self):
        """Return ``app/buildInfo`` flattened to ``key: value`` lines, or a ``Got status`` message."""
        resp = self.session.get(self._endpoint("api/v2/app/buildInfo"))
        if resp.status_code != 200:
            return self._status_error(resp)

        return json_to_key_value_string(resp.text)

    def torrentList(self, modifier=""):
        """List downloading and completed torrents as text.

        Downloading torrents are shown as
        ``name | progress% | speed MB/s | eta``; completed torrents by name
        only. Each section starts with its upper-cased filter name and ends
        with a ``--`` line.

        Args:
            modifier: NOTE - despite the name, ``"full"`` selects the
                *truncated* (25 character) names for downloading torrents and
                any other value leaves names untouched. The bot's command
                handler in this file passes the values accordingly, so the
                meaning is kept as-is.

        Returns:
            The listing, or a ``Got status`` message if any request fails.
        """
        the_url = self._endpoint("api/v2/torrents/info")
        lines = []
        for f in ("downloading", "completed"):
            resp = self.session.post(the_url, data={"filter": f})
            if resp.status_code != 200:
                return self._status_error(resp)

            parsed = json.loads(resp.text)
            lines.append(f.upper())
            for p in parsed:
                if f == "downloading":
                    name = self.truncate_string(p["name"]) if modifier == "full" else p["name"]
                    progress = float(p["progress"]) * 100
                    speed = float(p["dlspeed"]) / 1000000
                    eta = datetime.timedelta(seconds=int(p["eta"]))
                    lines.append(f"{name} | {progress:.2f}% | {speed:.2f} MB/s | {eta}")
                else:
                    lines.append(p["name"])
            lines.append("--")

        # Every entry, including the final separator, is newline-terminated.
        return "".join(line + "\n" for line in lines)

    def add(self, url, username):
        """Add a magnet link and report whether it shows up in the torrent list.

        The torrent is posted with ``username`` as its category. After a
        successful post this method sleeps for 5 seconds (blocking the
        caller) and then looks the torrent up by hash to confirm it was
        added and to warn when it has no seeds yet.

        Args:
            url: The magnet link; anything not starting with ``magnet:`` is
                rejected without contacting the server.
            username: Category to file the torrent under.

        Returns:
            A human-readable status message.
        """
        if not url.startswith("magnet:"):
            return "Please supply a magnet link (begins with \"magnet:\")"

        resp = self.session.post(self._endpoint("api/v2/torrents/add"),
                                 data={"urls": url, "category": username})
        if resp.status_code == 415:
            return "Torrent file not valid"
        if resp.status_code != 200:
            return self._status_error(resp)

        # Get the hash; without one there is nothing to look up afterwards.
        magnet_hash = self._magnet_hash(url)
        if magnet_hash is None:
            return "Could not verify if torrent was added"

        # Give the server a moment to register the torrent and find peers.
        time.sleep(5)
        resp2 = self.session.post(self._endpoint("api/v2/torrents/info"))
        if resp2.status_code != 200:
            return "Could not verify if torrent was added"

        for p in json.loads(resp2.text):
            if p["hash"] == magnet_hash:
                if p["num_seeds"] == 0:
                    return "Successfully added " + p["name"] + "\nWarning: torrent has 0 seeds after 5 seconds"
                return "Successfully added " + p["name"]

        return "Could not add torrent, please double check the magnet link (hash=" + magnet_hash + ")"
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
class QBBot(slixmpp.ClientXMPP):
|
||
|
|
||
|
def __init__(self, jid, password, room, nick, api_url, api_username, api_password):
|
||
|
slixmpp.ClientXMPP.__init__(self, jid, password)
|
||
|
|
||
|
self.room = room
|
||
|
self.nick = nick
|
||
|
|
||
|
self.api_caller = QBittorrentAPICaller(api_url, api_username, api_password)
|
||
|
|
||
|
self.add_event_handler("session_start", self.session_start)
|
||
|
self.add_event_handler("message", self.message)
|
||
|
# The groupchat_message event is triggered whenever a message
|
||
|
# stanza is received from any chat room. If you also also
|
||
|
# register a handler for the 'message' event, MUC messages
|
||
|
# will be processed by both handlers.
|
||
|
self.add_event_handler("groupchat_message", self.muc_message)
|
||
|
|
||
|
# If you wanted more functionality, here's how to register plugins:
|
||
|
# self.register_plugin('xep_0030') # Service Discovery
|
||
|
# self.register_plugin('xep_0199') # XMPP Ping
|
||
|
|
||
|
# Here's how to access plugins once you've registered them:
|
||
|
# self['xep_0030'].add_feature('echo_demo')
|
||
|
|
||
|
async def session_start(self, event):
|
||
|
await self.get_roster()
|
||
|
self.send_presence()
|
||
|
self.plugin['xep_0045'].join_muc(self.room, self.nick)
|
||
|
|
||
|
# Most get_*/set_* methods from plugins use Iq stanzas, which
|
||
|
# are sent asynchronously. You can almost always provide a
|
||
|
# callback that will be executed when the reply is received.
|
||
|
|
||
|
def message(self, msg):
|
||
|
if msg['type'] in ('chat', 'normal'):
|
||
|
msg.reply("I was not designed to take direct messages, please use me in a MUC").send()
|
||
|
|
||
|
def muc_message(self, msg):
|
||
|
"""
|
||
|
Process incoming message stanzas from any chat room. Be aware
|
||
|
that if you also have any handlers for the 'message' event,
|
||
|
message stanzas may be processed by both handlers, so check
|
||
|
the 'type' attribute when using a 'message' event handler.
|
||
|
|
||
|
Whenever the bot's nickname is mentioned, respond to
|
||
|
the message.
|
||
|
|
||
|
IMPORTANT: Always check that a message is not from yourself,
|
||
|
otherwise you will create an infinite loop responding
|
||
|
to your own messages.
|
||
|
|
||
|
This handler will reply to messages that mention
|
||
|
the bot's nickname.
|
||
|
|
||
|
Arguments:
|
||
|
msg -- The received message stanza. See the documentation
|
||
|
for stanza objects and the Message stanza to see
|
||
|
how it may be used.
|
||
|
"""
|
||
|
#if msg['mucnick'] != self.nick and self.nick in msg['body']:
|
||
|
if msg['mucnick'] != self.nick and msg['body'].startswith(self.nick):
|
||
|
message = ""
|
||
|
tokens = msg['body'].split()
|
||
|
|
||
|
match tokens[1]:
|
||
|
case "info":
|
||
|
#message += "URL: " + self.api_caller.url + "\n"
|
||
|
message += "QBittorrent Version: " + self.api_caller.version() + "\n"
|
||
|
message += "Web API Version: " + self.api_caller.webapiVersion() + "\n"
|
||
|
message += self.api_caller.buildInfo()
|
||
|
|
||
|
# don't know why, but this behavior is reversed. Too lazy to figure it out
|
||
|
case "fulllist":
|
||
|
message += self.api_caller.torrentList("")
|
||
|
case "list":
|
||
|
message += self.api_caller.torrentList("full")
|
||
|
|
||
|
case "add":
|
||
|
message += self.api_caller.add(tokens[2], msg['mucnick'])
|
||
|
|
||
|
case "help"|_:
|
||
|
message += "Commands\n"
|
||
|
message += "info: Displays information about QBittorrent server" + "\n"
|
||
|
message += "help: Displays this help" + "\n"
|
||
|
message += "list: Lists torrents, downloading torrents' names truncated to 25 characters\n"
|
||
|
message += "fulllist: Lists torrents, no name truncation\n"
|
||
|
message += "add [MAGNET_URL]: Adds torrent corresponding to MAGNET_URL to the download list. Note that this will take about 5 seconds, as there's a check for 0 seeds after 5 seconds as a warning"
|
||
|
|
||
|
self.send_message(mto=msg['from'].bare,
|
||
|
mbody=message,
|
||
|
mtype='groupchat')
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
    # Ideally use optparse or argparse to get JID,
    # password, and log level.

    logging.basicConfig(level=logging.ERROR,
                        format='%(levelname)-8s %(message)s')

    # Single path for both the existence check and the read.
    config_path = "./config.conf"

    config = configparser.ConfigParser()
    if not os.path.exists(config_path):
        logging.error("Could not find ./config.conf, please ensure it exists")
        # A bare `exit` is only a name lookup and does not stop the script;
        # actually terminate with a non-zero status.
        raise SystemExit(1)

    config.read(config_path)
    config_default = config['DEFAULT']

    # Values may be wrapped in double quotes in the config file; strip them.
    xmpp = QBBot(config_default['qb_jid'].strip('\"'),
                 config_default['qb_pass'].strip('\"'),
                 config_default['qb_muc'].strip('\"'),
                 config_default['qb_nick'].strip('\"'),
                 config_default['api_url'].strip('\"'),
                 config_default['api_user'].strip('\"'),
                 config_default['api_pass'].strip('\"'))

    xmpp.register_plugin('xep_0030') # Service Discovery
    xmpp.register_plugin('xep_0045') # Multi-User Chat
    xmpp.register_plugin('xep_0199') # XMPP Ping
    xmpp.connect()
    xmpp.process()
|