Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
[mosquitto-dev] checking wildcard subscriptions against wildcard ACLs

There has been a little discussion on the list about applying ACLs to subscriptions.

And I saw a comment in read_handle_server.c:

...mosquitto_topic_matches_sub, which can't cope with checking subscriptions that have wildcards against ACLs that have wildcards...

Based on my work on topics for a persistence layer, I thought this was possible and came up with an algorithm, implemented in the attached Python script together with test data, which may be useful.

Of course, it may not be bulletproof, and probably could be improved - if anyone sees a hole or finds a case that fails, please let me know.

ml


#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys

def _check_for_subtree(tpl):
    """Report whether a list of topic levels ends with the '#' wildcard.

    tpl is a list of topic levels (the result of splitting a pattern on
    '/').  It is modified in place: when its last entry is u'#', that
    entry is removed so that only the levels before it remain.

    Returns True if a trailing '#' was found and removed, False otherwise.
    """
    has_subtree = tpl[-1] == u'#'
    if has_subtree:
        tpl.pop()  # strip the trailing '#'
    return has_subtree

def acl_check(ap, sp):
    """Check whether a subscription pattern is fully covered by an ACL pattern.

    ap: the ACL pattern; sp: the subscription pattern.  Both are
    '/'-separated topic patterns that may use '+' (any single level) and
    a trailing '#' (the whole subtree).  The patterns are assumed to be
    well-formed; no validation of wildcard placement is done here.

    Returns True only if every topic the subscription could match is also
    matched by the ACL pattern, otherwise False.
    """
    acl_levels = ap.split(u'/')
    acl_subtree = acl_levels[-1] == u'#'
    if acl_subtree:
        acl_levels.pop()  # keep only the levels in front of the '#'

    sub_levels = sp.split(u'/')
    sub_subtree = sub_levels[-1] == u'#'
    if sub_subtree:
        sub_levels.pop()

    # An open-ended subscription can only be covered by an open-ended ACL.
    if sub_subtree and not acl_subtree:
        return False

    # A wildcard in the first level never matches topics whose first level
    # starts with '$' (e.g. $SYS), so an ACL such as '#' or '+/#' must not
    # authorise a subscription that targets '$' topics.  An ACL reduced to
    # no levels at all was a bare '#'.
    if sp.startswith(u'$') and (not acl_levels or acl_levels[0] == u'+'):
        return False

    extra_levels = len(sub_levels) - len(acl_levels)
    if extra_levels < 0:
        # The subscription is shallower than the ACL, so it would match
        # topics the ACL does not reach.
        return False
    if extra_levels > 0 and not acl_subtree:
        # Without a trailing '#' the ACL covers exactly its own depth.
        return False

    # Compare level by level.  zip() stops at the end of the ACL levels;
    # any further subscription levels are covered by the ACL's '#'.
    # An ACL '+' accepts anything at that level (including '+'); any other
    # ACL level needs an identical subscription level.
    return all(
        acl_level == u'+' or acl_level == sub_level
        for acl_level, sub_level in zip(acl_levels, sub_levels)
    )

if __name__ == u"__main__":
    # Self-test: run acl_check over a table of known cases, print one line
    # per case, and report the overall outcome through the exit status.
    test_data_tuple_list = [
        # (acl pattern, subscription pattern, expected result)
        (u'foo/+/bar', u'foo/#', False),
        (u'foo/+/ba℞/#', u'foo/baz/ba℞', True),
        (u'foo/+/ba℞/#', u'foo/baz/ba℞/+', True),
        (u'foo/+/ba℞/#', u'foo/baz/ba℞/#', True),
        (u'foo/+/ba℞/#', u'foo/baz/+/#', False),
        (u'/+//#', u'/foo///#', True),
        (u'#', u'#', True),
        (u'#', u'+', True),
        (u'+', u'#', False),
        (u'+', u'+', True),
    ]

    failures = 0
    for acl_pattern, sub_pattern, expected in test_data_tuple_list:
        result = acl_check(acl_pattern, sub_pattern)
        passed = result == expected
        if not passed:
            failures += 1

        print(
            (
                u"ap: {}; sp: {}; Expectation: {}; "
                u"Result: {}; Test Result: {}"
            ).format(
                acl_pattern,
                sub_pattern,
                expected,
                result,
                u'passed' if passed else u'failed',
            )
        )

    # Exit non-zero when any case failed so that callers (shell scripts,
    # CI) can detect a regression without parsing the printed output.
    sys.exit(1 if failures else 0)

Back to the top