from .utils import get_args_count
from collections import namedtuple
# Pairs the callable to run for a task (`function`) with the `priority`
# Botify uses to order evaluation (higher priorities are evaluated first).
Context = namedtuple('Context', ('function', 'priority'))
class Botify(object):
    """Framework for creating tools which perform various tasks
    based on natural language.

    This framework enables developers to create a mapping between
    a natural language and user defined functions. This technique can
    then be used to create a tool which carries out certain tasks.

    Attributes
    ----------
    strict_mode_enabled : bool
        Whether strict mode is enabled (default True). When it is disabled,
        a task whose rule does not yield enough data is retried once with
        the rule extended by the default offsets (-1, 1, -2, 2, ...).

    Parameters
    ----------
    is_token_data_callback : function
        A function to determine whether a token is valid data.
        The function should take a single parameter as input and return
        a truthy or falsy value based on whether the token received is
        valid data. `data` is defined as a token which yields information
        which needs to be passed to one of the defined tasks. Besides the
        raw string tokens, the callback is also called with cleaned data,
        with task results and with the internal task dicts, so it must
        cope with all of them. Defaults to a callback that rejects
        everything.
    clean_data_callback : function
        A function to perform any modification prior to passing the string
        token to the appropriate task. This callback, if specified, will be
        used to generate the parameters which are passed to the tasks.
        Defaults to the identity function.
    """

    ACTION_DELETE = 'delete'
    ACTION_UPDATE_RULE = 'update_rule'
    ACTION_UPDATE_CONTEXT = 'update_context'

    def __init__(self, is_token_data_callback=None,
                 clean_data_callback=None):
        self._parsed_list = []
        self._most_recent_report = []
        self._token_list = []
        if is_token_data_callback is None:
            self._is_token_data_callback = lambda x: False
        else:
            self._is_token_data_callback = is_token_data_callback
        # Keep the identity default when no clean callback is supplied;
        # storing None here would make parse() fail on the first data token.
        if clean_data_callback is None:
            self._clean_data_callback = lambda x: x
        else:
            self._clean_data_callback = clean_data_callback
        self.strict_mode_enabled = True
        # keyword -> {'context': Context, 'rule': tuple of relative indices}
        self._tasks = {}
        # modifier word -> {task keyword: ((action, parameter, relative_pos), ...)}
        self._modifiers = {}

    def _get_priority_set(self):
        """Return the set of priorities of all pending tasks in the parsed list."""
        priorities = set()
        for item in self._parsed_list:
            if not self._is_token_data_callback(item):
                priorities.add(item['context'].priority)
        return priorities

    def _get_action_mapping(self):
        """Return the mapping from action names to their bound handlers."""
        return {
            self.ACTION_DELETE: self._action_delete,
            self.ACTION_UPDATE_RULE: self._action_update_rule,
            self.ACTION_UPDATE_CONTEXT: self._action_update_context,
        }

    def add_task(self, keywords, context, rule):
        """Map a function to a list of keywords.

        Parameters
        ----------
        keywords : iterable of str
            Sequence of strings which should trigger the given function.
        context : Context
            A Context object created using the desired function.
        rule : tuple
            A tuple of integers, which act as relative indices using which
            data is extracted to be passed to the function passed via
            context.
        """
        for keyword in keywords:
            self._tasks[keyword] = {'context': context, 'rule': rule}

    def add_modifier(self, modifier, keywords, relative_pos,
                     action, parameter=None):
        """Modify existing tasks based on the presence of a keyword.

        Parameters
        ----------
        modifier : str
            A string value which would trigger the given modifier.
        keywords : iterable of str
            Sequence of strings which are keywords for some task
            which has to be modified.
        relative_pos : int
            Relative position of the task which should be modified
            in the presence of `modifier`. Its value can never be 0. Data
            fields should also be considered when calculating the relative
            position.
        action : str
            String value representing the action which should be performed
            on the task (one of the ``ACTION_*`` class constants).
        parameter : object
            Value required by the `action` (default None). When it is None
            the action is invoked with the task index only.

        Raises
        ------
        ValueError
            If `relative_pos` is 0.
        """
        if relative_pos == 0:
            raise ValueError("relative_pos cannot be 0")
        modifier_dict = self._modifiers.get(modifier, {})
        value = (action, parameter, relative_pos)
        for keyword in keywords:
            # Stored as tuples; rebuild with the new entry appended.
            modifier_dict[keyword] = tuple(modifier_dict.get(keyword, ())) + (value,)
        self._modifiers[modifier] = modifier_dict

    def parse(self, text):
        """Parse the string `text` and return a tuple of left over data fields.

        Parameters
        ----------
        text : str
            A string to be parsed. It is lower-cased and split on
            whitespace before matching.

        Returns
        -------
        result : tuple
            A tuple of left over data after processing.

        Raises
        ------
        ValueError
            If a task cannot be resolved to data.
        """
        self._parsed_list = []
        self._most_recent_report = []
        self._token_list = text.lower().split()
        modifier_index_list = []
        for token in self._token_list:
            # The three checks are independent on purpose: a token may be
            # data, a task keyword and a modifier at the same time.
            if self._is_token_data_callback(token):
                self._parsed_list.append(self._clean_data_callback(token))
            if token in self._tasks:
                task = self._tasks[token]
                self._parsed_list.append({'context': task['context'],
                                          'rule': task['rule'],
                                          'task': token})
            if token in self._modifiers:
                # Remember where the modifier sits relative to parsed items:
                # the index the *next* parsed item will receive.
                modifier_index_list.append((len(self._parsed_list), token))
        self._apply_modifiers(modifier_index_list)
        return self._evaluate()

    def _apply_modifiers(self, modifier_index_list):
        """Run the registered actions for every modifier found while parsing.

        `modifier_index_list` holds ``(pos, modifier)`` pairs where `pos` is
        the parsed-list index of the item that followed the modifier.
        Targets that fall outside the parsed list are ignored.
        """
        action_map = self._get_action_mapping()
        for pos, modifier in modifier_index_list:
            for keyword, action_list in self._modifiers[modifier].items():
                for action_name, parameter, relative_pos in action_list:
                    task_index = pos + relative_pos
                    if relative_pos > 0:
                        # `pos` already points at the next item, so +1 is `pos`.
                        task_index -= 1
                    if task_index < 0:
                        # A negative index would silently wrap around to the
                        # end of the list; treat it as out of range instead.
                        continue
                    try:
                        target = self._parsed_list[task_index]
                        if self._is_token_data_callback(target):
                            continue
                        if target['task'] != keyword:
                            continue
                        action = action_map[action_name]
                        if parameter is None:
                            action(task_index)
                        else:
                            action(task_index, parameter)
                    except IndexError:
                        # Target lies beyond the end of the parsed list.
                        pass

    def _get_report(self):
        """Return a list of dicts with parsing info.

        The dicts contain information about how the string was parsed to
        obtain the final result. This information can be used for debugging
        purposes. A shallow copy is returned.
        """
        return self._most_recent_report[:]

    def _evaluate(self):
        """Resolve all pending tasks, highest priority first.

        Returns
        -------
        tuple
            The items left in the parsed list once every task has run.

        Raises
        ------
        ValueError
            If a non-data item is still present after evaluation.
        """
        for priority in sorted(self._get_priority_set(), reverse=True):
            while True:
                pending = []
                offset = 0
                for index, item in enumerate(self._parsed_list):
                    if not self._is_token_data_callback(item):
                        if item['context'].priority == priority:
                            # Compensate for the items earlier tasks of this
                            # pass are expected to consume from the list.
                            pending.append(index - offset)
                            offset += get_args_count(item['context'].function)
                if not pending:
                    break
                for task_index in pending:
                    # TODO: use the return value of _find_data
                    self._find_data(-1, task_index)
        for item in self._parsed_list:
            if not self._is_token_data_callback(item):
                raise ValueError("Unable to Parse")
        return tuple(self._parsed_list)

    def _find_data(self, caller_index, task_index):
        """Collect the data for the task at `task_index` and apply the task.

        Parameters
        ----------
        caller_index : int
            Index of the task that requested this evaluation, or -1 for a
            top-level call. It is skipped to prevent recursion cycles.
        task_index : int
            Index of the task in the parsed list.

        Returns
        -------
        bool
            True if the task was applied; False if `task_index` does not
            hold a task entry or a nested task could not be resolved.

        Raises
        ------
        ValueError
            If the rule does not yield the required amount of data (after
            one retry with the extended rule when strict mode is disabled).
        """
        # If task_index does not hold a dict it has already been evaluated
        # (or is out of range), so there is nothing to do.
        try:
            args_count = get_args_count(
                self._parsed_list[task_index]['context'].function)
        except (TypeError, IndexError):
            return False
        should_repeat = not self.strict_mode_enabled
        while True:
            data_index_list = []
            rule = self._parsed_list[task_index]['rule']
            for i in rule:
                k = task_index + i
                if 0 <= k < len(self._parsed_list):
                    if self._is_token_data_callback(self._parsed_list[k]):
                        data_index_list.append(k)
                    elif caller_index != k:
                        # The check above is necessary to prevent
                        # recursion cycles.
                        status = self._find_data(task_index, k)
                        if status is True\
                                and 0 <= k < len(self._parsed_list)\
                                and self._is_token_data_callback(self._parsed_list[k])\
                                and k not in data_index_list:
                            data_index_list.append(k)
                        else:
                            return False
                if len(data_index_list) == args_count:
                    break
            if len(data_index_list) == args_count:
                self._apply_task(task_index, data_index_list)
                return True
            if should_repeat:
                # Non-strict mode: retry once with the rule widened by the
                # default offsets.
                self._parsed_list[task_index]['rule'] = \
                    self._get_nonstrict_rule(task_index)
                should_repeat = False
            else:
                raise ValueError('Unable to Parse. Try a different Input')

    def _action_delete(self, task_index, offset=0):
        """Delete the parsed item `offset` positions away from the task.

        `offset` defaults to 0 (the task itself) so that the action also
        works when the modifier was registered without a parameter.
        """
        del self._parsed_list[task_index + offset]

    def _action_update_rule(self, task_index, rule):
        """Replace the rule of the task at `task_index`."""
        self._parsed_list[task_index]['rule'] = rule

    def _action_update_context(self, task_index, context):
        """Replace the context of the task at `task_index`."""
        self._parsed_list[task_index]['context'] = context

    def _apply_task(self, task_index, data_index_list):
        """Call the task's function with the collected data.

        The task and the data it consumed are removed from the parsed list,
        the call is recorded in the report, and the result is inserted in
        place of the task when the data callback accepts it as data.
        Note that `data_index_list` is modified in place.
        """
        task_context = self._parsed_list[task_index]['context']
        data_list = [self._parsed_list[index] for index in data_index_list]
        offset = 0
        del self._parsed_list[task_index]
        for index, item in enumerate(data_index_list):
            if item > task_index:
                # Data after the task moved one slot left with the deletion.
                data_index_list[index] -= 1
            else:
                # Data before the task shifts the result's insertion point.
                offset += 1
        # Delete from the back so the remaining indices stay valid.
        for index in sorted(data_index_list, reverse=True):
            del self._parsed_list[index]
        res = task_context.function(*data_list)
        self._most_recent_report.append({
            'function': task_context.function.__name__,
            'parameters': tuple(data_list),
            'result': res,
        })
        if self._is_token_data_callback(res):
            self._parsed_list.insert(task_index - offset, res)

    def _get_default_rule(self, task_index):
        """Return the default offsets [-1, 1, -2, 2, ...] for the task.

        One -d/+d pair is produced per argument reported by
        `get_args_count` for the task's function.
        """
        args_count = get_args_count(
            self._parsed_list[task_index]['context'].function)
        rule = []
        for distance in range(1, args_count + 1):
            rule.extend((-distance, distance))
        return rule

    def _get_nonstrict_rule(self, task_index):
        """Return the task's rule extended by the missing default offsets."""
        extended = list(self._parsed_list[task_index]['rule'])
        for item in self._get_default_rule(task_index):
            if item not in extended:
                extended.append(item)
        return tuple(extended)