DTube Tutorial Example

This is example-dtube.py for use with the DTube Tutorial, followed by three helper files required by the example.

from dbus_pprint import dbus_pprint
import getpass, sys
import dbus, gobject, telepathy
import dbus.glib # sets up a main loop (!!!); somewhat deprecated
from functools import partial

from accountmgr import AccountManager
from account import Account

from telepathy.constants import (
    CONNECTION_HANDLE_TYPE_CONTACT, HANDLE_TYPE_LIST,
    CHANNEL_TEXT_MESSAGE_TYPE_NORMAL,
    TUBE_TYPE_DBUS, TUBE_STATE_LOCAL_PENDING, TUBE_STATE_OPEN,
    CONNECTION_STATUS_CONNECTED, CONNECTION_STATUS_DISCONNECTED,
    SOCKET_ACCESS_CONTROL_LOCALHOST)
from telepathy.interfaces import (
    CHANNEL, CHANNEL_INTERFACE_GROUP, CHANNEL_TYPE_CONTACT_LIST,
    CHANNEL_TYPE_DBUS_TUBE,
    CONNECTION, CONNECTION_INTERFACE_REQUESTS,
    CONNECTION_INTERFACE_CONTACTS,
    CONNECTION_INTERFACE_ALIASING,
    CONNECTION_INTERFACE_SIMPLE_PRESENCE,
    CONNECTION_INTERFACE_CONTACT_CAPABILITIES,
    CONNECTION_INTERFACE_CAPABILITIES,
    CHANNEL_INTERFACE_TUBE,

    CLIENT, CLIENT_HANDLER,
    ACCOUNT_MANAGER, ACCOUNT)

# Name of the standard D-Bus properties interface.
DBUS_PROPERTIES = 'org.freedesktop.DBus.Properties'

# SERVICE is the tube's service name and the D-Bus interface of the object
# exported over the tube; PATH is the object path that object is exported
# at; TUBETYPE is the suffix used to build this client's bus name.
SERVICE = 'au.com.baum.example' # domain name, backwards
PATH = '/' + SERVICE.replace('.', '/')
TUBETYPE = SERVICE.replace('.', '_')

# Note on the TUBETYPE: in principle, it should be equal to the SERVICE;
# however, in some places, a single word is required, so the dots are
# replaced with underscores.

class Example(dbus.service.Object,
              #telepathy._generated.Client_Handler.ClientHandler,
              telepathy.server.DBusProperties):
    def __init__(self):
        self.buddies = {}
        self._interfaces = set([CLIENT, CLIENT_HANDLER, DBUS_PROPERTIES])
        self.service_name = CLIENT+'.'+TUBETYPE
        object_path = '/'+self.service_name.replace('.', '/')


        dbus.service.Object.__init__(self, bus, object_path)
        telepathy.server.DBusProperties.__init__(self)
        self._implement_property_get(CLIENT, {
            'Interfaces': lambda: [CLIENT, CLIENT_HANDLER, DBUS_PROPERTIES],
          })
        self._implement_property_get(CLIENT_HANDLER, {
            'HandlerChannelFilter': lambda: dbus.Array([
                dbus.Dictionary({
                    CHANNEL + '.ChannelType'     : CHANNEL_TYPE_DBUS_TUBE,
                    CHANNEL + '.TargetHandleType': CONNECTION_HANDLE_TYPE_CONTACT,
                    CHANNEL_TYPE_DBUS_TUBE + '.ServiceName'    : SERVICE,
                }, signature='sv')
            ], signature='a{sv}'),
            #'Capabilities': lambda: dbus.Array([SERVICE], signature='as'),
          })
        #bus.request_name(self.service_name)
        self.name_ownership = dbus.service.BusName(self.service_name, bus=bus)

    def new_conn(self, conn):
        conn.call_when_ready(self.ready_cb)

    def ready_cb(self, conn):
        """ Called when the connection is ready... """

        # get list of buddies:
        self.get_roster(conn)

        # buddies will be contacted as their details come in...

    def get_roster(self, conn):
        conn[CONNECTION_INTERFACE_CONTACT_CAPABILITIES].connect_to_signal(
                  'ContactCapabilitiesChanged',
                  partial(self.contact_capabilities_changed, conn))
        for group in ('subscribe', 'publish', 'stored', 'known'):
            conn[CONNECTION_INTERFACE_REQUESTS].EnsureChannel({
                CHANNEL + '.ChannelType'     : CHANNEL_TYPE_CONTACT_LIST,
                CHANNEL + '.TargetHandleType': HANDLE_TYPE_LIST,
                CHANNEL + '.TargetID'        : group,
                },
                reply_handler = partial(self.member_channel_cb, conn),
                error_handler = partial(self.roster_error_cb, group))

    def roster_error_cb(self, group, *args):
        if group=='known':
            print 'Info: could not retrieve the obsolete "known contacts" list'
            print 'The error was:', args
        else:
            print 'Warning: could not retrieve the "%s contacts" list'%group
            print 'The error was:', args

    def member_channel_cb(self, conn, yours, path, properties):
        channel = telepathy.client.Channel(conn.service_name, path)
        channel[CHANNEL_INTERFACE_GROUP].connect_to_signal(
                  'MembersChanged', partial(self.members_changed_cb, conn))

        handles = channel[DBUS_PROPERTIES].Get(CHANNEL_INTERFACE_GROUP,
                                                                 'Members')

        handles = set(handles)
        if conn.object_path in self.buddies:
            handles -= self.buddies[conn.object_path]
        else:
            self.buddies[conn.object_path] = {}
        if handles:
            self.got_new_buddies(conn, handles)

    def got_new_buddies(self, conn, handles):
        attribs = conn[CONNECTION_INTERFACE_CONTACTS].GetContactAttributes(
                handles, [
                    CONNECTION,
                    CONNECTION_INTERFACE_ALIASING,
                    CONNECTION_INTERFACE_CONTACT_CAPABILITIES,
                ],
                False)
        self.buddies[conn.object_path].update(attribs)
        self.select_buddy(conn, handles)

    def contact_capabilities_changed(self, conn, changed_caps):
        # notes:
        #   1. We're only handling here the situation where contacts go
        #   online; going offline is also signalled here...
        #   2. We should check whether changed_caps contains the relevant
        #   capabilities, to save on re-retrieving irrelevant data.
        self.got_new_buddies(conn, changed_caps.keys())

    def members_changed_cb(self, conn, message, added, removed, local_pending,
            remote_pending, actor, reason):

        # Note: new buddies are signalled here, but not buddies that were
        # offline and have now come online (or vice versa).

        if added:
            self.got_new_buddies(conn, added)
        if removed:
            for handle in removed:
                member = self.conn[CONNECTION].InspectHandles(
                    CONNECTION_HANDLE_TYPE_CONTACT, [handle])[0]
                print 'buddy went away: %s' % member

    def select_buddy(self, conn, new_buddies):
        alias_key = CONNECTION_INTERFACE_ALIASING + '/alias'
        for buddy_h in new_buddies:
            buddy = self.buddies[conn.object_path][buddy_h][alias_key]
            if self.check_buddy_caps(conn, buddy_h):
                print "Got %s, contacting!"%buddy
                self.contact(conn, buddy_h)
            else:
                print "Contact %s can't handle %s, skipping."%(buddy, SERVICE)

    def check_buddy_caps(self, conn, buddy_h):
        alias_key = CONNECTION_INTERFACE_ALIASING + '/alias'
        # caps = CONNECTION_INTERFACE_CONTACT_CAPABILITIES + '/caps' #old spelling
        caps = CONNECTION_INTERFACE_CONTACT_CAPABILITIES + '/capabilities'
        if caps not in self.buddies[conn.object_path][buddy_h]:
            print "No capabilities obtained for %s."%(self.buddies[conn.object_path][buddy_h][alias_key])
            return False
        for props, additional in self.buddies[conn.object_path][buddy_h][caps]:
            if props.get(CHANNEL_TYPE_DBUS_TUBE + '.ServiceName', '')==SERVICE:
                return True
        return False

    def contact(self, conn, buddy_h):
        conn[CONNECTION_INTERFACE_REQUESTS].EnsureChannel({
            CHANNEL + '.ChannelType'     : CHANNEL_TYPE_DBUS_TUBE,
            CHANNEL + '.TargetHandleType': CONNECTION_HANDLE_TYPE_CONTACT,
            CHANNEL + '.TargetHandle'    : buddy_h,
            CHANNEL_TYPE_DBUS_TUBE + '.ServiceName'    : SERVICE,
            },
            reply_handler = partial(self.tube_channel_cb, conn),
            error_handler = self.error_cb)

    def tube_channel_cb(self, conn, yours, path, properties):
        channel = telepathy.client.Channel(conn.service_name, path)
        addr = channel[CHANNEL_TYPE_DBUS_TUBE].Offer({}, SOCKET_ACCESS_CONTROL_LOCALHOST)
        channel[CHANNEL_INTERFACE_TUBE].connect_to_signal('TubeChannelStateChanged',
                    partial(self.tube_state_cb, conn, addr))

    def tube_state_cb(self, conn, addr, state):
        if state == TUBE_STATE_OPEN:
            print 'connected'
            tube = dbus.connection.Connection(addr)
            tube.add_signal_receiver(self.signal_cb)
            me = EgObject(tube, conn)
            other = tube.get_object(object_path=PATH)

            # now that the tube is open, can start using it!
            self.use_connection(me, other)

    def use_connection(self, me, other):
        me.Hello('hello from %s'%getpass.getuser())
        other.Method("xyzzy %s"%getpass.getuser(),
                  reply_handler=self.reply_cb, error_handler=self.error_cb)

    def signal_cb(self, *args, **kwargs):
        print 'Signal:', args, kwargs

    def reply_cb(self, *args):
        print 'Reply:', args

    def error_cb(self, *args):
        print 'Error:', args

    @dbus.service.method(dbus_interface=CLIENT_HANDLER,
                           in_signature='ooa(oa{sv})aota{sv}', out_signature='')
    def HandleChannels(self, acct, conn_path, channels, reqs_satisfied,
                                           user_action_time, handler_info):
        print 'incoming channel'
        conn_name = conn_path.replace('/', '.')[1:]
        conn = telepathy.client.Connection(conn_name, conn_path)
        for path, props in channels:
            assert props[CHANNEL + '.ChannelType'] == CHANNEL_TYPE_DBUS_TUBE
            assert props[CHANNEL_TYPE_DBUS_TUBE + '.ServiceName'] == SERVICE
            assert not props[CHANNEL + '.Requested']
            channel = telepathy.client.Channel(conn_name, path)
            addr = channel[CHANNEL_TYPE_DBUS_TUBE].Accept(SOCKET_ACCESS_CONTROL_LOCALHOST)
            channel[CHANNEL_INTERFACE_TUBE].connect_to_signal(
              'TubeChannelStateChanged', partial(self.tube_state_cb, conn, addr))
            if len(handler_info):
                print 'handler info:',
                dbus_pprint(handler_info)


class EgObject(dbus.service.Object):
    def __init__(self, tube, conn):
        super(EgObject, self).__init__(tube, PATH)
        self.tube = tube

    @dbus.service.signal(dbus_interface=SERVICE, signature='s')
    def Hello(self, msg):
        pass

    @dbus.service.method(dbus_interface=SERVICE, in_signature='s',
                                                         out_signature='b')
    def Method(self, text):
        print "Method called: %s" % text
        return True


def connections_change_cb(e, *args, **kwargs):
    path = kwargs['path']
    if not path.startswith('/org/freedesktop/Telepathy/Connection/'):
        return

    status, reason = args
    service_name = path.replace('/', '.')[1:]

    if status == CONNECTION_STATUS_CONNECTED:
        print "new connection:", service_name, 'reason:', reason
        e.new_conn(telepathy.client.Connection(service_name, path))
    elif status == CONNECTION_STATUS_DISCONNECTED:
        print "connection gone:", service_name, 'reason:', reason

# Script entry point: export the handler, pick up current and future
# connections, then run the main loop until interrupted.
if __name__ == '__main__':

    # connect to the session bus; the handler lives on this bus and the
    # connection status signals are watched on it
    bus = dbus.SessionBus()

    e = Example()

    # add a handler that watches for connection status changes; the
    # sender's object path is passed to the callback as keyword 'path'
    bus.add_signal_receiver(partial(connections_change_cb, e),
        dbus_interface=CONNECTION, signal_name='StatusChanged',
        path_keyword='path')

    # work through the valid accounts and hand over the connection of each
    # account that has one
    am = AccountManager()
    for acct_path in am[DBUS_PROPERTIES].Get(ACCOUNT_MANAGER, 'ValidAccounts'):
        acct = Account(acct_path)
        conn_path = acct[DBUS_PROPERTIES].Get(ACCOUNT, 'Connection')
        # accounts whose Connection property is '/' are skipped
        if conn_path == '/':
            continue
        # the connection's bus name mirrors its object path
        conn_name = conn_path.replace('/', '.')[1:]
        conn = telepathy.client.Connection(conn_name, conn_path)
        e.new_conn(conn)
        # note: the next step takes place in Example.ready_cb()

    # run the main loop until Ctrl-C
    try:
        gobject.MainLoop().run()
    except KeyboardInterrupt:
        print 'interrupted'

The Python binding doesn't (yet) provide an AccountManager class, so here is accountmgr.py:

from telepathy.client.interfacefactory import InterfaceFactory
from telepathy.interfaces import ACCOUNT_MANAGER
import dbus

# Name of the standard D-Bus properties interface.
DBUS_PROPERTIES = 'org.freedesktop.DBus.Properties'

class AccountManager(InterfaceFactory):
    """Client-side wrapper around the Telepathy AccountManager D-Bus object."""

    service_name = 'org.freedesktop.Telepathy.AccountManager'
    object_path = '/org/freedesktop/Telepathy/AccountManager'

    # Some versions of Mission Control are only activatable under this
    # name, not under the generic AccountManager name
    MC5_name = 'org.freedesktop.Telepathy.MissionControl5'
    MC5_path = '/org/freedesktop/Telepathy/MissionControl5'


    def __init__(self, bus=None):
        """Look up the AccountManager object on bus.

        bus -- D-Bus connection to use; defaults to dbus.Bus().

        If the lookup under the generic name fails, MissionControl5 is
        contacted under its own name and the lookup is retried once.  A
        dbus.DBusException from either of those calls propagates.
        """
        if not bus:
            bus = dbus.Bus()

        try:
            proxy = bus.get_object(self.service_name, self.object_path)
        except dbus.DBusException:
            # try activating MissionControl5 (ugly work-around); the proxy
            # itself is not needed, only the side effect of requesting it
            import time
            bus.get_object(self.MC5_name, self.MC5_path)
            # give the freshly started service a moment before retrying
            time.sleep(1)
            proxy = bus.get_object(self.service_name, self.object_path)
        InterfaceFactory.__init__(self, proxy, ACCOUNT_MANAGER)

        # FIXME: make this async
        self.get_valid_interfaces().update(self[DBUS_PROPERTIES].Get(ACCOUNT_MANAGER, 'Interfaces'))

if __name__ == '__main__':
    am = AccountManager()
    print am[DBUS_PROPERTIES].Get(ACCOUNT_MANAGER, 'ValidAccounts')

Similarly, the Python binding doesn't (yet) provide an Account class, so here is account.py:

from telepathy.client.interfacefactory import InterfaceFactory
from telepathy.interfaces import ACCOUNT, ACCOUNT_MANAGER
import dbus

# Name of the standard D-Bus properties interface.
DBUS_PROPERTIES = 'org.freedesktop.DBus.Properties'

class Account(InterfaceFactory):
    """Client-side wrapper around one Telepathy Account D-Bus object."""

    def __init__(self, object_path, bus=None):
        """Look up the account at object_path.

        object_path -- object path of the account
        bus         -- D-Bus connection to use; defaults to dbus.Bus()

        The object is fetched from the AccountManager's bus name.
        """
        connection = bus or dbus.Bus()
        proxy = connection.get_object(
            'org.freedesktop.Telepathy.AccountManager', object_path)
        InterfaceFactory.__init__(self, proxy, ACCOUNT)

        # FIXME: make this async
        valid = self.get_valid_interfaces()
        valid.update(self[DBUS_PROPERTIES].Get(ACCOUNT, 'Interfaces'))

if __name__ == '__main__':
    from accountmgr import AccountManager
    from telepathy.client import Connection
    import gobject
    import dbus.glib # sets up a main loop (!!!); somewhat deprecated
    am = AccountManager()
    def show_conn(conn):
        print repr(conn)
    for acct_path in am[DBUS_PROPERTIES].Get(ACCOUNT_MANAGER, 'ValidAccounts'):
        acct = Account(acct_path)
        conn = acct[DBUS_PROPERTIES].Get(ACCOUNT, 'Connection')
        print conn
        conn = Connection(conn.replace('/', '.')[1:], conn)
        conn.call_when_ready(show_conn)
    gobject.MainLoop().run()

Finally, dbus_pprint.py, a short helper module that pretty-prints D-Bus types:

from pprint import pprint
import dbus

def db2p(db):
    """Convert a dbus value into a structure suitable for pprint.

    A dbus.Struct becomes a tuple, a dbus.Array a list and a
    dbus.Dictionary a dict, with their contents converted recursively.
    dbus.String, dbus.UInt32 and dbus.Boolean values are passed through
    `+ ''`, `+ 0` and `== True` respectively.  A value of any other type
    is returned as a ('type: ...', value) pair.
    """
    kind = type(db)
    if kind == dbus.Dictionary:
        plain = {}
        for key, value in db.items():
            plain[db2p(key)] = db2p(value)
        return plain
    if kind == dbus.Array:
        return list(map(db2p, db))
    if kind == dbus.Struct:
        return tuple(map(db2p, db))
    if kind == dbus.Boolean:
        return db == True
    if kind == dbus.UInt32:
        return db + 0
    if kind == dbus.String:
        return db + ''
    # Anything else is shown together with its type.
    return ('type: %s' % kind, db)

def dbus_pprint(data):
    """Pretty-print data after converting it with db2p()."""
    converted = db2p(data)
    pprint(converted)