@ -1,43 +1,56 @@
import sys
import os
import socket
import string
import select
import re
import pickle
from pytox import Tox , ToxAV
import logging
import readline
import ctypes
from time import sleep
from os . path import exists
from threading import Thread
from random import shuffle
SERVER = [ ' 54.199.139.199 ' , 33445 , ' 7F9C31FE850E97CEFD4C4591DF93FC757C7C12549DDD55F8EEAECC34FE76C029 ' ]
GROUP_BOT = ' 56A1ADE4B65B86BCD51CC73E2CD4E542179F47959FE3E0E21B4B0ACDADE51855D34D34D37CB5 '
PWD = ' '
import wrapper
from wrapper . tox import Tox
from wrapper . toxav import ToxAV
import wrapper . toxcore_enums_and_consts as enums
from wrapper . toxcore_enums_and_consts import \
TOX_CONNECTION , TOX_USER_STATUS , TOX_MESSAGE_TYPE , \
TOX_SECRET_KEY_SIZE , TOX_FILE_CONTROL , TOX_ADDRESS_SIZE , \
TOX_GROUP_PRIVACY_STATE , TOX_GROUP_ROLE
try :
import support_testing as ts
except ImportError :
import wrapper_tests . support_testing as ts
IRC_HOST = ' irc.freenode.net '
IRC_PORT = 6667
NAME = NICK = IDENT = REALNAME = ' SyncBot '
global LOG
LOG = logging . getLogger ( ' app. ' + ' ts ' )
CHANNEL = ' #tox-ontopic '
MEMORY_DB = ' memory.pickle '
PWD = ' '
NAME = NICK = IDENT = REALNAME = ' SyniTox '
class AV ( ToxAV ) :
def __init__ ( self , core , max_calls ) :
self . core = self . get_tox ( )
def __init__ ( self , core ):
self . core = core
self . cs = None
self . call_type = self . TypeAudio
self . call_type = None
def on_invite ( self , idx ) :
self . cs = self . get_peer_csettings ( idx , 0 )
self . call_type = self . cs [ ' call_type ' ]
print ( ' Incoming %s call from %d : %s ... ' % (
LOG . info ( ' Incoming %s call from %d : %s ... ' % (
' video ' if self . call_type == self . TypeVideo else ' audio ' , idx ,
self . core . get_name ( self . get_peer_id ( idx , 0 ) ) ) )
self . answer ( idx , self . call_type )
print ( ' Answered, in call... ' )
LOG . info ( ' Answered, in call... ' )
def on_start ( self , idx ) :
self . change_settings ( idx , { ' max_video_width ' : 1920 ,
@ -48,7 +61,7 @@ class AV(ToxAV):
def on_end ( self , idx ) :
self . kill_transmission ( )
print ( ' Call ended ' )
LOG . info ( ' Call ended ' )
def on_peer_timeout ( self , idx ) :
self . stop_call ( )
@ -63,40 +76,195 @@ class AV(ToxAV):
sys . stdout . flush ( )
self . send_video ( idx , width , height , data )
bot_toxname = ' SyncBot '
class SyncBot ( Tox ) :
def __init__ ( self ) :
if exists ( ' data ' ) :
self . load_from_file ( ' data ' )
self . av = AV ( self , 10 )
self . connect ( )
self . set_name ( bot_toxname )
self . set_status_message ( " Send me a message with the word ' invite ' " )
print ( ' ID: %s ' % self . get_address ( ) )
self . readbuffer = ' '
bot_toxname = ' SyniTox '
class SyniTox ( Tox ) :
def __init__ ( self , opts ,
sChannel = ' #tor ' ,
sIRC_HOST = ' irc.oftc.net ' ,
iIRC_PORT = 6667 ,
GROUP_BOT_PK = ' ' ,
sMEMORY_DB = ' '
) :
Tox . __init__ ( self , tox_options = opts )
self . _address = self . self_get_address ( )
self . _opts = opts
self . _app = None
self . _settings = { }
self . _sChannel = sChannel
self . sIRC_HOST = sIRC_HOST
self . iIRC_PORT = iIRC_PORT
self . sGROUP_BOT_PK = GROUP_BOT_PK
self . sMEMORY_DB = sMEMORY_DB
global oTOX_OARGS
self . _oArgs = oTOX_OARGS
data = self . _oArgs . profile
if data and os . path . exists ( data ) :
self . load_from_file ( data )
self . av = self . AV
self . irc = None
self . bid = - 1
self . _bRouted = None
def start ( self ) :
self . self_set_name ( bot_toxname )
self . self_set_status_message ( " Send me a message with the word ' invite ' " )
LOG . info ( ' Our ToxID: %s ' % self . self_get_toxid ( ) )
self . readbuffer = b ' '
self . tox_group_id = None
self . irc_init ( )
self . group_init ( )
self . memory = { }
if exists ( MEMORY_DB ) :
with open ( MEMORY_DB , ' r ' ) as f :
if os . path . exists ( self . sMEMORY_DB ) :
with open ( self . sMEMORY_DB , ' r ' ) as f :
self . memory = pickle . load ( f )
self . irc_init ( )
b = self . test_net ( )
if b :
self . dht_init ( )
def bRouted ( self ) :
if self . _oArgs . network not in [ ' local ' , ' localnew ' , ' newlocal ' ] :
b = ts . bAreWeConnected ( )
if b is None :
i = os . system ( ' ip route|grep ^def ' )
if i > 0 :
b = False
else :
b = True
if not b :
LOG . warn ( " No default route for network " + self . _oArgs . network )
return False
return b
return True
def test_net ( self , lElts = None , oThread = None , iMax = 4 ) :
# bootstrap
lNodes = ts . generate_nodes ( oArgs = self . _oArgs ,
ipv = ' ipv4 ' ,
udp_not_tcp = True )
self . _settings [ ' current_nodes_udp ' ] = lNodes . copy ( )
if not lNodes :
LOG . warn ( ' empty generate_nodes udp ' )
else :
LOG . debug ( f ' Called generate_nodes: udp { len ( lNodes ) } ' )
lNodes = ts . generate_nodes ( oArgs = self . _oArgs ,
ipv = ' ipv4 ' ,
udp_not_tcp = False )
self . _settings [ ' current_nodes_tcp ' ] = lNodes
if not lNodes :
LOG . warn ( ' empty generate_nodes tcp ' )
else :
LOG . debug ( f ' Called generate_nodes: tcp { len ( lNodes ) } ' )
# if oThread and oThread._stop_thread: return
LOG . debug ( " test_net network= " + self . _oArgs . network + ' iMax= ' + str ( iMax ) )
return True
def group_init ( self ) :
LOG . debug ( f " group_init proxy= { self . _oArgs . proxy_type } " )
group_name = bot_toxname + ' Test ' + self . _sChannel
if not self . sGROUP_BOT_PK :
privacy_state = TOX_GROUP_PRIVACY_STATE [ ' PUBLIC ' ]
nick = bot_toxname + self . _sChannel
status = TOX_USER_STATUS [ ' NONE ' ]
num = self . group_new ( privacy_state , group_name , nick , status )
assert num > = 0 , num
pk = self . group_self_get_public_key ( num )
assert pk , pk
self . sGROUP_BOT_PK = pk
self . sGROUP_NUM = num
self . group_set_topic ( num , bot_toxname + " IRC " )
LOG . info ( f " group_init GROUP_BOT_PK= { self . sGROUP_BOT_PK } " )
#? self.tox_group_id = self.bid
self . group_send_message ( num , TOX_MESSAGE_TYPE [ ' NORMAL ' ] , " hi " )
# TOX_GROUP_ROLE['FOUNDER']
self . init_callbacks ( )
def init_callbacks ( self ) :
def gi_wrapped ( iTox , friendid , invite_data , invite_len , * args ) :
invite_data = str ( invite_data , ' UTF-8 ' )
self . on_group_invite ( friendid , invite_data )
self . callback_group_invite ( gi_wrapped , 0 )
def scs_wrapped ( iTox , friendid , * args ) :
self . on_connection_status ( self , scs_wrapped )
self . callback_self_connection_status ( scs_wrapped )
def gm_wrapped ( iTox , groupnumber , peer_id , type_ , message , mlen , * args ) :
message = str ( message , ' UTF-8 ' )
self . on_group_message ( groupnumber , peer_id , message )
self . callback_group_message ( gm_wrapped , 0 )
def ga_wrapped ( iTox , groupnumber , peer_id , type_ , action , mlen , * args ) :
self . on_group_action ( groupnumber , peer_id , action )
#? self.callback_group_action(ga_wrapped, 0)
def fr_wrapped ( iTox , pk , message , mlen , * args ) :
message = str ( message , ' UTF-8 ' )
self . on_friend_request ( self , pk , message )
self . callback_friend_request ( fr_wrapped )
def fm_wrapped ( iTox , peer_id , message , mlen , * args ) :
message = str ( message , ' UTF-8 ' )
self . on_friend_request ( self , peer_id , message )
self . callback_friend_request ( fm_wrapped )
def del_callbacks ( self ) :
self . callback_group_invite ( None , 0 )
self . callback_self_connection_status ( None )
self . callback_group_message ( None , 0 )
# self.callback_group_action(None, 0)
self . callback_friend_request ( None )
self . callback_friend_request ( None )
def irc_init ( self ) :
self . irc = socket . socket ( )
self . irc . connect ( ( IRC_HOST , IRC_PORT ) )
self . irc . send ( ' NICK %s \r \n ' % NICK )
self . irc . send ( ' USER %s %s bla : %s \r \n ' % ( IDENT , IRC_HOST , REALNAME ) )
def connect ( self ) :
print ( ' connecting... ' )
self . bootstrap_from_address ( SERVER [ 0 ] , SERVER [ 1 ] , SERVER [ 2 ] )
def ensure_exe ( self , func , args ) :
if not self . bRouted ( ) : return
LOG . info ( f " irc_init proxy= { self . _oArgs . proxy_type } " )
if self . _oArgs . proxy_type == 2 :
from wrapper_tests import socks
socks . setdefaultproxy ( socks . PROXY_TYPE_SOCKS5 ,
self . _oArgs . proxy_host ,
self . _oArgs . proxy_port )
irc = socks . socksocket ( )
else :
irc = socket . socket ( )
try :
irc . connect ( ( self . sIRC_HOST , self . iIRC_PORT ) )
irc . send ( bytes ( ' NICK ' + NICK + ' \r \n ' , ' UTF-8 ' ) )
irc . send ( bytes ( ' USER %s %s bla : %s \r \n ' % ( IDENT , self . sIRC_HOST , REALNAME ) ,
' UTF-8 ' ) )
except Exception as e :
LOG . warn ( f ' IRC error { e } ' )
else :
LOG . info ( ' IRC connected ' + ' NICK = ' + NICK )
self . irc = irc
def dht_init ( self ) :
if not self . bRouted ( ) : return
if ' current_nodes_udp ' not in self . _settings :
self . test_net ( )
lNodes = self . _settings [ ' current_nodes_udp ' ]
shuffle ( lNodes )
if self . _oArgs . proxy_type == 0 :
ts . bootstrap_good ( lNodes [ : 4 ] , [ self ] )
else :
if self . _bRouted == None :
LOG . info ( f ' DHT bootstapping 1 ' )
ts . bootstrap_good ( [ lNodes [ 0 ] ] , [ self ] )
if ' current_nodes_tcp ' not in self . _settings :
self . test_net ( )
lNodes = self . _settings [ ' current_nodes_tcp ' ]
shuffle ( lNodes )
ts . bootstrap_tcp ( lNodes [ : 4 ] , [ self ] )
def ensure_exe ( self , func , * args ) :
count = 0
THRESHOLD = 50
@ -106,42 +274,115 @@ class SyncBot(Tox):
except :
assert count < THRESHOLD
count + = 1
for i in range ( 10 ) :
self . do ( )
sleep ( 0.02 )
self . do ( )
def loop ( self ) :
def do ( self , n = 50 ) :
interval = self . iteration_interval ( )
for i in range ( n ) :
self . iterate ( )
sleep ( interval / 1000.0 * 10 )
def unroute ( self ) :
if self . irc :
try : irc . close ( )
except : pass
self . irc = None
def irc_check ( self , lines ) :
if b ' NOTICE AUTH ' in lines [ 0 ] :
for line in lines [ : 99 ] :
if b ' NOTICE AUTH ' not in line : return
line = str ( line , ' UTF-8 ' ) . strip ( )
print ( line )
else :
for line in lines [ : 5 ] :
line = str ( line , ' UTF-8 ' ) . strip ( ) . lower ( )
if ' banned ' in line :
raise RuntimeError ( line )
if ' error ' in line and ' closing ' in line :
raise RuntimeError ( line )
def iLoop ( self ) :
checked = False
self . joined = False
self . request = False
count = 0
try :
count = count + 1
while True :
status = self . isconnected ( )
if not checked and status :
print ( ' Connected to DHT. ' )
b = self . bRouted ( )
if not b :
self . unroute ( )
checked = False
if self . _bRouted is None or self . _bRouted != b :
self . _bRouted = b
if count % 6 == 1 :
LOG . info ( f ' Not routed { count } ' )
sleep ( 10 )
continue
else :
if self . _bRouted is None :
self . _bRouted = True
self . irc_send ( ' . ' )
if self . _bRouted is None or self . _bRouted != b :
self . _bRouted = b
LOG . debug ( f ' Routed { count } ' )
status = self . self_get_connection_status ( )
if not status :
if count % 6 == 1 :
LOG . info ( f ' Not connected { count } ' )
self . dht_init ( )
if b and not checked and status :
LOG . info ( ' Connected to DHT. ' )
checked = True
try :
self . bid = self . get_friend_id ( GROUP_BOT )
except :
self . ensure_exe ( self . add_friend , ( GROUP_BOT , ' Hi ' ) )
self . bid = self . get_friend_id ( GROUP_BOT )
if checked and not status :
print ( ' Disconnected from DHT. ' )
self . connect ( )
self . bid = self . friend_by_public_key ( self . sGROUP_BOT_PK )
LOG . info ( f ' Connected to group { self . bid } ' )
except ctypes . ArgumentError as e :
self . bid = None
if self . bid == None :
self . ensure_exe ( self . friend_add_norequest , self . sGROUP_BOT_PK )
LOG . info ( f ' friend_add_n to group { self . sGROUP_BOT_PK [ : 8 ] } ' )
self . bid = self . friend_by_public_key ( self . sGROUP_BOT_PK )
LOG . info ( f ' Added to group { self . bid } ' )
num = self . sGROUP_NUM
my_pk = self . group_self_get_public_key ( num )
LOG . info ( f ' Connected to group as { my_pk [ : 8 ] } ' )
if b and checked and not status :
LOG . info ( ' Disconnected from DHT. ' )
self . dht_init ( )
checked = False
readable , _ , _ = select . select ( [ self . irc ] , [ ] , [ ] , 0.01 )
if readable :
if not self . irc :
LOG . info ( ' Disconnected from IRC. ' )
self . irc_init ( )
if not self . irc :
sleep ( 10 )
continue
LOG . info ( ' Waiting on IRC. ' )
readable , _ , _ = select . select ( [ self . irc ] , [ ] , [ ] , 0.1 )
if not readable :
LOG . info ( ' Waited on IRC but nothing to read. ' )
else :
self . readbuffer + = self . irc . recv ( 4096 )
lines = self . readbuffer . split ( ' \n ' )
lines = self . readbuffer . split ( b ' \n ' )
self . irc_check ( lines )
LOG . info ( f ' Waited on IRC and got { len ( lines ) } lines. ' )
self . readbuffer = lines . pop ( )
for line in lines :
line = str ( line , ' UTF-8 ' )
i = line . find ( ' ' )
print ( line [ i + 1 : ] )
l = line . rstrip ( ) . split ( )
rx = re . match ( r ' :(.*?)!.*? PRIVMSG %s :(.*?) \ r ' %
CHANNEL , line , re . S )
self . _sChannel , line , re . S )
if rx :
print ( ' IRC> %s : %s ' % rx . groups ( ) )
msg = ' [ %s ]: %s ' % rx . groups ( )
@ -151,31 +392,55 @@ class SyncBot(Tox):
action = ' [ %s ]: %s ' % ( rx . group ( 1 ) ,
rx . group ( 2 ) [ 8 : - 1 ] )
self . ensure_exe ( self . group_action_send ,
( self . tox_group_id , action ) )
self . tox_group_id , action )
elif self . tox_group_id != None :
self . ensure_exe ( self . group_message_send ,
( self . tox_group_id , msg ) )
self . tox_group_id , msg )
if content . startswith ( ' ^ ' ) :
self . handle_command ( content )
l = line . rstrip ( ) . split ( )
if l [ 0 ] == ' PING ' :
elif l [ 0 ] == ' PING ' :
self . irc_send ( ' PONG %s \r \n ' % l [ 1 ] )
if l [ 1 ] == ' 376 ' :
self . irc . send ( ' PRIVMSG NickServ :IDENTIFY %s %s \r \n '
% ( NICK , PWD ) )
self . irc . send ( ' JOIN %s \r \n ' % CHANNEL )
elif l [ 1 ] == ' 376 ' :
# :End of /MOTD command
self . irc . send ( bytes ( ' PRIVMSG NickServ :IDENTIFY %s %s \r \n '
% ( NICK , PWD , ) , ' UTF-8 ' ) )
self . irc . send ( bytes ( ' JOIN %s \r \n ' % self . _sChannel , ' UTF-8 ' ) )
elif l [ 1 ] == ' 421 ' :
# 421 SyniTox .PRIVMSG :Unknown command
pass
elif l [ 1 ] == ' 477 ' :
#477 SyniTox #tor :Cannot join channel (Need to be identified and verified to join this channel, '/msg NickServ help' to learn how to register and verify.)
self . irc . send ( bytes ( ' HELP \r \n ' , ' UTF-8 ' ) )
self . irc . send ( bytes ( ' MSG NickServ help \r \n ' , ' UTF-8 ' ) )
pass
self . do ( )
except KeyboardInterrupt :
self . save_to_file ( ' data ' )
ret = 0
except Exception as e :
LOG . exception ( f ' Error running program: \n { e } ' , exc_info = True )
ret = 1
else :
ret = 0
self . quit ( )
return ret
def quit ( self ) :
self . del_callbacks ( )
self . save_to_file ( )
def save_to_file ( self ) :
pass
def irc_send ( self , msg ) :
success = False
while not success :
try :
self . irc . send ( msg )
self . irc . send ( bytes ( msg , ' UTF-8 ' ) )
success = True
break
except socket . error :
@ -185,56 +450,57 @@ class SyncBot(Tox):
def on_connection_status ( self , friendId , status ) :
if not self . request and not self . joined \
and friendId == self . bid and status :
print ( ' Groupbot online, trying to join group chat. ' )
LOG . info ( ' Groupbot online, trying to join group chat. ' )
self . request = True
self . ensure_exe ( self . send_message , ( self . bid , ' invite ' ) )
self . ensure_exe ( self . send_message , self . bid , ' invite ' )
def on_group_invite ( self , friendid , type , data) :
def on_group_invite ( self , friendid , invite_data , user_ data) :
if not self . joined :
self . joined = True
self . tox_group_id = self . join_groupchat ( friendid , data )
print ( ' Joined groupchat. ' )
LOG . info ( ' Joined groupchat. ' )
def on_group_message ( self , groupnumber , friendgroupnumber , message ) :
name = self . group_peername ( groupnumber , friendgroupnumber )
def on_group_message ( self , groupnumber , peer_id , message ) :
name = self . group_peername ( groupnumber , peer_id )
if len ( name ) and name != NAME :
print ( ' TOX> %s : %s ' % ( name , message ) )
if message . startswith ( ' > ' ) :
message = ' \x03 09 %s \x03 ' % message
self . irc_send ( ' PRIVMSG %s :[ %s ]: %s \r \n ' %
( CHANNEL , name , message ) )
self . irc_send ( b ' PRIVMSG %s :[ %s ]: %s \r \n ' %
( self . _sChannel , name , message ) )
if message . startswith ( ' ^ ' ) :
self . handle_command ( message )
def on_group_action ( self , groupnumber , friendgroupnumber , action ) :
name = self . group_peername ( groupnumber , friendgroupnumber )
def on_group_action ( self , groupnumber , peer_id , action ) :
""" old? message type action? """
name = self . group_peername ( groupnumber , peer_id )
if len ( name ) and name != NAME :
print ( ' TOX> %s : %s ' % ( name , action ) )
if action . startswith ( ' > ' ) :
action = ' \x03 09 %s \x03 ' % action
self . irc_send ( ' PRIVMSG %s : \x01 ACTION [ %s ]: %s \x01 \r \n ' %
( CHANNEL , name , action ) )
( self . _sChannel , name , action ) )
def on_friend_request ( self , pk , message ) :
print ( ' Friend request from %s : %s ' % ( pk , message ) )
LOG . info ( ' Friend request from %s : %s ' % ( pk , message ) )
self . add_friend_norequest ( pk )
print ( ' Accepted. ' )
LOG . info ( ' Accepted. ' )
def on_friend_message ( self , friendid , message ) :
if message == ' invite ' :
if not self . tox_group_id is None :
print ( ' Inviting %s ' % self . get_name ( friendid ) )
LOG . info ( ' Inviting %s ' % self . get_name ( friendid ) )
self . invite_friend ( friendid , self . tox_group_id )
return
else :
message = ' Waiting for GroupBot, please try again in 1 min. '
self . ensure_exe ( self . send_message , ( friendid , message ) )
self . ensure_exe ( self . send_message , friendid , message )
def send_both ( self , content ) :
self . ensure_exe ( self . group_message_send , ( self . tox_group_id , content ) )
self . irc_send ( ' PRIVMSG %s : %s \r \n ' % ( CHANNEL , content ) )
self . ensure_exe ( self . group_message_send , self . tox_group_id , content )
self . irc_send ( ' PRIVMSG %s : %s \r \n ' % ( self . _sChannel , content ) )
def handle_command ( self , cmd ) :
cmd = cmd [ 1 : ]
@ -247,12 +513,111 @@ class SyncBot(Tox):
subject = args [ 0 ]
desc = ' ' . join ( args [ 1 : ] )
self . memory [ subject ] = desc
with open ( MEMORY_DB , ' w ' ) as f :
pickle . dump ( self . memory , f )
if self . sMEMORY_DB :
with open ( self . sMEMORY_DB , ' w ' ) as f :
pickle . dump ( self . memory , f )
self . send_both ( ' Remembering ^ %s : %s ' % ( subject , desc ) )
elif self . memory . has_key ( cmd ) :
self . send_both ( self . memory [ cmd ] )
t = SyncBot ( )
t . loop ( )
def iMain ( oArgs ) :
assert oTOX_OPTIONS
assert oTOX_OARGS
sChannel = oArgs . irc_chan
sIRC_HOST = oArgs . irc_host
iIRC_PORT = oArgs . irc_port
o = SyniTox ( oTOX_OPTIONS , sChannel , sIRC_HOST , iIRC_PORT )
o . start ( )
ret = o . iLoop ( )
return ret
def oToxygenToxOptions ( oArgs ) :
data = None
tox_options = wrapper . tox . Tox . options_new ( )
if oArgs . proxy_type :
tox_options . contents . proxy_type = int ( oArgs . proxy_type )
tox_options . contents . proxy_host = bytes ( oArgs . proxy_host , ' UTF-8 ' )
tox_options . contents . proxy_port = int ( oArgs . proxy_port )
tox_options . contents . udp_enabled = False
else :
tox_options . contents . udp_enabled = oArgs . udp_enabled
if not os . path . exists ( ' /proc/sys/net/ipv6 ' ) :
oArgs . ipv6_enabled = False
tox_options . contents . tcp_port = int ( oArgs . tcp_port )
# overrides
tox_options . contents . local_discovery_enabled = False
tox_options . contents . dht_announcements_enabled = True
tox_options . contents . hole_punching_enabled = False
tox_options . contents . experimental_thread_safety = False
# REQUIRED!!
if oArgs . ipv6_enabled and not os . path . exists ( ' /proc/sys/net/ipv6 ' ) :
LOG . warn ( ' Disabling IPV6 because /proc/sys/net/ipv6 does not exist ' + repr ( oArgs . ipv6_enabled ) )
tox_options . contents . ipv6_enabled = False
else :
tox_options . contents . ipv6_enabled = bool ( oArgs . ipv6_enabled )
if data : # load existing profile
tox_options . contents . savedata_type = enums . TOX_SAVEDATA_TYPE [ ' TOX_SAVE ' ]
tox_options . contents . savedata_data = c_char_p ( data )
tox_options . contents . savedata_length = len ( data )
else : # create new profile
tox_options . contents . savedata_type = enums . TOX_SAVEDATA_TYPE [ ' NONE ' ]
tox_options . contents . savedata_data = None
tox_options . contents . savedata_length = 0
#? tox_options.contents.log_callback = LOG
if tox_options . _options_pointer :
# LOG.debug("Adding logging to tox_options._options_pointer ")
ts . vAddLoggerCallback ( tox_options , ts . on_log )
else :
LOG . warn ( " No tox_options._options_pointer " + repr ( tox_options . _options_pointer ) )
return tox_options
def oArgparse ( lArgv ) :
parser = ts . oMainArgparser ( )
parser . add_argument ( ' profile ' , type = str , nargs = ' ? ' , default = None ,
help = ' Path to Tox profile ' )
# irc.libera.net #tox will not work over Tor
parser . add_argument ( ' --irc_host ' , type = str , default = ' irc.oftc.net ' )
parser . add_argument ( ' --irc_port ' , type = int , default = 6667 )
parser . add_argument ( ' --irc_chan ' , type = str , default = ' #tor ' )
oArgs = parser . parse_args ( lArgv )
for key in ts . lBOOLEANS :
if key not in oArgs : continue
val = getattr ( oArgs , key )
setattr ( oArgs , key , bool ( val ) )
if hasattr ( oArgs , ' sleep ' ) :
if oArgs . sleep == ' qt ' :
pass # broken or gevent.sleep(idle_period)
elif oArgs . sleep == ' gevent ' :
pass # broken or gevent.sleep(idle_period)
else :
oArgs . sleep = ' time '
return oArgs
def main ( lArgs = None ) :
global oTOX_OARGS
if lArgs is None : lArgs = [ ]
oArgs = oArgparse ( lArgs )
oTOX_OARGS = oArgs
global oTOX_OPTIONS
oTOX_OPTIONS = oToxygenToxOptions ( oArgs )
ts . vSetupLogging ( oArgs )
# ts.setup_logging(oArgs)
return iMain ( oArgs )
if __name__ == ' __main__ ' :
sys . exit ( main ( sys . argv [ 1 : ] ) )
# Ran 34 tests in 86.589s OK (skipped=12)