2024-07-28 01:26:36 -04:00
import asyncio
import logging
import slixmpp
import requests
import json
import datetime
import time
import configparser
import os
def json_to_key_value_string ( json_data ) :
# Parse JSON data
data = json . loads ( json_data )
# Initialize an empty list to store key-value pairs
result_lines = [ ]
# Recursively process the JSON data
def process_item ( item , prefix = ' ' ) :
if isinstance ( item , dict ) :
for key , value in item . items ( ) :
if isinstance ( value , ( dict , list ) ) :
process_item ( value , f " { prefix } { key } . " )
else :
result_lines . append ( f " { prefix } { key } : { value } " )
elif isinstance ( item , list ) :
for i , value in enumerate ( item ) :
process_item ( value , f " { prefix } [ { i } ]. " )
process_item ( data )
return " \n " . join ( result_lines )
class QBittorrentAPICaller ( ) :
def __init__ ( self , url , username , password ) :
self . url = self . clean_url ( url )
self . username = username
self . password = password
self . session = requests . Session ( )
self . login ( )
def __del__ ( self ) :
self . logout ( )
def clean_url ( self , url ) :
to_ret = url
# Prepend protocol (https by default)
if not to_ret . startswith ( ( ' http:// ' , ' https:// ' ) ) :
to_ret = ' https:// ' + to_ret
# Remove trailing /
if url . endswith ( ' / ' ) :
to_ret = to_ret [ : - 1 ]
return to_ret
def truncate_string ( self , s , max_length = 25 ) :
return s [ : max_length ] + ' ... ' if len ( s ) > max_length else s
def login ( self ) :
the_url = self . url + " / " + " api/v2/auth/login "
data = { " username " : self . username , " password " : self . password }
resp = self . session . post ( the_url , data = data )
def logout ( self ) :
the_url = self . url + " / " + " api/v2/auth/logout "
resp = self . session . post ( the_url )
def version ( self ) :
the_url = self . url + " / " + " api/v2/app/version "
resp = self . session . get ( the_url )
if resp . status_code != 200 :
return " Got status " + str ( resp . status_code )
return resp . text
def webapiVersion ( self ) :
the_url = self . url + " / " + " api/v2/app/webapiVersion "
resp = self . session . get ( the_url )
if resp . status_code != 200 :
return " Got status " + str ( resp . status_code )
return resp . text
def buildInfo ( self ) :
the_url = self . url + " / " + " api/v2/app/buildInfo "
resp = self . session . get ( the_url )
if resp . status_code != 200 :
return " Got status " + str ( resp . status_code )
return json_to_key_value_string ( resp . text )
def torrentList ( self , modifier = " " ) :
the_url = self . url + " / " + " api/v2/torrents/info "
2024-07-28 13:00:36 -04:00
to_ret = " ``` "
2024-07-28 01:26:36 -04:00
for f in [ " downloading " , " completed " ] :
resp = self . session . post ( the_url , data = { " filter " : f } )
if resp . status_code != 200 :
return " Got status " + str ( resp . status_code )
parsed = json . loads ( resp . text )
to_ret + = f . upper ( ) + " \n "
if f == " downloading " :
for p in parsed :
to_ret + = ( self . truncate_string ( p [ " name " ] ) if modifier == " full " else p [ " name " ] ) + " | " + " {:.2f} " . format ( float ( p [ " progress " ] ) * 100 ) + " % | " + " {:.2f} " . format ( float ( p [ " dlspeed " ] ) / 1000000 ) + " MB/s " + " | " + str ( datetime . timedelta ( seconds = int ( p [ " eta " ] ) ) ) + " \n "
else :
for p in parsed :
2024-07-28 13:00:36 -04:00
to_ret + = ( self . truncate_string ( p [ " name " ] ) if modifier == " full " else p [ " name " ] ) + " \n "
2024-07-28 01:26:36 -04:00
2024-07-28 13:00:36 -04:00
to_ret + = " -- \n ``` "
2024-07-28 01:26:36 -04:00
return to_ret
def add ( self , url , username ) :
if not url . startswith ( " magnet: " ) :
return " Please supply a magnet link (begins with \" magnet: \" ) "
the_url = self . url + " / " + " api/v2/torrents/add "
resp = self . session . post ( the_url , data = { " urls " : url , " category " : username } )
if ( resp . status_code == 415 ) :
return " Torrent file not valid "
if ( resp . status_code == 200 ) :
# Get the hash
magnet_hash = url . split ( " : " ) [ 3 ] . split ( " & " ) [ 0 ] . lower ( )
# Make request to torrentlist with particular hash
the_url2 = self . url + " / " + " api/v2/torrents/info "
time . sleep ( 5 )
resp2 = self . session . post ( the_url2 )
if ( resp2 . status_code != 200 ) :
return " Could not verify if torrent was added "
parsed = json . loads ( resp2 . text )
for p in parsed :
if p [ " hash " ] == magnet_hash :
if p [ " num_seeds " ] == 0 :
return " Successfully added " + p [ " name " ] + " \n Warning: torrent has 0 seeds after 5 seconds "
else :
return " Successfully added " + p [ " name " ]
return " Could not add torrent, please double check the magnet link (hash= " + magnet_hash + " ) "
class QBBot ( slixmpp . ClientXMPP ) :
def __init__ ( self , jid , password , room , nick , api_url , api_username , api_password ) :
slixmpp . ClientXMPP . __init__ ( self , jid , password )
self . room = room
self . nick = nick
self . api_caller = QBittorrentAPICaller ( api_url , api_username , api_password )
self . add_event_handler ( " session_start " , self . session_start )
self . add_event_handler ( " message " , self . message )
# The groupchat_message event is triggered whenever a message
# stanza is received from any chat room. If you also also
# register a handler for the 'message' event, MUC messages
# will be processed by both handlers.
self . add_event_handler ( " groupchat_message " , self . muc_message )
# If you wanted more functionality, here's how to register plugins:
# self.register_plugin('xep_0030') # Service Discovery
# self.register_plugin('xep_0199') # XMPP Ping
# Here's how to access plugins once you've registered them:
# self['xep_0030'].add_feature('echo_demo')
async def session_start ( self , event ) :
await self . get_roster ( )
self . send_presence ( )
self . plugin [ ' xep_0045 ' ] . join_muc ( self . room , self . nick )
# Most get_*/set_* methods from plugins use Iq stanzas, which
# are sent asynchronously. You can almost always provide a
# callback that will be executed when the reply is received.
def message ( self , msg ) :
if msg [ ' type ' ] in ( ' chat ' , ' normal ' ) :
msg . reply ( " I was not designed to take direct messages, please use me in a MUC " ) . send ( )
def muc_message ( self , msg ) :
"""
Process incoming message stanzas from any chat room . Be aware
that if you also have any handlers for the ' message ' event ,
message stanzas may be processed by both handlers , so check
the ' type ' attribute when using a ' message ' event handler .
Whenever the bot ' s nickname is mentioned, respond to
the message .
IMPORTANT : Always check that a message is not from yourself ,
otherwise you will create an infinite loop responding
to your own messages .
This handler will reply to messages that mention
the bot ' s nickname.
Arguments :
msg - - The received message stanza . See the documentation
for stanza objects and the Message stanza to see
how it may be used .
"""
#if msg['mucnick'] != self.nick and self.nick in msg['body']:
if msg [ ' mucnick ' ] != self . nick and msg [ ' body ' ] . startswith ( self . nick ) :
message = " "
tokens = msg [ ' body ' ] . split ( )
match tokens [ 1 ] :
case " info " :
#message += "URL: " + self.api_caller.url + "\n"
message + = " QBittorrent Version: " + self . api_caller . version ( ) + " \n "
message + = " Web API Version: " + self . api_caller . webapiVersion ( ) + " \n "
message + = self . api_caller . buildInfo ( )
# don't know why, but this behavior is reversed. Too lazy to figure it out
case " fulllist " :
message + = self . api_caller . torrentList ( " " )
case " list " :
message + = self . api_caller . torrentList ( " full " )
case " add " :
message + = self . api_caller . add ( tokens [ 2 ] , msg [ ' mucnick ' ] )
case " help " | _ :
message + = " Commands \n "
message + = " info: Displays information about QBittorrent server " + " \n "
message + = " help: Displays this help " + " \n "
message + = " list: Lists torrents, downloading torrents ' names truncated to 25 characters \n "
message + = " fulllist: Lists torrents, no name truncation \n "
message + = " add [MAGNET_URL]: Adds torrent corresponding to MAGNET_URL to the download list. Note that this will take about 5 seconds, as there ' s a check for 0 seeds after 5 seconds as a warning "
self . send_message ( mto = msg [ ' from ' ] . bare ,
mbody = message ,
mtype = ' groupchat ' )
if __name__ == ' __main__ ' :
# Ideally use optparse or argparse to get JID,
# password, and log level.
logging . basicConfig ( level = logging . ERROR ,
format = ' %(levelname)-8s %(message)s ' )
config = configparser . ConfigParser ( )
if not os . path . exists ( " ./config.conf " ) :
logging . error ( " Could not find ./config.conf, please ensure it exists " )
exit
config . read ( ' config.conf ' )
config_default = config [ ' DEFAULT ' ]
xmpp = QBBot ( config_default [ ' qb_jid ' ] . strip ( ' \" ' ) ,
config_default [ ' qb_pass ' ] . strip ( ' \" ' ) ,
config_default [ ' qb_muc ' ] . strip ( ' \" ' ) ,
config_default [ ' qb_nick ' ] . strip ( ' \" ' ) ,
config_default [ ' api_url ' ] . strip ( ' \" ' ) ,
config_default [ ' api_user ' ] . strip ( ' \" ' ) ,
config_default [ ' api_pass ' ] . strip ( ' \" ' ) )
xmpp . register_plugin ( ' xep_0030 ' ) # Service Discovery
xmpp . register_plugin ( ' xep_0045 ' ) # Multi-User Chat
xmpp . register_plugin ( ' xep_0199 ' ) # XMPP Ping
xmpp . connect ( )
xmpp . process ( )