TimberBot – blob

You can use Git to clone the repository via the web URL. Download snapshot (zip)
Clean up dice plugin and fix its help text
[TimberBot] / parser.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
4 # SPDX-License-Identifier: GPL-3.0-or-later
5 # This file is part of TimberBot. <https://fietkau.software/timberbot>
6 # Copyright (C) 2014-2022 Julian Fietkau
8 import plugin_manager
10 class Parser:
11     def __init__(self, data_dir, send_callback, reply_callback, config_calls, special_action_call):
12         self.data_dir = data_dir
13         self.send_callback = send_callback
14         self.reply_callback = reply_callback
15         self.config_calls = config_calls
16         self.special_action = special_action_call
18         self.storage = {'names': {}, 'ignore_list': []}
19         self.storage['ignore_list'] = self.get_conf('main', 'ignore_list', '').split(' ')
21         self.bot_name = self.get_conf('main', 'nick')
22         plugins = self.get_conf('main', 'plugins')
23         if plugins == None:
24             plugins = 'names'
25         plugins = plugins.split(' ')
26         self.command_prefixes = self.get_conf('main', 'command_prefixes').split(' ')
27         if not self.command_prefixes:
28             self.command_prefixes = ['!']
29             self.set_conf('main', 'command_prefixes', '!')
30         self.plugin_manager = plugin_manager.PluginManager(self.data_dir, lambda message: self.send(message), lambda user, message: self.reply(user, message), self.config_calls, self.special_action, lambda: self.get_users())
31         self.plugin_manager.enable_plugins(*plugins)
33     def get_users(self, including_offline = False):
34         if including_offline:
35             names = [self.storage['names'][n]['name'] for n in self.storage['names']]
36         else:
37             names = [self.storage['names'][n]['name'] for n in self.storage['names'] if self.storage['names'][n]['status'] in ['online', 'prelim-online']]
38         if self.bot_name in names:
39             names.remove(self.bot_name)
40         return names
42     def get_conf(self, category, key, default = None):
43         if not isinstance(default, str):
44             default = str(default).lower()
45         value = self.config_calls['get'](category, key, default)
46         if category == 'main' and key in ['admins']:
47             result = [name.strip() for name in value.split(' ') if name != 'none']
48             return result
49         result = value
50         if value.lower() == 'none':
51             result = None
52         if value.lower() in ['true', 'yes', 'on']:
53             result = True
54         if value.lower() in ['false', 'no', 'off']:
55             result = False
56         return result
58     def set_conf(self, category, key, value):
59         if category == 'main' and key in ['admins']:
60             value.sort()
61             value = ' '.join(value)
62         if value in [None, '']:
63             value = 'none'
64         if value == True:
65             value = 'true'
66         if value == False:
67             value = 'false'
68         value = self.config_calls['set'](category, key, value)
70     def send(self, message, to_user = None):
71         if not self.get_conf('main', 'mute'):
72             self.send_callback(message)
74     def reply(self, user, message):
75         if not self.get_conf('main', 'mute'):
76             self.reply_callback(user, message)
78     def split_command(self, message):
79         result = []
80         is_command = False
81         for command_prefix in self.command_prefixes:
82             if message.startswith(command_prefix):
83                 is_command = True
84                 message = message[len(command_prefix):]
85                 break
86         if not is_command:
87             return result
88         current = ''
89         index = 0
90         protected = False
91         while index < len(message):
92             char = message[index]
93             if char == ' ' and not protected:
94                 if len(current) > 0:
95                     if current[-1] == ',':
96                         current = current[:-1]
97                     result.append(current)
98                     current = ''
99             elif char in ['"', '\''] and not protected:
100                 protected = char
101             elif char == protected:
102                 protected = False
103             else:
104                 current = current + char
105             index = index + 1
106         if len(current) > 0:
107             if current[-1] == ',':
108                 current = current[:-1]
109             result.append(current)
110         return result
112     def display_name(self, user):
113         if user.lower() not in self.storage['names'] or (user.lower() in self.storage['names'] and user.lower() == self.storage['names'][user.lower()]['name']):
114             if user.lower() not in self.storage['names']:
115                 self.storage['names'][user.lower()] = {'name': user, 'status': None}
116             else:
117                 self.storage['names'][user.lower()]['name'] = user
118         if user != user.lower() and user != self.storage['names'][user.lower()]['name']:
119             self.storage['names'][user.lower()]['name'] = user
120         return self.storage['names'][user.lower()]['name']
122     def special_event(self, event_type, params):
123         if event_type == 'ME':
124             user = params['user']
125             message = params['message']
126             if user.lower() == self.bot_name.lower() or len(message) == 0:
127                 return
128             user = self.display_name(user)
129             params['user'] = user
130             self.plugin_manager.special_event(event_type, params)
132     def is_admin(self, user, params):
133         result = False
134         if self.get_conf('main', 'mods_as_admins') and params['mod']:
135             result = True
136         if user.lower() in self.get_conf('main', 'admins'):
137             result = True
138         return result
140     def parse(self, user, message, params):
142         if user.lower() == self.bot_name.lower() or len(message) == 0:
143             return
144         self.channel_join(user, False, 'prelim-online')
146         user = self.display_name(user)
148         is_command = False
149         for command_prefix in self.command_prefixes:
150             if message.startswith(command_prefix):
151                 is_command = True
152                 break
154         if not is_command:
155             if self.is_admin(user, params):
156                 self.plugin_manager.admin_message_read(user, message)
157             self.plugin_manager.message_priority_read(user, message)
158             if user.lower() not in self.storage['ignore_list']:
159                 self.plugin_manager.message_read(user, message)
161         if is_command:
163             reserved_commands = ['about', 'help']
164             all_commands = reserved_commands + self.plugin_manager.get_provided_commands()
165             all_commands.sort()
167             command = self.split_command(message)
169             raw_params = ''
170             if message.find(' ') != -1:
171                 raw_params = message[message.index(' ')+1:]
173             self.plugin_manager.command_priority_read(user, command, raw_params)
174             if user.lower() in self.storage['ignore_list']:
175                 return
176             if command[0] in self.plugin_manager.get_provided_commands() and command[0] not in reserved_commands:
177                 self.plugin_manager.command_read(user, command, raw_params)
178                 return
180             if command[0] == 'about':
181                 if self.bot_name == 'TimberBot':
182                     about = 'I am TimberBot, a chat bot written in Python by Julian Fietkau.'
183                 else:
184                     about = 'I am ' + self.bot_name + ', a chat bot based on TimberBot by Julian Fietkau.'
185                 self.reply(user, about + ' See https://fietkau.software/timberbot for further information.')
186                 return
188             if command[0] == 'help':
189                 if len(command) == 1:
190                     command_list = ['help']
191                     command_list += list(self.plugin_manager.get_help_text().keys())
192                     command_list.sort()
193                     command_list_text = ' - '.join([self.command_prefixes[0] + cmd for cmd in command_list])
194                     self.reply(user, 'Main commands are: ' + command_list_text + '. Type "' + self.command_prefixes[0] + 'help commandname" for details (e.g. "' + self.command_prefixes[0] + 'help ' + command_list[0] + '"). Type "' + self.command_prefixes[0] + 'help commands" for a full command list.')
195                 if len(command) >= 2:
196                     if command[1] == 'commands':
197                         command_list = ['about', 'help']
198                         command_list = command_list + self.plugin_manager.get_provided_commands()
199                         command_list.sort()
200                         command_list_text = ' - '.join([self.command_prefixes[0] + cmd for cmd in command_list])
201                         self.reply(user, command_list_text)
202                     elif command[1] == 'help':
203                         self.reply(user, 'The ' + self.command_prefixes[0] + 'help command provides usage directions for any active command provided by a plugin. Type "' + self.command_prefixes[0] + 'help commandname" for details, or just "' + self.command_prefixes[0] + 'help" to get a list of commands that have help text available.')
204                     elif command[1] in self.plugin_manager.get_help_text():
205                         self.reply(user, self.plugin_manager.get_help_text()[command[1]])
206                 return
208             if command[0] == 'admin':
209                 if self.is_admin(user, params):
210                     if len(command) > 1:
211                         command = command[1:]
212                         raw_params = raw_params[raw_params.index(' ')+1:]
213                     else:
214                         return
215                 else:
216                     self.reply(user, 'Only ' + self.bot_name + ' admins can do that.')
218             if not self.is_admin(user, params):
219                 return
221             # From this point onward, it is guaranteed that the user is a bot admin (else the method has already returned).
223             if command[0] in self.plugin_manager.get_provided_admin_commands():
224                 if self.is_admin(user, params):
225                     self.plugin_manager.admin_command_read(user, command, raw_params)
227             core_admin_commands = ['add', 'remove', 'enable', 'disable', 'reload', 'flush_names', 'mute', 'unmute', 'ignore', 'conf', 'plugins']
228             if command[0] in core_admin_commands:
230                 if command[0] == 'add' and len(command) >= 2:
231                     admins = self.get_conf('main', 'admins')
232                     successes = []
233                     for user_name in command[1:]:
234                         if user_name.lower() not in admins:
235                             admins.append(user_name.lower())
236                             successes.append(user_name)
237                         else:
238                             self.reply(user, self.display_name(user_name) + ' is already an administrator.')
239                     self.set_conf('main', 'admins', admins)
240                     if len(successes) == 1:
241                         self.reply(user, 'Added ' + successes[0] + ' as an administrator.')
242                     if len(successes) > 1:
243                         self.reply(user, 'Added ' + ', '.join(successes) + ' as administrators.')
245                 if command[0] == 'remove' and len(command) >= 2:
246                     admins = self.get_conf('main', 'admins')
247                     successes = []
248                     for user_name in command[1:]:
249                         if user_name.lower() in admins and user_name.lower() != user.lower():
250                             admins.remove(user_name.lower())
251                             successes.append(user_name)
252                         elif user_name.lower() in admins and user_name.lower() == user.lower():
253                             self.reply(user, 'You cannot remove yourself.')
254                         else:
255                             self.reply(user, self.display_name(user_name) + ' is not an administrator.')
256                     self.set_conf('main', 'admins', admins)
257                     if len(successes) >= 1:
258                         self.reply(user, 'Removed ' + ', '.join(successes) + ' from administrator list.')
260                 if command[0] == 'enable' and len(command) >= 2:
261                     for plugin_name in command[1:]:
262                         if plugin_name not in self.plugin_manager.active_plugins:
263                             self.plugin_manager.scan_plugins()
264                             if plugin_name in self.plugin_manager.available_plugins:
265                                 try:
266                                     status = self.plugin_manager.enable_plugins(plugin_name)
267                                     if status == 0:
268                                         self.reply(user, 'Enabled ' + plugin_name + ' plugin.')
269                                 except RuntimeError as error:
270                                     self.reply(user, str(error))
271                             else:
272                                 self.reply(user, 'Plugin ' + plugin_name + ' not found.')
273                         else:
274                             self.reply(user, 'Plugin ' + plugin_name + ' is already enabled.')
276                 if command[0] == 'disable' and len(command) >= 2:
277                     for plugin_name in command[1:]:
278                         if plugin_name in self.plugin_manager.active_plugins:
279                             try:
280                                 status = self.plugin_manager.disable_plugins(plugin_name)
281                                 if status == 0:
282                                     self.reply(user, 'Disabled ' + plugin_name + ' plugin.')
283                             except RuntimeError as error:
284                                 self.reply(user, str(error))
285                         else:
286                             self.reply(user, 'Plugin ' + plugin_name + ' is not enabled.')
288                 if command[0] == 'reload' and len(command) >= 2:
289                     for plugin_name in command[1:]:
290                         if plugin_name in self.plugin_manager.active_plugins:
291                             self.plugin_manager.reload_plugins(plugin_name)
292                             self.reply(user, 'Reloaded ' + plugin_name + ' plugin.')
293                         else:
294                             self.reply(user, 'Plugin ' + plugin_name + ' is not enabled.')
296                 if command[0] == 'plugins':
297                     print_lists = ['active', 'additional']
298                     if len(command) >= 2 and command[1] == 'active':
299                         print_lists = ['active']
300                     if len(command) >= 2 and command[1] == 'additional':
301                         print_lists = ['additional']
302                     if len(command) >= 2 and command[1] == 'all':
303                         print_lists = ['all']
304                     if 'active' in print_lists:
305                         self.reply(user, 'Active plugins: ' + ', '.join(sorted(self.plugin_manager.active_plugins.keys())))
306                     if 'additional' in print_lists:
307                         additional_plugins = [plugin for plugin in self.plugin_manager.available_plugins if plugin not in self.plugin_manager.active_plugins]
308                         self.reply(user, 'Additionally available plugins: ' + ', '.join(sorted(additional_plugins)))
309                     if 'all' in print_lists:
310                         self.reply(user, 'All available plugins: ' + ', '.join(sorted(self.plugin_manager.available_plugins)))
312                 if command[0] == 'mute':
313                     self.reply(user, 'The bot is now muted and will send no more messages until an admin uses the "unmute" command.')
314                     self.set_conf('main', 'mute', True)
315                 if command[0] == 'unmute':
316                     self.set_conf('main', 'mute', False)
317                     self.reply(user, 'The bot is now unmuted and will send messages as normal.')
319                 if command[0] == 'ignore':
320                     if len(command) > 1:
321                         additions = []
322                         removals = []
323                         for name in command[1:]:
324                             if name.lower() in self.storage['ignore_list']:
325                                 self.storage['ignore_list'].remove(name)
326                                 removals.append(name)
327                             else:
328                                 self.storage['ignore_list'].append(name)
329                                 additions.append(name)
330                         reply = ''
331                         if len(additions) > 0:
332                             reply += 'Added ' + ', '.join(additions) + ' to the ignore list. '
333                         if len(removals) > 0:
334                             reply += 'Removed ' + ', '.join(removals) + ' from the ignore list. '
335                         if len(reply) > 0:
336                             reply = reply[:-1]
337                             self.reply(user, reply)
338                             self.set_conf('main', 'ignore_list', self.storage['ignore_list'].join(' '))
340                 if command[0] == 'flush_names':
341                     for name in self.storage['names']:
342                         if self.storage['names'][name]['status'] in ['online', 'prelim-online'] and name.lower() != self.bot_name:
343                             self.storage['names'][name]['status'] = 'offline'
344                     self.reply(user, 'Chatter list flushed.')
346                 if command[0] == 'conf':
347                     if len(command) < 3:
348                         return
349                     if len(command) == 3:
350                         result = self.get_conf(command[1].lower(), command[2].lower())
351                         if type(result) == list:
352                             result = ', '.join(result)
353                         self.reply(user, 'Configuration for "' + command[1].lower() + ' -> ' + command[2].lower() + '" is "' + str(result) + '".')
354                         return
355                     if command[3].lower() in ['true', 'yes', 'on', '1']:
356                         command[3] = True
357                     elif command[3].lower() in ['false', 'no', 'off', '0']:
358                         command[3] = False
359                     self.set_conf(command[1].lower(), command[2].lower(), command[3])
360                     self.reply(user, 'Configuration for "' + command[1].lower() + ' -> ' + command[2].lower() + '" is now "' + str(command[3]) + '".')
362     def scheduled_update(self):
363         self.plugin_manager.check_schedule()
365     def channel_join(self, name, batch = False, newstatus = 'online'):
366         if len(name) > 0 and name[0] in ['@', '+', '!', '%']:
367             name = name[1:]
369         new_name = self.display_name(name)
370         if isinstance(new_name, str):
371             name = new_name
372         if name.lower() not in self.storage['names']:
373             self.storage['names'][name.lower()] = {}
374             self.storage['names'][name.lower()]['name'] = name
375             self.storage['names'][name.lower()]['status'] = None
376         if self.storage['names'][name.lower()]['status'] != newstatus:
377             if newstatus == 'prelim-online' and self.storage['names'][name.lower()]['status'] == 'online':
378                 return
379             if self.storage['names'][name.lower()]['status'] in [None, 'offline']:
380                 if name.lower() not in self.storage['ignore_list']:
381                     self.plugin_manager.user_join(name, batch)
382             self.storage['names'][name.lower()]['status'] = newstatus
384     def channel_part(self, name, batch = False):
385         if len(name) > 0 and name[0] in ['@', '+', '!', '%']:
386             name = name[1:]
387         if name.lower() == self.bot_name.lower():
388             return
389         if name.lower() not in self.storage['names']:
390             self.storage['names'][name.lower()] = {}
391             self.storage['names'][name.lower()]['name'] = None
392             self.storage['names'][name.lower()]['status'] = None
393         if self.storage['names'][name.lower()]['status'] in [None, 'online', 'prelim-online']:
394             new_name = self.display_name(name)
395             if isinstance(new_name, str):
396                 name = new_name
397             self.storage['names'][name.lower()]['status'] = 'offline'
398             if name.lower() not in self.storage['ignore_list']:
399                 self.plugin_manager.user_part(name, batch)
401     def update_names(self, names_list, silent, adding = False):
402         names = []
403         for name in names_list:
404             if len(name) > 0 and name[0] in ['@', '+', '!', '%']:
405                 name = name[1:]
406             if len(name) > 0 and name.lower() != self.bot_name.lower():
407                 names.append(name)
408         if len(names) <= 1 or adding:
409             return
410         for name in self.storage['names']:
411             if name.lower() not in [n.lower() for n in names]:
412                 if self.storage['names'][name.lower()]['status'] in ['online', 'prelim-online']:
413                     self.channel_part(name, True)
414         for name in names:
415             if name.lower() in self.storage['names'] and 'status' in self.storage['names'][name.lower()] and self.storage['names'][name.lower()]['status'] == 'prelim-online':
416                 self.storage['names'][name.lower()]['status'] = 'online'
417             else:
418                 # At this point we know that name is someone currently in chat.
419                 # self.channel_join does the check whether it is someone new itself.
420                 self.channel_join(name, True)