xmpp-qbittorrent-bot/bot.py

322 lines
12 KiB
Python

import asyncio
import logging
import slixmpp
import requests
import json
import datetime
import time
import configparser
import os
import threading
def json_to_key_value_string(json_data):
    """Flatten a JSON document into newline-separated ``key: value`` lines.

    Nested dictionaries contribute dotted prefixes (``outer.inner: value``)
    and list elements contribute an index segment (``key.[0].inner: value``).
    Scalars stored directly inside a list are emitted as ``key.[0]: value``
    and a bare top-level scalar is emitted as its string form; previously
    both were silently dropped.

    Values are rendered with Python's ``str()``, so JSON ``true``/``null``
    appear as ``True``/``None``.

    Args:
        json_data: JSON text (anything accepted by ``json.loads``).

    Returns:
        The flattened lines joined with newlines; an empty string when the
        document contains no scalar values.

    Raises:
        json.JSONDecodeError: if ``json_data`` is not valid JSON.
    """
    data = json.loads(json_data)
    result_lines = []

    def process_item(item, prefix=''):
        # ``prefix`` always ends with "." when non-empty, e.g. "a.b." or "a.[0]."
        if isinstance(item, dict):
            for key, value in item.items():
                if isinstance(value, (dict, list)):
                    process_item(value, f"{prefix}{key}.")
                else:
                    result_lines.append(f"{prefix}{key}: {value}")
        elif isinstance(item, list):
            for i, value in enumerate(item):
                process_item(value, f"{prefix}[{i}].")
        else:
            # Scalar reached through a list element or as the whole document.
            label = prefix.removesuffix(".")
            result_lines.append(f"{label}: {item}" if label else str(item))

    process_item(data)
    return "\n".join(result_lines)
class QBittorrentAPICaller():
    """Small client for the qBittorrent Web API endpoints the bot needs.

    Every public query method returns a human-readable string (either the
    requested data or a short error description) so the result can be sent
    straight to a chat room.

    Constructing an instance performs network I/O: it logs in immediately and
    schedules a background timer that logs out and back in periodically.
    """

    # Seconds between automatic logout/login cycles.
    SESSION_REFRESH_SECONDS = 600

    def __init__(self, url, username, password):
        """Store the credentials, open an HTTP session and log in.

        Args:
            url: Base URL of the qBittorrent Web UI; see ``clean_url``.
            username: Web UI user name.
            password: Web UI password.
        """
        self.url = self.clean_url(url)
        self.username = username
        self.password = password
        self.session = requests.Session()
        self.reestablish_session()

    def __del__(self):
        # Best-effort logout: the object may be only partially initialised or
        # the interpreter may be shutting down, so failures must not escape.
        try:
            self.logout()
        except Exception:
            logging.debug("Logout during cleanup failed", exc_info=True)

    def clean_url(self, url):
        """Normalise ``url``: default to https and drop trailing slashes.

        Args:
            url: Host or URL as written in the configuration.

        Returns:
            The URL with a scheme and without any trailing ``/``.
        """
        to_ret = url
        # Prepend protocol (https by default)
        if not to_ret.startswith(('http://', 'https://')):
            to_ret = 'https://' + to_ret
        # Remove every trailing "/" so endpoint paths can be appended with "/"
        return to_ret.rstrip('/')

    def _endpoint(self, path):
        """Return the absolute URL for the API ``path`` (no leading slash)."""
        return self.url + "/" + path

    def reestablish_session(self):
        """Log out and back in, and schedule the next refresh.

        The next run is scheduled *before* the network calls so that a failed
        refresh does not stop future attempts.
        """
        timer = threading.Timer(self.SESSION_REFRESH_SECONDS,
                                self.reestablish_session)
        # Daemon thread: the self-rearming timer must not keep the process
        # alive after the main thread has finished.
        timer.daemon = True
        timer.start()
        self.logout()
        self.login()
        logging.info("Reestablished connection to QBittorrent server")

    def truncate_string(self, s, max_length=25):
        """Return ``s`` cut to ``max_length`` characters plus ``...`` if longer."""
        return s[:max_length] + '...' if len(s) > max_length else s

    def login(self):
        """Authenticate the session against ``api/v2/auth/login``."""
        data = {"username": self.username, "password": self.password}
        resp = self.session.post(self._endpoint("api/v2/auth/login"), data=data)
        # NOTE(review): qBittorrent is expected to answer "Fails." on bad
        # credentials -- confirm against the Web API documentation.
        if not resp.ok or resp.text.strip() == "Fails.":
            logging.warning("QBittorrent login was rejected (status %s)",
                            resp.status_code)

    def logout(self):
        """End the current session via ``api/v2/auth/logout``."""
        self.session.post(self._endpoint("api/v2/auth/logout"))

    def _get_text(self, path, convert=None):
        """GET ``path`` and return its body, or a ``Got status`` message.

        Args:
            path: API path relative to the base URL.
            convert: Optional callable applied to the body of a 200 response.
        """
        resp = self.session.get(self._endpoint(path))
        if resp.status_code != 200:
            return "Got status" + str(resp.status_code)
        return convert(resp.text) if convert is not None else resp.text

    def version(self):
        """Return the application version string reported by the server."""
        return self._get_text("api/v2/app/version")

    def webapiVersion(self):
        """Return the Web API version string reported by the server."""
        return self._get_text("api/v2/app/webapiVersion")

    def buildInfo(self):
        """Return the server build info flattened to ``key: value`` lines."""
        return self._get_text("api/v2/app/buildInfo", json_to_key_value_string)

    def torrentList(self, modifier=""):
        """Return a fenced text listing of downloading and completed torrents.

        Args:
            modifier: When equal to ``"full"`` names are truncated with
                ``truncate_string``; any other value leaves them intact.
                NOTE: this is the inverse of what the word suggests. The
                ``muc_message`` handler in this file compensates by passing
                ``"full"`` for its ``list`` command and ``""`` for
                ``fulllist``, so the mapping is kept as-is.

        Returns:
            The listing, or ``Got status<code>`` if a request fails.
        """
        the_url = self._endpoint("api/v2/torrents/info")
        truncate = modifier == "full"
        lines = ["```"]
        for state in ("downloading", "completed"):
            resp = self.session.post(the_url, data={"filter": state})
            if resp.status_code != 200:
                return "Got status" + str(resp.status_code)
            torrents = json.loads(resp.text)
            lines.append(state.upper())
            for torrent in torrents:
                name = torrent["name"]
                if truncate:
                    name = self.truncate_string(name)
                if state == "downloading":
                    percent = float(torrent["progress"]) * 100
                    # dlspeed is divided by 10**6 and labelled MB/s
                    speed = float(torrent["dlspeed"]) / 1000000
                    eta = datetime.timedelta(seconds=int(torrent["eta"]))
                    lines.append(
                        f"{name} | {percent:.2f}% | {speed:.2f} MB/s | {eta}")
                else:
                    lines.append(name)
            lines.append("--")
        lines.append("```")
        return "\n".join(lines)

    @staticmethod
    def _magnet_info_hash(url):
        """Extract the info-hash from a magnet URI as lowercase text.

        The hash is taken from the fourth ``:``-separated field, up to the
        first ``&`` (``magnet:?xt=urn:btih:<hash>&...``). A 32-character
        base32 hash is converted to its 40-character hex form so it can be
        compared with hex hashes.

        Returns:
            The hash, or ``None`` when the URI has no such field.
        """
        fields = url.split(":")
        if len(fields) < 4:
            return None
        candidate = fields[3].split("&")[0]
        if not candidate:
            return None
        if len(candidate) == 32:
            import base64
            try:
                return base64.b32decode(candidate.upper()).hex()
            except ValueError:
                # Not valid base32; fall through and use the text as given.
                pass
        return candidate.lower()

    def add(self, url, username):
        """Add a magnet link and report whether the torrent shows up.

        Blocks for about five seconds: after a successful add request the
        torrent list is fetched again to confirm the torrent exists and to
        warn when it has no seeds yet.

        Args:
            url: Magnet URI to add.
            username: Used as the torrent's category.

        Returns:
            A human-readable status message (never ``None``).
        """
        if not url.startswith("magnet:"):
            return "Please supply a magnet link (begins with \"magnet:\")"
        resp = self.session.post(self._endpoint("api/v2/torrents/add"),
                                 data={"urls": url, "category": username})
        if resp.status_code == 415:
            return "Torrent file not valid"
        if resp.status_code != 200:
            # Previously fell through and returned None, crashing the caller.
            return "Got status" + str(resp.status_code)

        magnet_hash = self._magnet_info_hash(url)
        if magnet_hash is None:
            return "Could not verify if torrent was added"

        # Give the server a moment to register the torrent and find peers.
        time.sleep(5)
        resp2 = self.session.post(self._endpoint("api/v2/torrents/info"))
        if resp2.status_code != 200:
            return "Could not verify if torrent was added"
        for torrent in json.loads(resp2.text):
            if torrent["hash"] == magnet_hash:
                if torrent["num_seeds"] == 0:
                    return "Successfully added " + torrent["name"] + "\nWarning: torrent has 0 seeds after 5 seconds"
                return "Successfully added " + torrent["name"]
        return "Could not add torrent, please double check the magnet link (hash=" + magnet_hash + ")"
class QBBot(slixmpp.ClientXMPP):
def __init__(self, jid, password, room, nick, api_url, api_username, api_password):
slixmpp.ClientXMPP.__init__(self, jid, password)
self.room = room
self.nick = nick
self.api_caller = QBittorrentAPICaller(api_url, api_username, api_password)
self.add_event_handler("session_start", self.session_start)
self.add_event_handler("message", self.message)
# The groupchat_message event is triggered whenever a message
# stanza is received from any chat room. If you also also
# register a handler for the 'message' event, MUC messages
# will be processed by both handlers.
self.add_event_handler("groupchat_message", self.muc_message)
# If you wanted more functionality, here's how to register plugins:
# self.register_plugin('xep_0030') # Service Discovery
# self.register_plugin('xep_0199') # XMPP Ping
# Here's how to access plugins once you've registered them:
# self['xep_0030'].add_feature('echo_demo')
async def session_start(self, event):
await self.get_roster()
self.send_presence()
self.plugin['xep_0045'].join_muc(self.room, self.nick)
avatar_file = "./qbittorrent_logo.png"
try:
avatar_file = open(avatar_file, 'rb')
except IOError:
logging.error("Could not find avatar file")
return self.disconnect()
avatar = avatar_file.read()
avatar_type = 'image/png'
avatar_id = self['xep_0084'].generate_id(avatar)
avatar_bytes = len(avatar)
avatar_file.close()
used_xep84 = False
logging.info('Publish XEP-0084 avatar data')
result = await self['xep_0084'].publish_avatar(avatar)
if isinstance(result, slixmpp.exceptions.XMPPError):
logging.warning('Could not publish XEP-0084 avatar')
else:
used_xep84 = True
logging.info('Update vCard with avatar')
result = await self['xep_0153'].set_avatar(avatar=avatar, mtype=avatar_type)
if isinstance(result, slixmpp.exceptions.XMPPError):
print('Could not set vCard avatar')
if used_xep84:
logging.info('Advertise XEP-0084 avatar metadata')
result = await self['xep_0084'].publish_avatar_metadata([
{'id': avatar_id,
'type': avatar_type,
'bytes': avatar_bytes}
# We could advertise multiple avatars to provide
# options in image type, source (HTTP vs pubsub),
# size, etc.
# {'id': ....}
])
if isinstance(result, slixmpp.exceptions.XMPPError):
logging.warning('Could not publish XEP-0084 metadata')
logging.info('Wait for presence updates to propagate...')
#self.schedule('end', 5, self.disconnect, kwargs={'wait': True})
# Most get_*/set_* methods from plugins use Iq stanzas, which
# are sent asynchronously. You can almost always provide a
# callback that will be executed when the reply is received.
def message(self, msg):
if msg['type'] in ('chat', 'normal'):
msg.reply("I was not designed to take direct messages, please use me in a MUC").send()
def muc_message(self, msg):
"""
Process incoming message stanzas from any chat room. Be aware
that if you also have any handlers for the 'message' event,
message stanzas may be processed by both handlers, so check
the 'type' attribute when using a 'message' event handler.
Whenever the bot's nickname is mentioned, respond to
the message.
IMPORTANT: Always check that a message is not from yourself,
otherwise you will create an infinite loop responding
to your own messages.
This handler will reply to messages that mention
the bot's nickname.
Arguments:
msg -- The received message stanza. See the documentation
for stanza objects and the Message stanza to see
how it may be used.
"""
#if msg['mucnick'] != self.nick and self.nick in msg['body']:
if msg['mucnick'] != self.nick and msg['body'].startswith(self.nick):
message = ""
tokens = msg['body'].split()
match tokens[1]:
case "info":
#message += "URL: " + self.api_caller.url + "\n"
message += "QBittorrent Version: " + self.api_caller.version() + "\n"
message += "Web API Version: " + self.api_caller.webapiVersion() + "\n"
message += self.api_caller.buildInfo()
# don't know why, but this behavior is reversed. Too lazy to figure it out
case "fulllist":
message += self.api_caller.torrentList("")
case "list":
message += self.api_caller.torrentList("full")
case "add":
message += self.api_caller.add(tokens[2], msg['mucnick'])
case "help"|_:
message += "Commands\n"
message += "info: Displays information about QBittorrent server" + "\n"
message += "help: Displays this help" + "\n"
message += "list: Lists torrents, downloading torrents' names truncated to 25 characters\n"
message += "fulllist: Lists torrents, no name truncation\n"
message += "add [MAGNET_URL]: Adds torrent corresponding to MAGNET_URL to the download list. Note that this will take about 5 seconds, as there's a check for 0 seeds after 5 seconds as a warning"
self.send_message(mto=msg['from'].bare,
mbody=message,
mtype='groupchat')
if __name__ == '__main__':
    # Script entry point: read ./config.conf, build the bot and run it.
    logging.basicConfig(level=logging.INFO,
                        format='%(levelname)-8s %(message)s')

    config = configparser.ConfigParser()
    if not os.path.exists("./config.conf"):
        logging.error("Could not find ./config.conf, please ensure it exists")
        # A bare ``exit`` is a no-op expression; actually stop here instead of
        # failing later with a KeyError on the missing settings.
        raise SystemExit(1)
    config.read('config.conf')
    config_default = config['DEFAULT']

    # Values may be wrapped in double quotes in the config file; strip them.
    # The order matches QBBot's positional parameters.
    settings = [
        config_default[key].strip('\"')
        for key in ('qb_jid', 'qb_pass', 'qb_muc', 'qb_nick',
                    'api_url', 'api_user', 'api_pass')
    ]
    xmpp = QBBot(*settings)

    xmpp.register_plugin('xep_0030')  # Service Discovery
    xmpp.register_plugin('xep_0045')  # Multi-User Chat
    xmpp.register_plugin('xep_0199')  # XMPP Ping
    xmpp.register_plugin('xep_0084')  # User Avatar
    xmpp.register_plugin('xep_0153')  # vCard-Based Avatars

    xmpp.connect()
    xmpp.process()