TimberBot – blob
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
4 # SPDX-License-Identifier: GPL-3.0-or-later
5 # This file is part of TimberBot. <https://fietkau.software/timberbot>
6 # Copyright (C) 2014-2022 Julian Fietkau
8 import plugin_manager
10 class Parser:
11 def __init__(self, data_dir, send_callback, reply_callback, config_calls, special_action_call):
12 self.data_dir = data_dir
13 self.send_callback = send_callback
14 self.reply_callback = reply_callback
15 self.config_calls = config_calls
16 self.special_action = special_action_call
18 self.storage = {'names': {}, 'ignore_list': []}
19 self.storage['ignore_list'] = self.get_conf('main', 'ignore_list', '').split(' ')
21 self.bot_name = self.get_conf('main', 'nick')
22 plugins = self.get_conf('main', 'plugins')
23 if plugins == None:
24 plugins = 'names'
25 plugins = plugins.split(' ')
26 self.command_prefixes = self.get_conf('main', 'command_prefixes').split(' ')
27 if not self.command_prefixes:
28 self.command_prefixes = ['!']
29 self.set_conf('main', 'command_prefixes', '!')
30 self.plugin_manager = plugin_manager.PluginManager(self.data_dir, lambda message: self.send(message), lambda user, message: self.reply(user, message), self.config_calls, self.special_action, lambda: self.get_users())
31 self.plugin_manager.enable_plugins(*plugins)
33 def get_users(self, including_offline = False):
34 if including_offline:
35 names = [self.storage['names'][n]['name'] for n in self.storage['names']]
36 else:
37 names = [self.storage['names'][n]['name'] for n in self.storage['names'] if self.storage['names'][n]['status'] in ['online', 'prelim-online']]
38 if self.bot_name in names:
39 names.remove(self.bot_name)
40 return names
42 def get_conf(self, category, key, default = None):
43 if not isinstance(default, str):
44 default = str(default).lower()
45 value = self.config_calls['get'](category, key, default)
46 if category == 'main' and key in ['admins']:
47 result = [name.strip() for name in value.split(' ') if name != 'none']
48 return result
49 result = value
50 if value.lower() == 'none':
51 result = None
52 if value.lower() in ['true', 'yes', 'on']:
53 result = True
54 if value.lower() in ['false', 'no', 'off']:
55 result = False
56 return result
58 def set_conf(self, category, key, value):
59 if category == 'main' and key in ['admins']:
60 value.sort()
61 value = ' '.join(value)
62 if value in [None, '']:
63 value = 'none'
64 if value == True:
65 value = 'true'
66 if value == False:
67 value = 'false'
68 value = self.config_calls['set'](category, key, value)
70 def send(self, message, to_user = None):
71 if not self.get_conf('main', 'mute'):
72 self.send_callback(message)
74 def reply(self, user, message):
75 if not self.get_conf('main', 'mute'):
76 self.reply_callback(user, message)
78 def split_command(self, message):
79 result = []
80 is_command = False
81 for command_prefix in self.command_prefixes:
82 if message.startswith(command_prefix):
83 is_command = True
84 message = message[len(command_prefix):]
85 break
86 if not is_command:
87 return result
88 current = ''
89 index = 0
90 protected = False
91 while index < len(message):
92 char = message[index]
93 if char == ' ' and not protected:
94 if len(current) > 0:
95 if current[-1] == ',':
96 current = current[:-1]
97 result.append(current)
98 current = ''
99 elif char in ['"', '\''] and not protected:
100 protected = char
101 elif char == protected:
102 protected = False
103 else:
104 current = current + char
105 index = index + 1
106 if len(current) > 0:
107 if current[-1] == ',':
108 current = current[:-1]
109 result.append(current)
110 return result
112 def display_name(self, user):
113 if user.lower() not in self.storage['names'] or (user.lower() in self.storage['names'] and user.lower() == self.storage['names'][user.lower()]['name']):
114 if user.lower() not in self.storage['names']:
115 self.storage['names'][user.lower()] = {'name': user, 'status': None}
116 else:
117 self.storage['names'][user.lower()]['name'] = user
118 if user != user.lower() and user != self.storage['names'][user.lower()]['name']:
119 self.storage['names'][user.lower()]['name'] = user
120 return self.storage['names'][user.lower()]['name']
122 def special_event(self, event_type, params):
123 if event_type == 'ME':
124 user = params['user']
125 message = params['message']
126 if user.lower() == self.bot_name.lower() or len(message) == 0:
127 return
128 user = self.display_name(user)
129 params['user'] = user
130 self.plugin_manager.special_event(event_type, params)
132 def is_admin(self, user, params):
133 result = False
134 if self.get_conf('main', 'mods_as_admins') and params['mod']:
135 result = True
136 if user.lower() in self.get_conf('main', 'admins'):
137 result = True
138 return result
140 def parse(self, user, message, params):
142 if user.lower() == self.bot_name.lower() or len(message) == 0:
143 return
144 self.channel_join(user, False, 'prelim-online')
146 user = self.display_name(user)
148 is_command = False
149 for command_prefix in self.command_prefixes:
150 if message.startswith(command_prefix):
151 is_command = True
152 break
154 if not is_command:
155 if self.is_admin(user, params):
156 self.plugin_manager.admin_message_read(user, message)
157 self.plugin_manager.message_priority_read(user, message)
158 if user.lower() not in self.storage['ignore_list']:
159 self.plugin_manager.message_read(user, message)
161 if is_command:
163 reserved_commands = ['about', 'help']
164 all_commands = reserved_commands + self.plugin_manager.get_provided_commands()
165 all_commands.sort()
167 command = self.split_command(message)
169 raw_params = ''
170 if message.find(' ') != -1:
171 raw_params = message[message.index(' ')+1:]
173 self.plugin_manager.command_priority_read(user, command, raw_params)
174 if user.lower() in self.storage['ignore_list']:
175 return
176 if command[0] in self.plugin_manager.get_provided_commands() and command[0] not in reserved_commands:
177 self.plugin_manager.command_read(user, command, raw_params)
178 return
180 if command[0] == 'about':
181 if self.bot_name == 'TimberBot':
182 about = 'I am TimberBot, a chat bot written in Python by Julian Fietkau.'
183 else:
184 about = 'I am ' + self.bot_name + ', a chat bot based on TimberBot by Julian Fietkau.'
185 self.reply(user, about + ' See https://fietkau.software/timberbot for further information.')
186 return
188 if command[0] == 'help':
189 if len(command) == 1:
190 command_list = ['help']
191 command_list += list(self.plugin_manager.get_help_text().keys())
192 command_list.sort()
193 command_list_text = ' - '.join([self.command_prefixes[0] + cmd for cmd in command_list])
194 self.reply(user, 'Main commands are: ' + command_list_text + '. Type "' + self.command_prefixes[0] + 'help commandname" for details (e.g. "' + self.command_prefixes[0] + 'help ' + command_list[0] + '"). Type "' + self.command_prefixes[0] + 'help commands" for a full command list.')
195 if len(command) >= 2:
196 if command[1] == 'commands':
197 command_list = ['about', 'help']
198 command_list = command_list + self.plugin_manager.get_provided_commands()
199 command_list.sort()
200 command_list_text = ' - '.join([self.command_prefixes[0] + cmd for cmd in command_list])
201 self.reply(user, command_list_text)
202 elif command[1] == 'help':
203 self.reply(user, 'The ' + self.command_prefixes[0] + 'help command provides usage directions for any active command provided by a plugin. Type "' + self.command_prefixes[0] + 'help commandname" for details, or just "' + self.command_prefixes[0] + 'help" to get a list of commands that have help text available.')
204 elif command[1] in self.plugin_manager.get_help_text():
205 self.reply(user, self.plugin_manager.get_help_text()[command[1]])
206 return
208 if command[0] == 'admin':
209 if self.is_admin(user, params):
210 if len(command) > 1:
211 command = command[1:]
212 raw_params = raw_params[raw_params.index(' ')+1:]
213 else:
214 return
215 else:
216 self.reply(user, 'Only ' + self.bot_name + ' admins can do that.')
218 if not self.is_admin(user, params):
219 return
221 # From this point onward, it is guaranteed that the user is a bot admin (else the method has already returned).
223 if command[0] in self.plugin_manager.get_provided_admin_commands():
224 if self.is_admin(user, params):
225 self.plugin_manager.admin_command_read(user, command, raw_params)
227 core_admin_commands = ['add', 'remove', 'enable', 'disable', 'reload', 'flush_names', 'mute', 'unmute', 'ignore', 'conf', 'plugins']
228 if command[0] in core_admin_commands:
230 if command[0] == 'add' and len(command) >= 2:
231 admins = self.get_conf('main', 'admins')
232 successes = []
233 for user_name in command[1:]:
234 if user_name.lower() not in admins:
235 admins.append(user_name.lower())
236 successes.append(user_name)
237 else:
238 self.reply(user, self.display_name(user_name) + ' is already an administrator.')
239 self.set_conf('main', 'admins', admins)
240 if len(successes) == 1:
241 self.reply(user, 'Added ' + successes[0] + ' as an administrator.')
242 if len(successes) > 1:
243 self.reply(user, 'Added ' + ', '.join(successes) + ' as administrators.')
245 if command[0] == 'remove' and len(command) >= 2:
246 admins = self.get_conf('main', 'admins')
247 successes = []
248 for user_name in command[1:]:
249 if user_name.lower() in admins and user_name.lower() != user.lower():
250 admins.remove(user_name.lower())
251 successes.append(user_name)
252 elif user_name.lower() in admins and user_name.lower() == user.lower():
253 self.reply(user, 'You cannot remove yourself.')
254 else:
255 self.reply(user, self.display_name(user_name) + ' is not an administrator.')
256 self.set_conf('main', 'admins', admins)
257 if len(successes) >= 1:
258 self.reply(user, 'Removed ' + ', '.join(successes) + ' from administrator list.')
260 if command[0] == 'enable' and len(command) >= 2:
261 for plugin_name in command[1:]:
262 if plugin_name not in self.plugin_manager.active_plugins:
263 self.plugin_manager.scan_plugins()
264 if plugin_name in self.plugin_manager.available_plugins:
265 try:
266 status = self.plugin_manager.enable_plugins(plugin_name)
267 if status == 0:
268 self.reply(user, 'Enabled ' + plugin_name + ' plugin.')
269 except RuntimeError as error:
270 self.reply(user, str(error))
271 else:
272 self.reply(user, 'Plugin ' + plugin_name + ' not found.')
273 else:
274 self.reply(user, 'Plugin ' + plugin_name + ' is already enabled.')
276 if command[0] == 'disable' and len(command) >= 2:
277 for plugin_name in command[1:]:
278 if plugin_name in self.plugin_manager.active_plugins:
279 try:
280 status = self.plugin_manager.disable_plugins(plugin_name)
281 if status == 0:
282 self.reply(user, 'Disabled ' + plugin_name + ' plugin.')
283 except RuntimeError as error:
284 self.reply(user, str(error))
285 else:
286 self.reply(user, 'Plugin ' + plugin_name + ' is not enabled.')
288 if command[0] == 'reload' and len(command) >= 2:
289 for plugin_name in command[1:]:
290 if plugin_name in self.plugin_manager.active_plugins:
291 self.plugin_manager.reload_plugins(plugin_name)
292 self.reply(user, 'Reloaded ' + plugin_name + ' plugin.')
293 else:
294 self.reply(user, 'Plugin ' + plugin_name + ' is not enabled.')
296 if command[0] == 'plugins':
297 print_lists = ['active', 'additional']
298 if len(command) >= 2 and command[1] == 'active':
299 print_lists = ['active']
300 if len(command) >= 2 and command[1] == 'additional':
301 print_lists = ['additional']
302 if len(command) >= 2 and command[1] == 'all':
303 print_lists = ['all']
304 if 'active' in print_lists:
305 self.reply(user, 'Active plugins: ' + ', '.join(sorted(self.plugin_manager.active_plugins.keys())))
306 if 'additional' in print_lists:
307 additional_plugins = [plugin for plugin in self.plugin_manager.available_plugins if plugin not in self.plugin_manager.active_plugins]
308 self.reply(user, 'Additionally available plugins: ' + ', '.join(sorted(additional_plugins)))
309 if 'all' in print_lists:
310 self.reply(user, 'All available plugins: ' + ', '.join(sorted(self.plugin_manager.available_plugins)))
312 if command[0] == 'mute':
313 self.reply(user, 'The bot is now muted and will send no more messages until an admin uses the "unmute" command.')
314 self.set_conf('main', 'mute', True)
315 if command[0] == 'unmute':
316 self.set_conf('main', 'mute', False)
317 self.reply(user, 'The bot is now unmuted and will send messages as normal.')
319 if command[0] == 'ignore':
320 if len(command) > 1:
321 additions = []
322 removals = []
323 for name in command[1:]:
324 if name.lower() in self.storage['ignore_list']:
325 self.storage['ignore_list'].remove(name)
326 removals.append(name)
327 else:
328 self.storage['ignore_list'].append(name)
329 additions.append(name)
330 reply = ''
331 if len(additions) > 0:
332 reply += 'Added ' + ', '.join(additions) + ' to the ignore list. '
333 if len(removals) > 0:
334 reply += 'Removed ' + ', '.join(removals) + ' from the ignore list. '
335 if len(reply) > 0:
336 reply = reply[:-1]
337 self.reply(user, reply)
338 self.set_conf('main', 'ignore_list', self.storage['ignore_list'].join(' '))
340 if command[0] == 'flush_names':
341 for name in self.storage['names']:
342 if self.storage['names'][name]['status'] in ['online', 'prelim-online'] and name.lower() != self.bot_name:
343 self.storage['names'][name]['status'] = 'offline'
344 self.reply(user, 'Chatter list flushed.')
346 if command[0] == 'conf':
347 if len(command) < 3:
348 return
349 if len(command) == 3:
350 result = self.get_conf(command[1].lower(), command[2].lower())
351 if type(result) == list:
352 result = ', '.join(result)
353 self.reply(user, 'Configuration for "' + command[1].lower() + ' -> ' + command[2].lower() + '" is "' + str(result) + '".')
354 return
355 if command[3].lower() in ['true', 'yes', 'on', '1']:
356 command[3] = True
357 elif command[3].lower() in ['false', 'no', 'off', '0']:
358 command[3] = False
359 self.set_conf(command[1].lower(), command[2].lower(), command[3])
360 self.reply(user, 'Configuration for "' + command[1].lower() + ' -> ' + command[2].lower() + '" is now "' + str(command[3]) + '".')
362 def scheduled_update(self):
363 self.plugin_manager.check_schedule()
365 def channel_join(self, name, batch = False, newstatus = 'online'):
366 if len(name) > 0 and name[0] in ['@', '+', '!', '%']:
367 name = name[1:]
369 new_name = self.display_name(name)
370 if isinstance(new_name, str):
371 name = new_name
372 if name.lower() not in self.storage['names']:
373 self.storage['names'][name.lower()] = {}
374 self.storage['names'][name.lower()]['name'] = name
375 self.storage['names'][name.lower()]['status'] = None
376 if self.storage['names'][name.lower()]['status'] != newstatus:
377 if newstatus == 'prelim-online' and self.storage['names'][name.lower()]['status'] == 'online':
378 return
379 if self.storage['names'][name.lower()]['status'] in [None, 'offline']:
380 if name.lower() not in self.storage['ignore_list']:
381 self.plugin_manager.user_join(name, batch)
382 self.storage['names'][name.lower()]['status'] = newstatus
384 def channel_part(self, name, batch = False):
385 if len(name) > 0 and name[0] in ['@', '+', '!', '%']:
386 name = name[1:]
387 if name.lower() == self.bot_name.lower():
388 return
389 if name.lower() not in self.storage['names']:
390 self.storage['names'][name.lower()] = {}
391 self.storage['names'][name.lower()]['name'] = None
392 self.storage['names'][name.lower()]['status'] = None
393 if self.storage['names'][name.lower()]['status'] in [None, 'online', 'prelim-online']:
394 new_name = self.display_name(name)
395 if isinstance(new_name, str):
396 name = new_name
397 self.storage['names'][name.lower()]['status'] = 'offline'
398 if name.lower() not in self.storage['ignore_list']:
399 self.plugin_manager.user_part(name, batch)
401 def update_names(self, names_list, silent, adding = False):
402 names = []
403 for name in names_list:
404 if len(name) > 0 and name[0] in ['@', '+', '!', '%']:
405 name = name[1:]
406 if len(name) > 0 and name.lower() != self.bot_name.lower():
407 names.append(name)
408 if len(names) <= 1 or adding:
409 return
410 for name in self.storage['names']:
411 if name.lower() not in [n.lower() for n in names]:
412 if self.storage['names'][name.lower()]['status'] in ['online', 'prelim-online']:
413 self.channel_part(name, True)
414 for name in names:
415 if name.lower() in self.storage['names'] and 'status' in self.storage['names'][name.lower()] and self.storage['names'][name.lower()]['status'] == 'prelim-online':
416 self.storage['names'][name.lower()]['status'] = 'online'
417 else:
418 # At this point we know that name is someone currently in chat.
419 # self.channel_join does the check whether it is someone new itself.
420 self.channel_join(name, True)