This commit is contained in:
emdee 2022-11-17 12:08:38 +00:00
parent f30a1038ad
commit f83d1c97b9
2 changed files with 158 additions and 152 deletions

View file

@ -1,48 +1,51 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
import sys
import os
import socket
import select
import re
import pickle
import logging
import ctypes
import logging
import os
import pickle
import re
import select
import socket
import sys
import traceback
from time import sleep
from threading import Thread
from random import shuffle
from errno import errorcode
from random import shuffle
from time import sleep
from OpenSSL import SSL
import warnings
warnings.filterwarnings('ignore')
import wrapper
import wrapper.toxcore_enums_and_consts as enums
import wrapper_tests
from wrapper.tox import Tox
from wrapper.toxav import ToxAV
import wrapper.toxcore_enums_and_consts as enums
from wrapper.toxcore_enums_and_consts import \
TOX_CONNECTION, TOX_USER_STATUS, TOX_MESSAGE_TYPE, \
TOX_SECRET_KEY_SIZE, TOX_FILE_CONTROL, TOX_ADDRESS_SIZE, \
TOX_GROUP_PRIVACY_STATE, TOX_GROUP_ROLE
from wrapper.toxcore_enums_and_consts import (TOX_ADDRESS_SIZE, TOX_CONNECTION,
TOX_FILE_CONTROL,
TOX_GROUP_PRIVACY_STATE,
TOX_GROUP_ROLE, TOX_MESSAGE_TYPE,
TOX_SECRET_KEY_SIZE,
TOX_USER_STATUS)
from wrapper_tests import socks
try:
import support_testing as ts
except ImportError:
import wrapper_tests.support_testing as ts
import wrapper.toxencryptsave as tox_encrypt_save
import wrapper.toxencryptsave as tox_encrypt_save
global LOG
LOG = logging.getLogger('app.'+'ts')
import warnings
warnings.filterwarnings('ignore')
class SyniToxError(BaseException): pass
NAME = 'SyniTox'
sMSG = 'MSG'
sMSG = 'PRIVMSG'
SSL_TOR_RANGE = '172.'
# possible CA locations picks the first one
lCAs = [# debian and gentoo
@ -138,10 +141,10 @@ class SyniTox(Tox):
opts = oTOX_OPTIONS
self._opts = opts
self._oArgs = oArgs
self._oargs = oArgs
# self._oArgs.profile
self.load_profile(self._opts, self._oArgs, self._oArgs.password)
# self._oargs.profile
self.load_profile(self._opts, self._oargs, self._oargs.password)
Tox.__init__(self, tox_options=self._opts)
self._address = self.self_get_address()
@ -170,7 +173,6 @@ class SyniTox(Tox):
def load_profile(self, tox_options, oArgs, password=''):
if oArgs.profile and os.path.exists(oArgs.profile):
data = open(oArgs.profile, 'rb').read()
else:
data = None
if data and self.has_password():
@ -191,20 +193,20 @@ class SyniTox(Tox):
data = self.pass_encrypt(data)
try:
suf = f"{os.getpid()}"
with open(self._oArgs.profile+suf, 'wb') as fl:
with open(self._oargs.profile+suf, 'wb') as fl:
fl.write(data)
stat = os.stat(self._oArgs.profile+suf)
stat = os.stat(self._oargs.profile+suf)
if hasattr(stat, 'st_blocks'):
assert stat.st_blocks > 0, f"Zero length file {self._oArgs.profile+suf}"
os.rename(self._oArgs.profile+suf, self._oArgs.profile)
LOG.info('Profile saved successfully to' +self._oArgs.profile)
assert stat.st_blocks > 0, f"Zero length file {self._oargs.profile+suf}"
os.rename(self._oargs.profile+suf, self._oargs.profile)
LOG.info('Profile saved successfully to' +self._oargs.profile)
except Exception as e:
LOG.warn(f"Profile save failed to {self._oArgs.profile}\n{e}")
LOG.warn(f"Profile save failed to {self._oargs.profile}\n{e}")
def start(self):
self._tox = self
self._toxes = tox_encrypt_save.ToxEncryptSave()
self.self_set_name(self._oArgs.bot_name)
self.self_set_name(self._oargs.bot_name)
self.self_set_status_message("Send me a message with the word 'invite'")
LOG.info('Our ToxID: %s' % self.self_get_toxid())
@ -220,17 +222,17 @@ class SyniTox(Tox):
try:
OP_NO_TLSv1_3 = SSL._lib.SSL_OP_NO_TLSv1_3
except AttributeError:
if self._oArgs.irc_ssl == 'tlsv1.3':
if self._oargs.irc_ssl == 'tlsv1.3':
LOG.warning("SSL._lib.SSL_OP_NO_TLSv1_3 is not supported")
LOG.warning("Downgrading SSL to tlsv1.2 ")
self._oArgs.irc_ssl = 'tlsv1.2'
self._oargs.irc_ssl = 'tlsv1.2'
else:
LOG.debug("SSL._lib.SSL_OP_NO_TLSv1_3 is not supported")
else:
LOG.debug("SSL._lib.SSL_OP_NO_TLSv1_3 is supported")
if self._oArgs.irc_connect.endswith('.onion') or \
self._oArgs.irc_connect.startswith(SSL_TOR_RANGE):
if self._oargs.irc_connect.endswith('.onion') or \
self._oargs.irc_connect.startswith(SSL_TOR_RANGE):
override = True
else:
override = False
@ -239,17 +241,17 @@ class SyniTox(Tox):
# SSL.OP_NO_TLSv1_1 is allowed
context.set_options(SSL.OP_NO_SSLv2|SSL.OP_NO_SSLv3|SSL.OP_NO_TLSv1)
if self._oArgs.irc_crt and self._oArgs.irc_key:
assert os.path.exists(key), key
if self._oargs.irc_crt and self._oargs.irc_key:
val = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
LOG.info('Using keyfile: %s' % key)
if True: # required!
key = self._oArgs.irc_crt
key = self._oargs.irc_crt
assert os.path.exists(key), key
LOG.info('Using keyfile: %s' % key)
context.use_certificate_file(key, filetype=SSL.FILETYPE_PEM)
if True: # required!
key = self._oArgs.irc_key
key = self._oargs.irc_key
assert os.path.exists(key), key
LOG.info('Using keyfile: %s' % key)
context.use_privatekey_file(key, filetype=SSL.FILETYPE_PEM)
#? load_client_ca
def SSL_hands_cb(oConn,iLine,iRet):
@ -264,17 +266,17 @@ class SyniTox(Tox):
val = SSL.VERIFY_PEER
context.set_verify(val, ssl_verify_cb(HOST, override))
if self._oArgs.irc_cafile:
# context.load_verify_locations(capath=self._oArgs.irc_ca)
context.load_verify_locations(self._oArgs.irc_cafile, capath=self._oArgs.irc_cadir)
elif self._oArgs.irc_cadir:
context.load_verify_locations(None, capath=self._oArgs.irc_cadir)
if self._oArgs.irc_ssl == 'tlsv1.1':
if self._oargs.irc_cafile:
# context.load_verify_locations(capath=self._oargs.irc_ca)
context.load_verify_locations(self._oargs.irc_cafile, capath=self._oargs.irc_cadir)
elif self._oargs.irc_cadir:
context.load_verify_locations(None, capath=self._oargs.irc_cadir)
if self._oargs.irc_ssl == 'tlsv1.1':
context.set_min_proto_version(SSL.TLS1_1_VERSION)
elif self._oArgs.irc_ssl == 'tlsv1.2':
elif self._oargs.irc_ssl == 'tlsv1.2':
context.set_cipher_list(bytes(':'.join(['DEFAULT@SECLEVEL=1']+lOPENSSL_12_CIPHERS), 'UTF-8'))
context.set_min_proto_version(SSL.TLS1_2_VERSION)
elif self._oArgs.irc_ssl == 'tlsv1.3':
elif self._oargs.irc_ssl == 'tlsv1.3':
context.set_cipher_list(bytes(':'.join(['DEFAULT@SECLEVEL=1']+lOPENSSL_13_CIPHERS), 'UTF-8'))
context.set_min_proto_version(SSL.TLS1_3_VERSION)
self._ssl_context = context
@ -282,7 +284,7 @@ class SyniTox(Tox):
return self._ssl_context
def bRouted(self):
if self._oArgs.network in ['local']:
if self._oargs.network in ['local']:
return True
b = ts.bAreWeConnected()
if b is None:
@ -295,9 +297,9 @@ class SyniTox(Tox):
return b
def test_net(self, lElts=None, oThread=None, iMax=4):
LOG.debug("test_net network=" +self._oArgs.network )
LOG.debug("test_net network=" +self._oargs.network )
# bootstrap
lNodes = ts.generate_nodes(oArgs=self._oArgs,
lNodes = ts.generate_nodes(oArgs=self._oargs,
ipv='ipv4',
udp_not_tcp=True)
self._settings['current_nodes_udp'] = ts.lDNSClean(lNodes)
@ -306,7 +308,7 @@ class SyniTox(Tox):
else:
LOG.info(f'Called generate_nodes: udp {len(lNodes)}')
lNodes = ts.generate_nodes(oArgs=self._oArgs,
lNodes = ts.generate_nodes(oArgs=self._oargs,
ipv='ipv4',
udp_not_tcp=False)
self._settings['current_nodes_tcp'] = ts.lDNSClean(lNodes)
@ -341,8 +343,8 @@ class SyniTox(Tox):
# The code in tests_wrapper need extending and then
# wiring up to here.
#
if self._oArgs.group_invite:
pk = self._oArgs.group_invite
if self._oargs.group_invite:
pk = self._oargs.group_invite
if pk not in self.self_get_friend_list():
friend_number = self.add_friend(pk)
else:
@ -351,8 +353,8 @@ class SyniTox(Tox):
LOG.info(f"A PK to invite to the group {b}")
return True
if self._oArgs.group_moderator:
pk = self._oArgs.group_moderator
if self._oargs.group_moderator:
pk = self._oargs.group_moderator
if pk not in self.self_get_friend_list():
friend_number = self.add_friend(pk)
else:
@ -364,8 +366,8 @@ class SyniTox(Tox):
LOG.info("A PK to invite to the group as moderator {b}")
return True
if self._oArgs.group_ignore:
pk = self._oArgs.group_ignore
if self._oargs.group_ignore:
pk = self._oargs.group_ignore
if pk not in self.self_get_friend_list():
friend_number = self.add_friend(pk)
else:
@ -379,21 +381,18 @@ class SyniTox(Tox):
return None
def create_group(self):
privacy_state = TOX_GROUP_PRIVACY_STATE[self._oArgs.group_state.upper()]
nick = self._oArgs.group_nick
group_name = self._oArgs.group_name
if not group_name:
group_name = self._oArgs.bot_name +self._oArgs.irc_chan
self._oArgs.group_name = group_name
privacy_state = TOX_GROUP_PRIVACY_STATE[self._oargs.group_state.upper()]
nick = self._oargs.group_nick
status = TOX_USER_STATUS['NONE']
group_name = self._oargs.group_name
num = self.group_new(privacy_state, group_name, nick, status)
assert num >= 0, num
self.group_set_topic(num, f"{group_name} IRC on {self._oArgs.irc_host}" )
self.group_set_topic(num, f"{group_name} IRC on {self._oargs.irc_host}" )
# self.tox_group_id = self.group_invite_accept(b'', friendid, nick)
chat_id = self.group_get_chat_id(num)
if self._oArgs.profile and os.path.exists(os.path.dirname(self._oArgs.profile)):
f = os.path.splitext(self._oArgs.profile)[0] +'.chatid'
if self._oargs.profile and os.path.exists(os.path.dirname(self._oargs.profile)):
f = os.path.splitext(self._oargs.profile)[0] +'.chatid'
open(f, 'rt').write(chat_id)
LOG.info(f"Chat Id: {chat_id} written to {f}")
else:
@ -401,16 +400,16 @@ class SyniTox(Tox):
# dunno
if self.self_get_friend_list():
friendid = self.self_get_friend_list()[0]
i = on_group_invite(friendid, b'', 0)
i = self.on_group_invite(friendid, b'', 0)
assert i
self.tox_group_id = i
return num
def join_group(self):
password = self._oArgs.group_pass
nick = self._oArgs.group_nick
password = self._oargs.group_pass
nick = self._oargs.group_nick
# is the chat_id the pk?
chat_id = self._oArgs.group_chatid
chat_id = self._oargs.group_chatid
if not chat_id: return -1
num = self.group_join(chat_id, password, nick, status='')
self.sGROUP_BOT_NUM = num
@ -418,8 +417,7 @@ class SyniTox(Tox):
return num
def init_groups(self):
LOG.debug(f"init_groups proxy={self._oArgs.proxy_type}")
group_name = self._oArgs.bot_name +' Test ' +self._oArgs.irc_chan
LOG.debug(f"init_groups proxy={self._oargs.proxy_type}")
if not self.bRouted(): return
try:
if self.sGROUP_BOT_NUM < 0:
@ -458,7 +456,7 @@ class SyniTox(Tox):
self.callback_group_invite(gi_wrapped, 0)
def scs_wrapped(iTox, friendid, status, *args):
LOG.debug(f'on_connection_status {friendId} {status}.')
LOG.debug(f'on_connection_status {friendid} {status}.')
self.on_connection_status(friendid, status)
self.callback_self_connection_status(scs_wrapped)
@ -505,62 +503,59 @@ class SyniTox(Tox):
LOG.debug(f"{cert.get_subject().CN} {cert.get_issuer()}")
cipher_name = irc.get_cipher_name()
if self._oArgs.irc_ssl == 'tlsv1.2':
if self._oargs.irc_ssl == 'tlsv1.2':
assert cipher_name in lOPENSSL_12_CIPHERS or \
cipher_name in lOPENSSL_13_CIPHERS, cipher_name
elif self._oArgs.irc_ssl == 'tlsv1.3':
elif self._oargs.irc_ssl == 'tlsv1.3':
assert cipher_name in lOPENSSL_13_CIPHERS, cipher_name
got = irc.get_protocol_version_name().lower()
if got > self._oArgs.irc_ssl:
LOG.debug(f"Got: {irc.get_protocol_version_name().lower()} asked for {self._oArgs.irc_ssl}")
elif got < self._oArgs.irc_ssl:
LOG.warn(f"Got: {irc.get_protocol_version_name().lower()} asked for {self._oArgs.irc_ssl}")
if got > self._oargs.irc_ssl:
LOG.debug(f"Got: {irc.get_protocol_version_name().lower()} asked for {self._oargs.irc_ssl}")
elif got < self._oargs.irc_ssl:
LOG.warn(f"Got: {irc.get_protocol_version_name().lower()} asked for {self._oargs.irc_ssl}")
LOG.info(f"diagnose_ciphers {str(irc.get_state_string(), 'UTF-8')}")
def irc_init(self):
global iSocks5Error
if not self.bRouted(): return
nick = self._oArgs.irc_nick
realname = self._oArgs.irc_name
ident = self._oArgs.irc_ident
LOG.info(f"irc_init proxy={self._oArgs.proxy_type} SSL={self._oArgs.irc_ssl}")
nick = self._oargs.irc_nick
realname = self._oargs.irc_name
ident = self._oargs.irc_ident
LOG.info(f"irc_init proxy={self._oargs.proxy_type} SSL={self._oargs.irc_ssl}")
try:
if self._oArgs.proxy_type == 2:
if self._oargs.proxy_type == 2:
socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5,
self._oArgs.proxy_host,
self._oArgs.proxy_port)
self._oargs.proxy_host,
self._oargs.proxy_port)
irc = socks.socksocket()
iTIMEOUT = 15
elif self._oArgs.proxy_type == 1:
elif self._oargs.proxy_type == 1:
socks.setdefaultproxy(socks.PROXY_TYPE_HTTP,
self._oArgs.proxy_host,
self._oArgs.proxy_port)
self._oargs.proxy_host,
self._oargs.proxy_port)
irc = socks.socksocket()
iTIMEOUT = 15
else:
irc = socket.socket()
iTIMEOUT = 10
try:
ip = ts.sDNSLookup(self._oArgs.irc_connect)
ip = ts.sDNSLookup(self._oargs.irc_connect)
except Exception as e:
LOG.warn(f"{self._oArgs.irc_host} errored in resolve {e}")
ip = self._oArgs.irc_connect
LOG.warn(f"{self._oargs.irc_host} errored in resolve {e}")
ip = self._oargs.irc_connect
else:
if not ip:
LOG.warn(f"{self._oArgs.irc_host} did not resolve.")
ip = self._oArgs.irc_connect
LOG.warn(f"{self._oargs.irc_host} did not resolve.")
ip = self._oargs.irc_connect
# https://github.com/pyca/pyopenssl/issues/168
if self._oArgs.irc_ssl:
if self._oargs.irc_ssl:
if not self._ssl_context:
self.start_ssl(self._oArgs.irc_connect)
self.start_ssl(self._oargs.irc_connect)
irc = SSL.Connection(self._ssl_context, irc)
irc.connect((ip, self._oArgs.irc_port))
if ip.endswith('.onion'):
irc.set_tlsext_host_name(None)
else:
irc.set_tlsext_host_name(bytes(self._oArgs.irc_host, 'UTF-8'))
irc.connect((ip, self._oargs.irc_port))
irc.set_tlsext_host_name(bytes(self._oargs.irc_host, 'UTF-8'))
while True:
try:
irc.do_handshake()
@ -569,15 +564,15 @@ class SyniTox(Tox):
if not rd:
raise socket.timeout('timeout')
continue
except SSL.Error as e:
except SSL.Error as e: # noqa
raise
break
self.diagnose_ciphers(irc)
else:
irc.connect((ip, self._oArgs.irc_port))
LOG.info(f"IRC SSL={self._oArgs.irc_ssl} connected ")
irc.connect((ip, self._oargs.irc_port))
LOG.info(f"IRC SSL={self._oargs.irc_ssl} connected ")
except (wrapper_tests.socks.GeneralProxyError, wrapper_tests.socks.Socks5Error) as e:
except (wrapper_tests.socks.GeneralProxyError, wrapper_tests.socks.Socks5Error) as e: # noqa
iSocks5Error += 1
if iSocks5Error >= iSocks5ErrorMax:
raise SyniToxError(f"{e.args}")
@ -622,13 +617,13 @@ class SyniTox(Tox):
self.irc.send(bytes('CAP ' + 'LS' + '\r\n', 'UTF-8' ))
self.irc.send(bytes('CAP ' + 'REQ :multi-prefix' + '\r\n', 'UTF-8'))
self.irc.send(bytes('CAP ' + 'END' + '\r\n', 'UTF-8' ))
# withh or without self._oArgs.irc_pem:
# withh or without self._oargs.irc_pem:
LOG.info("Sent CAP sending NICK and USER")
self.irc.send(bytes('NICK ' + nick + '\r\n', 'UTF-8' ))
self.irc.send(bytes('USER %s %s bla :%s\r\n' % (
self._oArgs.irc_ident,
self._oArgs.irc_host,
self._oArgs.irc_name), 'UTF-8'))
self._oargs.irc_ident,
self._oargs.irc_host,
self._oargs.irc_name), 'UTF-8'))
# OSError: [Errno 9] Bad file descriptor
@ -638,7 +633,7 @@ class SyniTox(Tox):
self.test_net()
lNodes = self._settings['current_nodes_udp']
shuffle(lNodes)
if self._oArgs.proxy_type == 0:
if self._oargs.proxy_type == 0:
ts.bootstrap_udp(lNodes[:6], [self])
else:
if self._bRouted is None:
@ -654,7 +649,7 @@ class SyniTox(Tox):
def get_all_groups(self):
try:
group_numbers = range(self._tox.group_get_number_groups())
except Exception as e:
except Exception as e: # noqa
return None
groups = map(lambda n: self.get_group_by_number(n), group_numbers)
@ -730,10 +725,10 @@ class SyniTox(Tox):
raise SyniToxError(line)
def irc_readlines(self):
nick = self._oArgs.irc_nick
pwd = self._oArgs.irc_pass
fp = self._oArgs.irc_fp
email = self._oArgs.irc_email
nick = self._oargs.irc_nick
pwd = self._oargs.irc_pass
fp = self._oargs.irc_fp
email = self._oargs.irc_email
self.readbuffer += self.irc.recv(4096)
lines = self.readbuffer.split(b'\n')
@ -754,7 +749,7 @@ class SyniTox(Tox):
print(line[i+1:])
rx = re.match(r':(.*?)!.*? PRIVMSG %s :(.*?)\r' %
self._oArgs.irc_chan, line, re.S)
self._oargs.irc_chan, line, re.S)
if l[0] == 'QUIT':
LOG.info('QUIT')
return
@ -770,7 +765,7 @@ class SyniTox(Tox):
pass
elif l[1] in ['433']:
# maybe should be an outright fail
if self._oArgs.irc_ssl:
if self._oargs.irc_ssl:
LOG.warn("Maybe the certificate was not received")
#? raise SyniToxError(line)
# sometimes but not always:
@ -783,13 +778,14 @@ class SyniTox(Tox):
# app.ts ERROR SSL error: (32, 'EPIPE')
pass
elif l[1] in ['451', '462', '477']:
if self._oArgs.irc_crt and self._oArgs.irc_key:
if self._oargs.irc_crt and self._oargs.irc_key:
LOG.warn("Maybe the certificate was not received")
raise SyniToxError(line)
elif l[1] in ['376']:
# :End of /MOTD command
if self._oArgs.irc_crt and self._oArgs.irc_key:
pass
if self._oargs.irc_crt and self._oargs.irc_key:
LOG.info(bytes(sMSG+' NickServ IDENTIFY %s %s\r\n'
% (nick, pwd,), 'UTF-8'))
elif email == '' and pwd:
LOG.info(bytes(sMSG+' NickServ IDENTIFY %s %s\r\n'
% (nick, pwd,), 'UTF-8'))
@ -805,8 +801,8 @@ class SyniTox(Tox):
raise RuntimeError("you must provide a password to register")
try:
self.irc.send(bytes(sMSG+' NickServ set cloak on\r\n', 'UTF-8'))
if self._oArgs.irc_chan:
self.irc.send(bytes('JOIN %s\r\n' % self._oArgs.irc_chan, 'UTF-8'))
if self._oargs.irc_chan:
self.irc.send(bytes('JOIN %s\r\n' % self._oargs.irc_chan, 'UTF-8'))
except BrokenPipeError:
raise SyniToxError('BrokenPipeError')
@ -869,13 +865,13 @@ class SyniTox(Tox):
iCount = 0
iDelay = 10
nick = self._oArgs.irc_nick
realname = self._oArgs.irc_name
ident = self._oArgs.irc_ident
pwd = self._oArgs.irc_pass
email = self._oArgs.irc_email
nick = self._oargs.irc_nick
realname = self._oargs.irc_name
ident = self._oargs.irc_ident
pwd = self._oargs.irc_pass
email = self._oargs.irc_email
LOG.info(f"Looping for Tox and IRC connections")
if iCount < self._oArgs.max_sleep:
if iCount < self._oargs.max_sleep:
while True:
iCount += 1
# LOG.debug(f"Looping {iCount}")
@ -909,7 +905,7 @@ class SyniTox(Tox):
#? self.bid = self.friend_by_public_key(self.sGROUP_BOT_PK)
r = self.group_reconnect(self.sGROUP_BOT_NUM)
LOG.info(f'Connected to group {r}')
except ctypes.ArgumentError as e:
except ctypes.ArgumentError as e: # noqa
self.bid = None
if self.bid == None:
@ -933,7 +929,7 @@ class SyniTox(Tox):
continue
LOG.info(f'Waiting on IRC to {self._oArgs.irc_host} on {self._oArgs.irc_port}')
LOG.info(f'Waiting on IRC to {self._oargs.irc_host} on {self._oargs.irc_port}')
readable = self.spin(20)
if not readable or not readable[0]:
@ -993,7 +989,7 @@ class SyniTox(Tox):
def on_group_invite(self, friendid, invite_data, user_data):
if not self.joined:
self.joined = True
nick = self._oArgs.group_nick
nick = self._oargs.group_nick
self.tox_group_id = self.group_invite_accept(invite_data, friendid, nick)
LOG.info('Joined groupchat.')
@ -1007,9 +1003,8 @@ class SyniTox(Tox):
print('TOX> %s: %s' % (name, message))
if message.startswith('>'):
message = '\x0309%s\x03' % message
self.irc_send(bsMSG+' %s :[%s]: %s\r\n' %
(self._oArgs.irc_chan, name, message))
self.irc_send(sMSG+' %s :[%s]: %s\r\n' %
(self._oargs.irc_chan, name, message))
if message.startswith('^'):
self.handle_command(message)
@ -1020,8 +1015,8 @@ class SyniTox(Tox):
print('TOX> %s: %s' % (name, action))
if action.startswith('>'):
action = '\x0309%s\x03' % action
self.irc_send(bytes(sMSG+' %s :\x01ACTION [%s]: %s\x01\r\n' %
(self._oArgs.irc_chan, name, action), 'UTF-8'))
self.irc_send(bytes(sMSG' %s :\x01ACTION [%s]: %s\x01\r\n' %
(self._oargs.irc_chan, name, action), 'UTF-8'))
def on_friend_request(self, pk, message):
LOG.info('Friend request from %s: %s' % (pk, message))
@ -1043,7 +1038,7 @@ class SyniTox(Tox):
def send_both(self, content):
type_ = TOX_MESSAGE_TYPE['NORMAL']
self.ensure_exe(self.group_send_message, self.sGROUP_BOT_NUM, type_, content)
self.irc_send(bytes(sMSG+' %s :%s\r\n' % (self._oArgs.irc_chan, content), 'UTF-8'))
self.irc_send(bytes(sMSG+' %s :%s\r\n' % (self._oargs.irc_chan, content), 'UTF-8'))
def handle_command(self, cmd):
cmd = cmd[1:]
@ -1067,13 +1062,13 @@ class SyniTox(Tox):
return len(data) > 0 and self._toxes.is_data_encrypted(data)
def pass_encrypt(self, data):
return self._toxes.pass_encrypt(data, self._oArgs.password)
return self._toxes.pass_encrypt(data, self._oargs.password)
def has_password(self):
return self._oArgs.password
return self._oargs.password
def pass_decrypt(self, data):
return self._toxes.pass_decrypt(data, self._oArgs.password)
return self._toxes.pass_decrypt(data, self._oargs.password)
def iMain(oArgs, oOpts):
@ -1091,7 +1086,7 @@ def iMain(oArgs, oOpts):
except ( SSL.Error, ) as e:
LOG.error(f"SSL error: {e.args}")
ret = 1
except (SSL.SysCallError, ) as e:
except (SSL.SysCallError,) as e:
# OpenSSL.SSL.SysCallError: (9, 'EBADF')
LOG.error(f"SSL error: {e.args}")
ret = 1
@ -1140,6 +1135,25 @@ def oToxygenToxOptions(oArgs):
return tox_options
def vInitializeOargs():
global oTOX_OARGS
assert oTOX_OARGS.irc_host or oTOX_OARGS.irc_connect
if not oTOX_OARGS.irc_connect:
oTOX_OARGS.irc_connect = oTOX_OARGS.irc_host
if oTOX_OARGS.irc_cadir:
assert os.path.isdir(oTOX_OARGS.irc_cadir)
if oTOX_OARGS.irc_cafile:
assert os.path.isfile(oTOX_OARGS.irc_cafile)
if oTOX_OARGS.irc_crt:
assert os.path.isfile(oTOX_OARGS.irc_crt)
assert oTOX_OARGS.irc_key
if oTOX_OARGS.irc_key:
assert os.path.isfile(oTOX_OARGS.irc_key)
assert oTOX_OARGS.irc_crt
if not oTOX_OARGS.group_name:
group_name = oTOX_OARGS.bot_name +oTOX_OARGS.irc_chan
oTOX_OARGS.group_name = group_name
def oArgparse(lArgv):
parser = ts.oMainArgparser()
parser.add_argument('profile', type=str, nargs='?', default=None,
@ -1179,7 +1193,7 @@ def oArgparse(lArgv):
help="TLS version; empty is no SSL",
choices=['', 'tlsv1.1', 'tlsv1.2', 'tlsv1.3'])
parser.add_argument('--irc_cafile', type=str,
help="Certificate Authority file",
help="Certificate Authority file (in PEM)",
default=CAfs[0])
parser.add_argument('--irc_cadir', type=str,
help="Certificate Authority directory",
@ -1243,14 +1257,7 @@ def main(lArgs=None):
oTOX_OARGS = oArgparse(lArgs)
ts.clean_booleans(oTOX_OARGS)
assert oTOX_OARGS.irc_host or oTOX_OARGS.irc_connect
if not oTOX_OARGS.irc_connect:
oTOX_OARGS.irc_connect = oTOX_OARGS.irc_host
if oTOX_OARGS.irc_cadir:
assert os.path.isdir(oTOX_OARGS.irc_cadir)
if oTOX_OARGS.irc_cafile:
assert os.path.isfile(oTOX_OARGS.irc_cafile)
vInitializeOargs()
global oTOX_OPTIONS
oTOX_OPTIONS = oToxygenToxOptions(oTOX_OARGS)

View file

@ -119,7 +119,6 @@ if [ "$TLS" -ne 0 ] ; then
fi
ls -l -s $SD/$NICK.pem
fi
exit 0
declare -a RARGS
if [ "$DEBUG" = 1 ] ; then