#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2011 Sebastien Helleu
#
# This file is part of QWeeChat, a Qt remote GUI for WeeChat.
#
# QWeeChat is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# QWeeChat is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with QWeeChat.  If not, see <http://www.gnu.org/licenses/>.
#

#
# Decode binary messages received from WeeChat/relay.
#
# For info about protocol and format of messages, please read document
# "WeeChat Relay Protocol", available at: http://www.weechat.org/doc/
#
# History:
#
# 2011-11-23, Sebastien Helleu:
#     start dev
#

import struct
import zlib


def _to_text(value):
    """Return value as a native text string.

    A native ``str`` is returned untouched (on Python 2 this covers raw byte
    strings, so behaviour there is unchanged).  ``bytes``/``bytearray``
    objects that are not ``str`` (Python 3) are decoded as UTF-8, replacing
    invalid sequences.  Anything else goes through ``str()``.
    """
    if isinstance(value, str):
        return value
    if isinstance(value, (bytes, bytearray)):
        return value.decode('utf-8', 'replace')
    return str(value)


class WeechatObject:
    """One decoded object: a 3-letter type code and its Python value."""

    def __init__(self, objtype, value):
        self.objtype = objtype
        self.value = value

    def _str_value(self, v):
        """Return v as text; strings are wrapped in single quotes."""
        if isinstance(v, str):
            return '\'%s\'' % v
        return str(v)

    def _str_items(self):
        """Return the display lines for every entry of value['items']."""
        lines = []
        for i, item in enumerate(self.value['items']):
            lines.append(' item %d:' % (i + 1))
            # items() exists on Python 2 and 3 (iteritems() does not).
            lines.append('\n'.join(
                [' %s: %s' % (key, self._str_value(value))
                 for key, value in sorted(item.items())]))
        return lines

    def _str_value_hdata(self):
        """Return the multi-line display of a hdata value."""
        lines = ['',
                 ' keys: %s' % str(self.value['keys']),
                 ' path: %s' % str(self.value['path'])]
        lines.extend(self._str_items())
        return '\n'.join(lines)

    def _str_value_infolist(self):
        """Return the multi-line display of an infolist value."""
        lines = ['', ' name: %s' % self.value['name']]
        lines.extend(self._str_items())
        return '\n'.join(lines)

    def _str_value_other(self):
        """Return the display of any other (scalar) value."""
        return self._str_value(self.value)

    def __str__(self):
        # Local dispatch table: formatting must not mutate the instance.
        formatters = {
            'hda': self._str_value_hdata,
            'lis': self._str_value_infolist,
        }
        formatter = formatters.get(self.objtype, self._str_value_other)
        return '%s: %s' % (self.objtype, formatter())


class WeechatObjects(list):
    """List of WeechatObject, displayed with one object per line."""

    def __str__(self):
        return '\n'.join([str(obj) for obj in self])


class WeechatMessage:
    """A decoded message: sizes, compression flag, id and objects."""

    def __init__(self, size, size_uncompressed, compression, uncompressed,
                 msgid, objects):
        self.size = size
        self.size_uncompressed = size_uncompressed
        self.compression = compression
        self.uncompressed = uncompressed
        self.msgid = msgid
        self.objects = objects

    def __str__(self):
        if self.compression != 0:
            # Floor division keeps the integer percentage identical on
            # Python 2 and Python 3 ("/" would yield a float on Python 3).
            saved = 100 - ((self.size * 100) // self.size_uncompressed)
            return 'size: %d/%d (%d%%), id=\'%s\', objects:\n%s' % (
                self.size, self.size_uncompressed, saved,
                self.msgid, self.objects)
        return 'size: %d, id=\'%s\', objects:\n%s' % (
            self.size, self.msgid, self.objects)


class Protocol:
    """Decode binary message received from WeeChat/relay."""

    def __init__(self):
        self.data = b''
        # Maps a 3-letter type code to the method reading that type.
        self._obj_cb = {
            'chr': self._obj_char,
            'int': self._obj_int,
            'lon': self._obj_long,
            'str': self._obj_str,
            'buf': self._obj_buffer,
            'ptr': self._obj_ptr,
            'tim': self._obj_time,
            'hda': self._obj_hdata,
            'inf': self._obj_info,
            'lis': self._obj_infolist,
        }

    def _obj_len_data(self, length_size):
        """Read length (1 or 4 bytes), then value with this length.

        Returns None when the remaining data is shorter than the length
        field or when the length is negative.  If fewer bytes remain than
        the length announces, the value is whatever is left.
        """
        if len(self.data) < length_size:
            self.data = b''
            return None
        if length_size == 1:
            length = struct.unpack('B', self.data[0:1])[0]
            self.data = self.data[1:]
        else:
            length = self._obj_int()
        if length < 0:
            return None
        if length > 0:
            value = self.data[0:length]
            self.data = self.data[length:]
        else:
            value = b''
        return value

    def _obj_char(self):
        """Read a char in data (signed byte); 0 if no data is left."""
        if len(self.data) < 1:
            return 0
        value = struct.unpack('b', self.data[0:1])[0]
        self.data = self.data[1:]
        return value

    def _obj_int(self):
        """Read an integer in data (4 bytes, big-endian, signed).

        Returns 0 and drops the remaining data if fewer than 4 bytes remain.
        """
        if len(self.data) < 4:
            self.data = b''
            return 0
        value = struct.unpack('>i', self.data[0:4])[0]
        self.data = self.data[4:]
        return value

    def _obj_long(self):
        """Read a long integer in data (length on 1 byte + value as
        string)."""
        value = self._obj_len_data(1)
        if value is None:
            return None
        return int(_to_text(value))

    def _obj_str(self):
        """Read a string in data (length on 4 bytes + content).

        Returns None when the string could not be read or has a negative
        length.
        """
        value = self._obj_len_data(4)
        if value is None:
            return None
        return _to_text(value)

    def _obj_buffer(self):
        """Read a buffer in data (length on 4 bytes + data), as raw bytes."""
        return self._obj_len_data(4)

    def _obj_ptr(self):
        """Read a pointer in data (length on 1 byte + value as string)."""
        value = self._obj_len_data(1)
        if value is None:
            return None
        return '0x%s' % _to_text(value)

    def _obj_time(self):
        """Read a time in data (length on 1 byte + value as string)."""
        value = self._obj_len_data(1)
        if value is None:
            return None
        return _to_text(value)

    def _obj_hdata(self):
        """Read a hdata in data.

        Returns a dict with 'path' (list of path elements), 'keys' (dict
        name -> type code), 'count' and 'items' (one dict per item, with
        the pointers of the path stored under '__path').
        """
        path = self._obj_str()
        keys = self._obj_str()
        count = self._obj_int()
        # A missing (None) or empty path/keys string means an empty hdata:
        # do not call split() on None or index into a key without a type.
        list_path = path.split('/') if path else []
        list_keys = keys.split(',') if keys else []
        keys_types = []
        dict_keys = {}
        for key in list_keys:
            items = key.split(':')
            keys_types.append(items)
            dict_keys[items[0]] = items[1]
        items = []
        for _ in range(count):
            # One pointer per path element precedes the item's values.
            pointers = [self._obj_ptr() for _ in list_path]
            item = {}
            for key, objtype in keys_types:
                item[key] = self._obj_cb[objtype]()
            item['__path'] = pointers
            items.append(item)
        return {
            'path': list_path,
            'keys': dict_keys,
            'count': count,
            'items': items,
        }

    def _obj_info(self):
        """Read an info in data; return the tuple (name, value)."""
        name = self._obj_str()
        value = self._obj_str()
        return (name, value)

    def _obj_infolist(self):
        """Read an infolist in data.

        Returns a dict with 'name' and 'items' (one dict of variables per
        item).
        """
        name = self._obj_str()
        count_items = self._obj_int()
        items = []
        for _ in range(count_items):
            count_vars = self._obj_int()
            variables = {}
            for _ in range(count_vars):
                var_name = self._obj_str()
                var_type = self._obj_type()
                variables[var_name] = self._obj_cb[var_type]()
            items.append(variables)
        return {'name': name, 'items': items}

    def _obj_type(self):
        """Read type in data (3 chars); '' if fewer than 3 bytes remain."""
        if len(self.data) < 3:
            self.data = b''
            return ''
        objtype = _to_text(self.data[0:3])
        self.data = self.data[3:]
        return objtype

    def decode(self, data):
        """Decode binary data and return a WeechatMessage.

        data is a full message: 4-byte length, 1-byte compression flag,
        then the (possibly zlib-compressed) id and objects.  Raises
        KeyError if an object type code is not in the dispatch table.
        """
        self.data = data
        size = len(self.data)
        size_uncompressed = size
        uncompressed = None
        # uncompress data (if it is compressed)
        compression = struct.unpack('b', self.data[4:5])[0]
        if compression:
            uncompressed = zlib.decompress(self.data[5:])
            size_uncompressed = len(uncompressed) + 5
            # Rebuild an uncompressed message (new length, flag 0) by byte
            # concatenation; '%s' formatting would mangle bytes on Python 3.
            uncompressed = (struct.pack('>i', size_uncompressed)
                            + struct.pack('b', 0)
                            + uncompressed)
            self.data = uncompressed
        # skip length and compression flag
        self.data = self.data[5:]
        # read id
        msgid = self._obj_str()
        if msgid is None:
            msgid = ''
        # read objects
        objects = WeechatObjects()
        while len(self.data) > 0:
            objtype = self._obj_type()
            value = self._obj_cb[objtype]()
            objects.append(WeechatObject(objtype, value))
        return WeechatMessage(size, size_uncompressed, compression,
                              uncompressed, msgid, objects)


def hex_and_ascii(data, bytes_per_line=10):
    """Convert binary data (bytes, bytearray, QByteArray...) to hex + ascii.

    Each output line shows up to bytes_per_line bytes as two-digit hex
    values, padded to a fixed width, followed by the same bytes as ASCII.
    Bytes outside the printable range 32..126 are shown as '.'.  Returns ''
    for empty data.
    """
    if not isinstance(data, (bytes, bytearray)):
        data = bytes(data)
    # A bytearray yields integers on both Python 2 and Python 3.
    raw = bytearray(data)
    num_lines = ((len(raw) - 1) // bytes_per_line) + 1
    if num_lines == 0:
        return ''
    fmt = '%%-%ds %%s' % ((bytes_per_line * 3) - 1)
    lines = []
    for i in range(num_lines):
        chunk = raw[i * bytes_per_line:(i + 1) * bytes_per_line]
        str_hex = ' '.join(['%02X' % byte for byte in chunk])
        # 127 (DEL) is a control character, so it is not printed as-is.
        str_ascii = ''.join(
            [chr(byte) if 32 <= byte < 127 else '.' for byte in chunk])
        lines.append(fmt % (str_hex, str_ascii))
    return '\n'.join(lines)