Migration to Qt6 using PySide6 and Python 3.

This is a bulk migraton of the codebase to work with Python3 and latest version
of Qt framework i.e. Qt6.
This commit is contained in:
Abhilash Raj 2021-05-31 21:02:25 -07:00
parent 8335009dae
commit 1f27a20e9e
No known key found for this signature in database
GPG key ID: 9D9B2BA061D0A67C
13 changed files with 408 additions and 190 deletions

View file

@ -20,10 +20,8 @@
# along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
#
import qt_compat
QtCore = qt_compat.import_module('QtCore')
QtGui = qt_compat.import_module('QtGui')
from PySide6 import QtCore
from PySide6 import QtWidgets as QtGui
class AboutDialog(QtGui.QDialog):
@ -44,7 +42,7 @@ class AboutDialog(QtGui.QDialog):
vbox = QtGui.QVBoxLayout()
for msg in messages:
label = QtGui.QLabel(msg.decode('utf-8'))
label = QtGui.QLabel(msg)
label.setAlignment(QtCore.Qt.AlignHCenter)
vbox.addWidget(label)
vbox.addLayout(hbox)

View file

@ -21,20 +21,20 @@
#
from pkg_resources import resource_filename
import qt_compat
from chat import ChatTextEdit
from input import InputLineEdit
import weechat.color as color
QtCore = qt_compat.import_module('QtCore')
QtGui = qt_compat.import_module('QtGui')
from PySide6 import QtCore
from PySide6 import QtWidgets
from PySide6 import QtGui
class GenericListWidget(QtGui.QListWidget):
class GenericListWidget(QtWidgets.QListWidget):
"""Generic QListWidget with dynamic size."""
def __init__(self, *args):
QtGui.QListWidget.__init__(*(self,) + args)
super().__init__(*args)
self.setMaximumWidth(100)
self.setTextElideMode(QtCore.Qt.ElideNone)
self.setHorizontalScrollBarPolicy(QtCore.Qt.ScrollBarAlwaysOff)
@ -52,17 +52,17 @@ class GenericListWidget(QtGui.QListWidget):
def clear(self, *args):
"""Re-implement clear to set dynamic size after clear."""
QtGui.QListWidget.clear(*(self,) + args)
QtWidgets.QListWidget.clear(*(self,) + args)
self.auto_resize()
def addItem(self, *args):
"""Re-implement addItem to set dynamic size after add."""
QtGui.QListWidget.addItem(*(self,) + args)
QtWidgets.QListWidget.addItem(*(self,) + args)
self.auto_resize()
def insertItem(self, *args):
"""Re-implement insertItem to set dynamic size after insert."""
QtGui.QListWidget.insertItem(*(self,) + args)
QtWidgets.QListWidget.insertItem(*(self,) + args)
self.auto_resize()
@ -70,7 +70,7 @@ class BufferListWidget(GenericListWidget):
"""Widget with list of buffers."""
def __init__(self, *args):
GenericListWidget.__init__(*(self,) + args)
super().__init__(*args)
def switch_prev_buffer(self):
if self.currentRow() > 0:
@ -85,23 +85,23 @@ class BufferListWidget(GenericListWidget):
self.setCurrentRow(0)
class BufferWidget(QtGui.QWidget):
class BufferWidget(QtWidgets.QWidget):
"""
Widget with (from top to bottom):
title, chat + nicklist (optional) + prompt/input.
"""
def __init__(self, display_nicklist=False):
QtGui.QWidget.__init__(self)
super().__init__()
# title
self.title = QtGui.QLineEdit()
self.title = QtWidgets.QLineEdit()
self.title.setFocusPolicy(QtCore.Qt.NoFocus)
# splitter with chat + nicklist
self.chat_nicklist = QtGui.QSplitter()
self.chat_nicklist.setSizePolicy(QtGui.QSizePolicy.Expanding,
QtGui.QSizePolicy.Expanding)
self.chat_nicklist = QtWidgets.QSplitter()
self.chat_nicklist.setSizePolicy(QtWidgets.QSizePolicy.Expanding,
QtWidgets.QSizePolicy.Expanding)
self.chat = ChatTextEdit(debug=False)
self.chat_nicklist.addWidget(self.chat)
self.nicklist = GenericListWidget()
@ -110,16 +110,16 @@ class BufferWidget(QtGui.QWidget):
self.chat_nicklist.addWidget(self.nicklist)
# prompt + input
self.hbox_edit = QtGui.QHBoxLayout()
self.hbox_edit = QtWidgets.QHBoxLayout()
self.hbox_edit.setContentsMargins(0, 0, 0, 0)
self.hbox_edit.setSpacing(0)
self.input = InputLineEdit(self.chat)
self.hbox_edit.addWidget(self.input)
prompt_input = QtGui.QWidget()
prompt_input = QtWidgets.QWidget()
prompt_input.setLayout(self.hbox_edit)
# vbox with title + chat/nicklist + prompt/input
vbox = QtGui.QVBoxLayout()
vbox = QtWidgets.QVBoxLayout()
vbox.setContentsMargins(0, 0, 0, 0)
vbox.setSpacing(0)
vbox.addWidget(self.title)
@ -139,7 +139,7 @@ class BufferWidget(QtGui.QWidget):
if self.hbox_edit.count() > 1:
self.hbox_edit.takeAt(0)
if prompt is not None:
label = QtGui.QLabel(prompt)
label = QtWidgets.QLabel(prompt)
label.setContentsMargins(0, 0, 5, 0)
self.hbox_edit.insertWidget(0, label)
@ -147,7 +147,7 @@ class BufferWidget(QtGui.QWidget):
class Buffer(QtCore.QObject):
"""A WeeChat buffer."""
bufferInput = qt_compat.Signal(str, str)
bufferInput = QtCore.Signal(str, str)
def __init__(self, data={}):
QtCore.QObject.__init__(self)
@ -243,6 +243,6 @@ class Buffer(QtCore.QObject):
pixmap = QtGui.QPixmap(8, 8)
pixmap.fill()
icon = QtGui.QIcon(pixmap)
item = QtGui.QListWidgetItem(icon, nick['name'])
item = QtWidgets.QListWidgetItem(icon, nick['name'])
self.widget.nicklist.addItem(item)
self.widget.nicklist.setVisible(True)

View file

@ -21,19 +21,18 @@
#
import datetime
import qt_compat
import config
import weechat.color as color
QtCore = qt_compat.import_module('QtCore')
QtGui = qt_compat.import_module('QtGui')
from PySide6 import QtCore
from PySide6 import QtWidgets, QtGui
class ChatTextEdit(QtGui.QTextEdit):
class ChatTextEdit(QtWidgets.QTextEdit):
"""Chat area."""
def __init__(self, debug, *args):
QtGui.QTextEdit.__init__(*(self,) + args)
QtWidgets.QTextEdit.__init__(*(self,) + args)
self.debug = debug
self.readOnly = True
self.setFocusPolicy(QtCore.Qt.NoFocus)
@ -77,9 +76,9 @@ class ChatTextEdit(QtGui.QTextEdit):
prefix = '\x01(F%s)%s' % (forcecolor, prefix)
text = '\x01(F%s)%s' % (forcecolor, text)
if prefix:
self._display_with_colors(str(prefix).decode('utf-8') + ' ')
self._display_with_colors(prefix + ' ')
if text:
self._display_with_colors(str(text).decode('utf-8'))
self._display_with_colors(text)
if text[-1:] != '\n':
self.insertPlainText('\n')
else:

View file

@ -20,7 +20,7 @@
# along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
#
import ConfigParser
import configparser
import os
CONFIG_DIR = '%s/.qweechat' % os.getenv('HOME')
@ -91,7 +91,7 @@ config_color_options = []
def read():
"""Read config file."""
global config_color_options
config = ConfigParser.RawConfigParser()
config = configparser.RawConfigParser()
if os.path.isfile(CONFIG_FILENAME):
config.read(CONFIG_FILENAME)
@ -123,7 +123,7 @@ def write(config):
"""Write config file."""
if not os.path.exists(CONFIG_DIR):
os.mkdir(CONFIG_DIR, 0o0755)
with open(CONFIG_FILENAME, 'wb') as cfg:
with open(CONFIG_FILENAME, 'w') as cfg:
config.write(cfg)

View file

@ -20,29 +20,30 @@
# along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
#
import qt_compat
# import qt_compat
QtGui = qt_compat.import_module('QtGui')
# QtGui = qt_compat.import_module('QtGui')
from PySide6 import QtGui, QtWidgets
class ConnectionDialog(QtGui.QDialog):
class ConnectionDialog(QtWidgets.QDialog):
"""Connection window."""
def __init__(self, values, *args):
QtGui.QDialog.__init__(*(self,) + args)
super().__init__(*args)
self.values = values
self.setModal(True)
grid = QtGui.QGridLayout()
grid = QtWidgets.QGridLayout()
grid.setSpacing(10)
self.fields = {}
for line, field in enumerate(('server', 'port', 'password', 'lines')):
grid.addWidget(QtGui.QLabel(field.capitalize()), line, 0)
line_edit = QtGui.QLineEdit()
grid.addWidget(QtWidgets.QLabel(field.capitalize()), line, 0)
line_edit = QtWidgets.QLineEdit()
line_edit.setFixedWidth(200)
if field == 'password':
line_edit.setEchoMode(QtGui.QLineEdit.Password)
line_edit.setEchoMode(QtWidgets.QLineEdit.Password)
if field == 'lines':
validator = QtGui.QIntValidator(0, 2147483647, self)
line_edit.setValidator(validator)
@ -51,14 +52,14 @@ class ConnectionDialog(QtGui.QDialog):
grid.addWidget(line_edit, line, 1)
self.fields[field] = line_edit
if field == 'port':
ssl = QtGui.QCheckBox('SSL')
ssl = QtWidgets.QCheckBox('SSL')
ssl.setChecked(self.values['ssl'] == 'on')
grid.addWidget(ssl, line, 2)
self.fields['ssl'] = ssl
self.dialog_buttons = QtGui.QDialogButtonBox()
self.dialog_buttons = QtWidgets.QDialogButtonBox()
self.dialog_buttons.setStandardButtons(
QtGui.QDialogButtonBox.Ok | QtGui.QDialogButtonBox.Cancel)
QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel)
self.dialog_buttons.rejected.connect(self.close)
grid.addWidget(self.dialog_buttons, 4, 0, 1, 2)

View file

@ -20,11 +20,10 @@
# along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
#
import qt_compat
from chat import ChatTextEdit
from input import InputLineEdit
QtGui = qt_compat.import_module('QtGui')
from PySide6 import QtWidgets as QtGui
class DebugDialog(QtGui.QDialog):

View file

@ -20,21 +20,19 @@
# along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
#
import qt_compat
QtCore = qt_compat.import_module('QtCore')
QtGui = qt_compat.import_module('QtGui')
from PySide6 import QtCore
from PySide6 import QtWidgets as QtGui
class InputLineEdit(QtGui.QLineEdit):
"""Input line."""
bufferSwitchPrev = qt_compat.Signal()
bufferSwitchNext = qt_compat.Signal()
textSent = qt_compat.Signal(str)
bufferSwitchPrev = QtCore.Signal()
bufferSwitchNext = QtCore.Signal()
textSent = QtCore.Signal(str)
def __init__(self, scroll_widget):
QtGui.QLineEdit.__init__(self)
super().__init__(scroll_widget)
self.scroll_widget = scroll_widget
self._history = []
self._history_index = -1

View file

@ -21,11 +21,12 @@
#
import struct
import qt_compat
import config
from PySide6 import QtCore, QtNetwork
from PySide6.QtCore import QObject, Signal
QtCore = qt_compat.import_module('QtCore')
QtNetwork = qt_compat.import_module('QtNetwork')
# QtCore = qt_compat.import_module('QtCore')
# QtNetwork = qt_compat.import_module('QtNetwork')
_PROTO_INIT_CMD = ['init password=%(password)s']
@ -47,11 +48,11 @@ _PROTO_SYNC_CMDS = [
class Network(QtCore.QObject):
"""I/O with WeeChat/relay."""
statusChanged = qt_compat.Signal(str, str)
messageFromWeechat = qt_compat.Signal(QtCore.QByteArray)
statusChanged = Signal(str, str)
messageFromWeechat = Signal(QtCore.QByteArray)
def __init__(self, *args):
QtCore.QObject.__init__(*(self,) + args)
super().__init__(*args)
self.status_disconnected = 'disconnected'
self.status_connecting = 'connecting...'
self.status_connected = 'connected'
@ -63,13 +64,14 @@ class Network(QtCore.QObject):
self._buffer = QtCore.QByteArray()
self._socket = QtNetwork.QSslSocket()
self._socket.connected.connect(self._socket_connected)
self._socket.error.connect(self._socket_error)
# self._socket.error.connect(self._socket_error)
self._socket.readyRead.connect(self._socket_read)
self._socket.disconnected.connect(self._socket_disconnected)
def _socket_connected(self):
"""Slot: socket connected."""
self.statusChanged.emit(self.status_connected, None)
print('Connected, now sending password.')
if self._password:
self.send_to_weechat('\n'.join(_PROTO_INIT_CMD + _PROTO_SYNC_CMDS)
% {'password': str(self._password),
@ -87,7 +89,7 @@ class Network(QtCore.QObject):
self._buffer.append(data)
while len(self._buffer) >= 4:
remainder = None
length = struct.unpack('>i', self._buffer[0:4])[0]
length = struct.unpack('>i', self._buffer[0:4].data())[0]
if len(self._buffer) < length:
# partial message, just wait for end of message
break
@ -108,7 +110,7 @@ class Network(QtCore.QObject):
self._server = None
self._port = None
self._ssl = None
self._password = None
self._password = ""
self.statusChanged.emit(self.status_disconnected, None)
def is_connected(self):
@ -122,6 +124,7 @@ class Network(QtCore.QObject):
def connect_weechat(self, server, port, ssl, password, lines):
"""Connect to WeeChat."""
self._server = server
print(f'Connecting to server {self._server}')
try:
self._port = int(port)
except ValueError:
@ -136,11 +139,13 @@ class Network(QtCore.QObject):
return
if self._socket.state() != QtNetwork.QAbstractSocket.UnconnectedState:
self._socket.abort()
self._socket.connectToHost(self._server, self._port)
if self._ssl:
self._socket.ignoreSslErrors()
self._socket.startClientEncryption()
self.statusChanged.emit(self.status_connecting, None)
self._socket.connectToHostEncrypted(self._server, self._port)
else:
self._socket.connectToHost(self._server, self._port)
print('Got SSL connection')
self.statusChanged.emit(self.status_connecting, "")
def disconnect_weechat(self):
"""Disconnect from WeeChat."""

View file

@ -1,55 +0,0 @@
# -*- coding: utf-8 -*-
#
# File downloaded from:
# https://github.com/epage/PythonUtils/blob/master/util/qt_compat.py
# Author: epage
# License: LGPL 2.1
#
from __future__ import with_statement
from __future__ import division
_TRY_PYSIDE = True
uses_pyside = False
try:
if not _TRY_PYSIDE:
raise ImportError()
import PySide.QtCore as _QtCore
QtCore = _QtCore
uses_pyside = True
except ImportError:
import sip
sip.setapi('QString', 2)
sip.setapi('QVariant', 2)
import PyQt4.QtCore as _QtCore
QtCore = _QtCore
uses_pyside = False
def _pyside_import_module(moduleName):
pyside = __import__('PySide', globals(), locals(), [moduleName], -1)
return getattr(pyside, moduleName)
def _pyqt4_import_module(moduleName):
pyside = __import__('PyQt4', globals(), locals(), [moduleName], -1)
return getattr(pyside, moduleName)
if uses_pyside:
import_module = _pyside_import_module
Signal = QtCore.Signal
Slot = QtCore.Slot
Property = QtCore.Property
else:
import_module = _pyqt4_import_module
Signal = QtCore.pyqtSignal
Slot = QtCore.pyqtSlot
Property = QtCore.pyqtProperty
if __name__ == "__main__":
pass

View file

@ -36,7 +36,7 @@ It requires requires WeeChat 0.3.7 or newer, running on local or remote host.
import sys
import traceback
from pkg_resources import resource_filename
import qt_compat
# import qt_compat
import config
import weechat.protocol as protocol
from network import Network
@ -46,8 +46,14 @@ from debug import DebugDialog
from about import AboutDialog
from version import qweechat_version
QtCore = qt_compat.import_module('QtCore')
QtGui = qt_compat.import_module('QtGui')
from PySide6.QtWidgets import (
QApplication, QLabel, QPushButton, QVBoxLayout, QWidget)
from PySide6.QtCore import Qt, Slot
from PySide6 import QtGui, QtWidgets, QtCore
# QtCore = qt_compat.import_module('QtCore')
# QtGui = qt_compat.import_module('QtGui')
NAME = 'QWeeChat'
AUTHOR = 'Sébastien Helleu'
@ -58,11 +64,11 @@ WEECHAT_SITE = 'https://weechat.org/'
DEBUG_NUM_LINES = 50
class MainWindow(QtGui.QMainWindow):
class MainWindow(QtWidgets.QMainWindow):
"""Main window."""
def __init__(self, *args):
QtGui.QMainWindow.__init__(*(self,) + args)
super().__init__()
self.config = config.read()
@ -87,11 +93,11 @@ class MainWindow(QtGui.QMainWindow):
# default buffer
self.buffers = [Buffer()]
self.stacked_buffers = QtGui.QStackedWidget()
self.stacked_buffers = QtWidgets.QStackedWidget()
self.stacked_buffers.addWidget(self.buffers[0].widget)
# splitter with buffers + chat/input
splitter = QtGui.QSplitter()
splitter = QtWidgets.QSplitter()
splitter.addWidget(self.list_buffers)
splitter.addWidget(self.stacked_buffers)
@ -146,7 +152,7 @@ class MainWindow(QtGui.QMainWindow):
menu_window.addAction(self.actions['debug'])
menu_help = self.menu.addMenu('&Help')
menu_help.addAction(self.actions['about'])
self.network_status = QtGui.QLabel()
self.network_status = QtWidgets.QLabel()
self.network_status.setFixedHeight(20)
self.network_status.setFixedWidth(200)
self.network_status.setContentsMargins(0, 0, 10, 0)
@ -312,24 +318,24 @@ class MainWindow(QtGui.QMainWindow):
def _network_weechat_msg(self, message):
"""Called when a message is received from WeeChat."""
self.debug_display(0, '==>',
'message (%d bytes):\n%s'
% (len(message),
protocol.hex_and_ascii(message, 20)),
forcecolor='#008800')
# self.debug_display(0, '==>',
# 'message (%d bytes):\n%s'
# % (len(message),
# protocol.hex_and_ascii(message, 20)),
# forcecolor='#008800')
try:
proto = protocol.Protocol()
message = proto.decode(str(message))
message = proto.decode(message.data())
if message.uncompressed:
self.debug_display(
0, '==>',
'message uncompressed (%d bytes):\n%s'
% (message.size_uncompressed,
protocol.hex_and_ascii(message.uncompressed, 20)),
forcecolor='#008800')
forcecolor='#008800')
self.debug_display(0, '', 'Message: %s' % message)
self.parse_message(message)
except: # noqa: E722
except Exception: # noqa: E722
print('Error while decoding message from WeeChat:\n%s'
% traceback.format_exc())
self.network.disconnect_weechat()
@ -339,6 +345,7 @@ class MainWindow(QtGui.QMainWindow):
for obj in message.objects:
if obj.objtype != 'hda' or obj.value['path'][-1] != 'buffer':
continue
print('listbuffers object', obj.objtype, obj.value['path'])
self.list_buffers.clear()
while self.stacked_buffers.count() > 0:
buf = self.stacked_buffers.widget(0)
@ -346,6 +353,7 @@ class MainWindow(QtGui.QMainWindow):
self.buffers = []
for item in obj.value['items']:
buf = self.create_buffer(item)
print(f'Creating buffer for {item}')
self.insert_buffer(len(self.buffers), buf)
self.list_buffers.setCurrentRow(0)
self.buffers[0].widget.input.setFocus()
@ -477,6 +485,7 @@ class MainWindow(QtGui.QMainWindow):
def parse_message(self, message):
"""Parse a WeeChat message."""
print(f'message.msgid = {message.msgid}')
if message.msgid.startswith('debug'):
self.debug_display(0, '', '(debug message, ignored)')
elif message.msgid == 'listbuffers':
@ -495,6 +504,8 @@ class MainWindow(QtGui.QMainWindow):
self.network.desync_weechat()
elif message.msgid == '_upgrade_ended':
self.network.sync_weechat()
else:
print(f"Unknown message with id {message.msgid}")
def create_buffer(self, item):
"""Create a new buffer."""
@ -511,7 +522,7 @@ class MainWindow(QtGui.QMainWindow):
self.buffers.insert(index, buf)
self.list_buffers.insertItem(index, '%d. %s'
% (buf.data['number'],
buf.data['full_name'].decode('utf-8')))
buf.data['full_name']))
self.stacked_buffers.insertWidget(index, buf.widget)
def remove_buffer(self, index):
@ -544,12 +555,13 @@ class MainWindow(QtGui.QMainWindow):
if self.debug_dialog:
self.debug_dialog.close()
config.write(self.config)
QtGui.QMainWindow.closeEvent(self, event)
QtWidgets.QMainWindow.closeEvent(self, event)
app = QtGui.QApplication(sys.argv)
app.setStyle(QtGui.QStyleFactory.create('Cleanlooks'))
app = QApplication(sys.argv)
app.setStyle(QtWidgets.QStyleFactory.create('Cleanlooks'))
app.setWindowIcon(QtGui.QIcon(
resource_filename(__name__, 'data/icons/weechat.png')))
resource_filename(__name__, 'data/icons/weechat.png')))
main = MainWindow()
main.show()
sys.exit(app.exec_())

View file

@ -34,15 +34,10 @@ import collections
import struct
import zlib
if hasattr(collections, 'OrderedDict'):
# python >= 2.7
class WeechatDict(collections.OrderedDict):
def __str__(self):
return '{%s}' % ', '.join(
['%s: %s' % (repr(key), repr(self[key])) for key in self])
else:
# python <= 2.6
WeechatDict = dict
class WeechatDict(collections.OrderedDict):
def __str__(self):
return '{%s}' % ', '.join(
['%s: %s' % (repr(key), repr(self[key])) for key in self])
class WeechatObject:
@ -151,7 +146,7 @@ class Protocol:
if len(self.data) < 3:
self.data = ''
return ''
objtype = str(self.data[0:3])
objtype = bytes(self.data[0:3])
self.data = self.data[3:]
return objtype
@ -196,14 +191,17 @@ class Protocol:
value = self._obj_len_data(1)
if value is None:
return None
return int(str(value))
return int(value)
def _obj_str(self):
"""Read a string in data (length on 4 bytes + content)."""
value = self._obj_len_data(4)
if value is None:
return None
return str(value)
try:
return value.decode()
except AttributeError:
return value
def _obj_buffer(self):
"""Read a buffer in data (length on 4 bytes + data)."""
@ -214,22 +212,22 @@ class Protocol:
value = self._obj_len_data(1)
if value is None:
return None
return '0x%s' % str(value)
return '0x%s' % value
def _obj_time(self):
"""Read a time in data (length on 1 byte + value as string)."""
value = self._obj_len_data(1)
if value is None:
return None
return int(str(value))
return int(value)
def _obj_hashtable(self):
"""
Read a hashtable in data
(type for keys + type for values + count + items).
"""
type_keys = self._obj_type()
type_values = self._obj_type()
type_keys = self._obj_type().decode()
type_values = self._obj_type().decode()
count = self._obj_int()
hashtable = WeechatDict()
for _ in range(count):
@ -248,7 +246,7 @@ class Protocol:
keys_types = []
dict_keys = WeechatDict()
for key in list_keys:
items = key.split(':')
items = list(item for item in key.split(':'))
keys_types.append(items)
dict_keys[items[0]] = items[1]
items = []
@ -259,6 +257,7 @@ class Protocol:
for _ in enumerate(list_path):
pointers.append(self._obj_ptr())
for key, objtype in keys_types:
objtype = objtype
item[key] = self._obj_cb[objtype]()
item['__path'] = pointers
items.append(item)
@ -296,7 +295,7 @@ class Protocol:
def _obj_array(self):
"""Read an array of values in data."""
type_values = self._obj_type()
type_values = self._obj_type().decode()
count_values = self._obj_int()
values = []
for _ in range(count_values):
@ -314,7 +313,7 @@ class Protocol:
if compression:
uncompressed = zlib.decompress(self.data[5:])
size_uncompressed = len(uncompressed) + 5
uncompressed = '%s%s%s' % (struct.pack('>i', size_uncompressed),
uncompressed = b'%s%s%s' % (struct.pack('>i', size_uncompressed),
struct.pack('b', 0), uncompressed)
self.data = uncompressed
else:
@ -328,7 +327,7 @@ class Protocol:
# read objects
objects = WeechatObjects(separator=separator)
while len(self.data) > 0:
objtype = self._obj_type()
objtype = self._obj_type().decode()
value = self._obj_cb[objtype]()
objects.append(WeechatObject(objtype, value, separator=separator))
return WeechatMessage(size, size_uncompressed, compression,
@ -344,13 +343,20 @@ def hex_and_ascii(data, bytes_per_line=10):
for i in range(num_lines):
str_hex = []
str_ascii = []
for char in data[i*bytes_per_line:(i*bytes_per_line)+bytes_per_line]:
for j in range(bytes_per_line): # data[i*bytes_per_line:(i*bytes_per_line)+bytes_per_line]:
# We can't easily iterate over individual bytes, so we are going to
# do it this way.
index = (i*bytes_per_line) + j
char = data[index:index+1]
if not char:
char = b'x'
byte = struct.unpack('B', char)[0]
str_hex.append('%02X' % int(byte))
str_hex.append(b'%02X' % int(byte))
if byte >= 32 and byte <= 127:
str_ascii.append(char)
else:
str_ascii.append('.')
fmt = '%%-%ds %%s' % ((bytes_per_line * 3) - 1)
lines.append(fmt % (' '.join(str_hex), ''.join(str_ascii)))
return '\n'.join(lines)
str_ascii.append(b'.')
fmt = b'%%-%ds %%s' % ((bytes_per_line * 3) - 1)
lines.append(fmt % (b' '.join(str_hex),
b''.join(str_ascii)))
return b'\n'.join(lines)

View file

@ -24,7 +24,7 @@
Command-line program for testing WeeChat/relay protocol.
"""
from __future__ import print_function
import argparse
import os
@ -37,7 +37,8 @@ import time
import traceback
import protocol # WeeChat/relay protocol
from .. version import qweechat_version
# from .. version import qweechat_version
qweechat_version = '1.1'
NAME = 'qweechat-testproto'
@ -75,11 +76,11 @@ class TestProto(object):
Return True if OK, False if error.
"""
try:
for msg in messages.split('\n'):
if msg == 'quit':
for msg in messages.split(b'\n'):
if msg == b'quit':
self.has_quit = True
self.sock.sendall(msg + '\n')
print('\x1b[33m<-- ' + msg + '\x1b[0m')
self.sock.sendall(msg + b'\n')
sys.stdout.write((b'\x1b[33m<-- ' + msg + b'\x1b[0m\n').decode())
except: # noqa: E722
traceback.print_exc()
print('Failed to send message')
@ -94,7 +95,7 @@ class TestProto(object):
try:
proto = protocol.Protocol()
msgd = proto.decode(message,
separator='\n' if self.args.debug > 0
separator=b'\n' if self.args.debug > 0
else ', ')
print('')
if self.args.debug >= 2 and msgd.uncompressed:
@ -136,10 +137,10 @@ class TestProto(object):
"""
if self.has_quit:
return 0
message = ''
recvbuf = ''
prompt = '\x1b[36mrelay> \x1b[0m'
sys.stdout.write(prompt)
message = b''
recvbuf = b''
prompt = b'\x1b[36mrelay> \x1b[0m'
sys.stdout.write(prompt.decode())
sys.stdout.flush()
try:
while not self.has_quit:
@ -149,13 +150,14 @@ class TestProto(object):
buf = os.read(_file.fileno(), 4096)
if buf:
message += buf
if '\n' in message:
messages = message.split('\n')
msgsent = '\n'.join(messages[:-1])
if b'\n' in message:
messages = message.split(b'\n')
msgsent = b'\n'.join(messages[:-1])
if msgsent and not self.send(msgsent):
return 4
message = messages[-1]
sys.stdout.write(prompt + message)
sys.stdout.write((prompt + message).decode())
# sys.stdout.write(prompt + message)
sys.stdout.flush()
else:
buf = _file.recv(4096)
@ -178,12 +180,12 @@ class TestProto(object):
if remainder:
recvbuf = remainder
else:
recvbuf = ''
sys.stdout.write(prompt + message)
recvbuf = b''
sys.stdout.write((prompt + message).decode())
sys.stdout.flush()
except: # noqa: E722
traceback.print_exc()
self.send('quit')
self.send(b'quit')
return 0
def __del__(self):
@ -220,7 +222,7 @@ The script returns:
help='debug mode: long objects view '
'(-dd: display raw messages)')
parser.add_argument('-v', '--version', action='version',
version=qweechat_version())
version=qweechat_version)
parser.add_argument('hostname',
help='hostname (or IP address) of machine running '
'WeeChat/relay')

View file

@ -0,0 +1,253 @@
# -*- coding: utf-8 -*-
#
# testproto.py - command-line program for testing WeeChat/relay protocol
#
# Copyright (C) 2013-2021 Sébastien Helleu <flashcode@flashtux.org>
#
# This file is part of QWeeChat, a Qt remote GUI for WeeChat.
#
# QWeeChat is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# QWeeChat is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
#
"""
Command-line program for testing WeeChat/relay protocol.
"""
from __future__ import print_function
import argparse
import os
import select
import shlex
import socket
import struct
import sys
import time
import traceback
import protocol # WeeChat/relay protocol
# from .. version import qweechat_version
qweechat_version = '1.1'
NAME = 'qweechat-testproto'
class TestProto(object):
"""Test of WeeChat/relay protocol."""
def __init__(self, args):
self.args = args
self.sock = None
self.has_quit = False
self.address = '{self.args.hostname}/{self.args.port} ' \
'(IPv{0})'.format(6 if self.args.ipv6 else 4, self=self)
def connect(self):
"""
Connect to WeeChat/relay.
Return True if OK, False if error.
"""
inet = socket.AF_INET6 if self.args.ipv6 else socket.AF_INET
try:
self.sock = socket.socket(inet, socket.SOCK_STREAM)
self.sock.connect((self.args.hostname, self.args.port))
except: # noqa: E722
if self.sock:
self.sock.close()
print('Failed to connect to', self.address)
return False
print('Connected to', self.address)
return True
def send(self, messages):
"""
Send a text message to WeeChat/relay.
Return True if OK, False if error.
"""
try:
for msg in messages.split('\n'):
if msg == 'quit':
self.has_quit = True
self.sock.sendall(msg + '\n')
print('\x1b[33m<-- ' + msg + '\x1b[0m')
except: # noqa: E722
traceback.print_exc()
print('Failed to send message')
return False
return True
def decode(self, message):
"""
Decode a binary message received from WeeChat/relay.
Return True if OK, False if error.
"""
try:
proto = protocol.Protocol()
msgd = proto.decode(message,
separator='\n' if self.args.debug > 0
else ', ')
print('')
if self.args.debug >= 2 and msgd.uncompressed:
# display raw message
print('\x1b[32m--> message uncompressed ({0} bytes):\n'
'{1}\x1b[0m'
''.format(msgd.size_uncompressed,
protocol.hex_and_ascii(msgd.uncompressed, 20)))
# display decoded message
print('\x1b[32m--> {0}\x1b[0m'.format(msgd))
except: # noqa: E722
traceback.print_exc()
print('Error while decoding message from WeeChat')
return False
return True
def send_stdin(self):
"""
Send commands from standard input if some data is available.
Return True if OK (it's OK if stdin has no commands),
False if error.
"""
inr = select.select([sys.stdin], [], [], 0)[0]
if inr:
data = os.read(sys.stdin.fileno(), 4096)
if data:
if not self.send(data.strip()):
# self.sock.close()
return False
# open stdin to read user commands
sys.stdin = open('/dev/tty')
return True
def mainloop(self):
"""
Main loop: read keyboard, send commands, read socket,
decode/display binary messages received from WeeChat/relay.
Return 0 if OK, 4 if send error, 5 if decode error.
"""
if self.has_quit:
return 0
message = ''
recvbuf = ''
prompt = '\x1b[36mrelay> \x1b[0m'
sys.stdout.write(prompt)
sys.stdout.flush()
try:
while not self.has_quit:
inr = select.select([sys.stdin, self.sock], [], [], 1)[0]
for _file in inr:
if _file == sys.stdin:
buf = os.read(_file.fileno(), 4096)
if buf:
message += buf
if '\n' in message:
messages = message.split('\n')
msgsent = '\n'.join(messages[:-1])
if msgsent and not self.send(msgsent):
return 4
message = messages[-1]
sys.stdout.write(prompt + message)
sys.stdout.flush()
else:
buf = _file.recv(4096)
if buf:
recvbuf += buf
while len(recvbuf) >= 4:
remainder = None
length = struct.unpack('>i', recvbuf[0:4])[0]
if len(recvbuf) < length:
# partial message, just wait for the
# end of message
break
# more than one message?
if length < len(recvbuf):
# save beginning of another message
remainder = recvbuf[length:]
recvbuf = recvbuf[0:length]
if not self.decode(recvbuf):
return 5
if remainder:
recvbuf = remainder
else:
recvbuf = ''
sys.stdout.write(prompt + message)
sys.stdout.flush()
except: # noqa: E722
traceback.print_exc()
self.send('quit')
return 0
def __del__(self):
print('Closing connection with', self.address)
time.sleep(0.5)
self.sock.close()
def main():
"""Main function."""
# parse command line arguments
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
fromfile_prefix_chars='@',
description='Command-line program for testing WeeChat/relay protocol.',
epilog='''
Environment variable "QWEECHAT_PROTO_OPTIONS" can be set with default options.
Argument "@file.txt" can be used to read default options in a file.
Some commands can be piped to the script, for example:
echo "init password=xxxx" | {name} localhost 5000
{name} localhost 5000 < commands.txt
The script returns:
0: OK
2: wrong arguments (command line)
3: connection error
4: send error (message sent to WeeChat)
5: decode error (message received from WeeChat)
'''.format(name=NAME))
parser.add_argument('-6', '--ipv6', action='store_true',
help='connect using IPv6')
parser.add_argument('-d', '--debug', action='count', default=0,
help='debug mode: long objects view '
'(-dd: display raw messages)')
parser.add_argument('-v', '--version', action='version',
version=qweechat_version)
parser.add_argument('hostname',
help='hostname (or IP address) of machine running '
'WeeChat/relay')
parser.add_argument('port', type=int,
help='port of machine running WeeChat/relay')
if len(sys.argv) == 1:
parser.print_help()
sys.exit(0)
_args = parser.parse_args(
shlex.split(os.getenv('QWEECHAT_PROTO_OPTIONS') or '') + sys.argv[1:])
test = TestProto(_args)
# connect to WeeChat/relay
if not test.connect():
sys.exit(3)
# send commands from standard input if some data is available
if not test.send_stdin():
sys.exit(4)
# main loop (wait commands, display messages received)
returncode = test.mainloop()
del test
sys.exit(returncode)
if __name__ == "__main__":
main()