Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def finalize(cls):
    """Remove the ``store`` attribute from ``cls``."""
    del cls.store
@staticmethod
def split_num(lookup):
    """
    Split a trailing number off a lookup string.

    The string is split at its last space. If the text before that space
    is non-empty and the text after it is a decimal number, return
    ``(prefix, int(number))``. Otherwise return ``(lookup, 0)`` unchanged.

    ``str.isdecimal`` is used rather than ``str.isdigit`` because
    ``isdigit`` also accepts characters such as superscript digits that
    ``int()`` cannot parse, which would raise ``ValueError`` here.
    """
    prefix, _, num = lookup.rpartition(' ')
    if not prefix or not num.isdecimal():
        return lookup, 0
    return prefix, int(num)
def lookup(self, rest=''):
    """
    Resolve a lookup request.

    Surrounding whitespace is stripped from ``rest``, the result is split
    by ``self.split_num``, and the pieces are forwarded positionally to
    ``self.lookup_with_num``, whose result is returned.
    """
    pieces = self.split_num(rest.strip())
    return self.lookup_with_num(*pieces)
class SQLiteQuotes(Quotes, storage.SQLiteStorage):
def init_tables(self):
CREATE_QUOTES_TABLE = '''
CREATE TABLE
IF NOT EXISTS quotes (
quoteid INTEGER NOT NULL,
library VARCHAR NOT NULL,
quote TEXT NOT NULL,
PRIMARY KEY (quoteid)
)
'''
CREATE_QUOTES_INDEX = '''
CREATE INDEX
IF NOT EXISTS ix_quotes_library
on quotes(library)
'''
CREATE_QUOTE_LOG_TABLE = '''
from . import storage
from .core import command, on_join
class Notify(storage.SelectableStorage):
    """
    Selectable storage with a class-level ``store`` attribute.

    ``init`` creates the store and ``finalize`` removes it again.
    """

    @classmethod
    def init(cls):
        """Create ``cls.store`` via ``from_URI`` and register ``finalize``."""
        backend = cls.from_URI()
        cls.store = backend
        cls._finalizers.append(cls.finalize)

    @classmethod
    def finalize(cls):
        """Drop the ``store`` attribute from the class."""
        delattr(cls, 'store')
class SQLiteNotify(Notify, storage.SQLiteStorage):
    """``Notify`` combined with ``storage.SQLiteStorage``."""

    def init_tables(self):
        """Create the ``notify`` table when it is missing, then commit."""
        # Adjacent literals keep the SQL text exactly as it was written.
        ddl = (
            '\n'
            'CREATE TABLE\n'
            'IF NOT EXISTS notify\n'
            '(\n'
            'notifyid INTEGER NOT NULL,\n'
            'tonick VARCHAR NOT NULL,\n'
            'fromnick VARCHAR NOT NULL,\n'
            'message VARCHAR NOT NULL,\n'
            'notifytime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n'
            'primary key(notifyid)\n'
            ')\n'
        )
        self.db.execute(ddl)
        self.db.commit()
@staticmethod
def format_entry(entry):
    """
    Render a feed entry as a line of text.

    The result is the entry's title. " by <author>" is appended only when
    the entry has an ``author`` key and the title does not already contain
    the text " by ".
    """
    title_already_credits = ' by ' in entry['title']
    if title_already_credits or 'author' not in entry:
        return '{title}'.format(**entry)
    return '{title} by {author}'.format(**entry)
class FeedparserDB(storage.SelectableStorage):
    """Selectable storage base; adds nothing to its parent class here."""
class SQLiteFeedparserDB(FeedparserDB, storage.SQLiteStorage):
    """``FeedparserDB`` backed by a ``feed_seen`` table with one ``key`` column."""

    def init_tables(self):
        """Create the ``feed_seen`` table and its key index if missing."""
        self.db.execute("CREATE TABLE IF NOT EXISTS feed_seen (key varchar)")
        self.db.execute('CREATE INDEX IF NOT EXISTS ix_feed_seen_key ON feed_seen (key)')
        self.db.commit()

    def get_seen_feeds(self):
        """Return a list of every ``key`` value stored in ``feed_seen``."""
        return [row[0] for row in self.db.execute('select key from feed_seen')]

    def add_entries(self, entries):
        """Insert one ``feed_seen`` row per item of ``entries`` and commit."""
        self.db.executemany('INSERT INTO feed_seen (key) values (?)', [(x,) for x in entries])
        self.db.commit()

    def clear(self):
        "Clear all entries"
        self.db.execute('DELETE FROM feed_seen')
        # Commit like the other writers in this class so the delete is not
        # left in an open transaction.
        self.db.commit()
Given two words, initial then follows, associate those words
"""
filter = dict(_id=initial)
key = 'begets.' + fields.encode(follows)
oper = {'$inc': {key: 1}}
self.db.update(filter, oper, upsert=True)
def next(self, initial):
    """
    Pick a follower for ``initial``.

    The document whose ``_id`` equals ``initial`` is fetched, its
    ``begets`` value is handed to ``self.choose``, and the chosen item is
    returned after ``fields.decode``.
    """
    query = dict(_id=initial)
    record = self.db.find_one(query)
    chosen = self.choose(record['begets'])
    return fields.decode(chosen)
def purge(self):
    """Call ``drop()`` on ``self.db``."""
    self.db.drop()
class SQLiteChains(Chains, pmxbot.storage.SQLiteStorage):
def init_tables(self):
create_table = """
CREATE TABLE IF NOT EXISTS chains (
initial varchar,
follows varchar,
count INTEGER
)
"""
self.db.execute(create_table)
self.db.commit()
@staticmethod
def _wrap_initial(initial, query):
"""
When 'initial' is None, sqlite requires a different syntax to
match. Patch queries accordingly.
# Module-level debug flag, off by default; its readers are not visible in
# this excerpt.
debug = False
class Stack(storage.SelectableStorage):
    """
    Selectable storage with a class-level ``store`` attribute.

    ``init`` creates the store and ``finalize`` removes it again.
    """

    @classmethod
    def init(cls):
        """Create ``cls.store`` via ``from_URI`` and register ``finalize``."""
        backend = cls.from_URI()
        cls.store = backend
        cls._finalizers.append(cls.finalize)

    @classmethod
    def finalize(cls):
        """Drop the ``store`` attribute from the class."""
        delattr(cls, 'store')
class SQLiteStack(Stack, storage.SQLiteStorage):
    """``Stack`` combined with ``storage.SQLiteStorage``."""

    def init_tables(self):
        """Create the ``stack`` table when it is missing, then commit."""
        # Adjacent literals keep the SQL text exactly as it was written.
        ddl = (
            '\n'
            'CREATE TABLE\n'
            'IF NOT EXISTS stack\n'
            '(\n'
            'topic VARCHAR NOT NULL,\n'
            'items VARCHAR NOT NULL,\n'
            'primary key(topic)\n'
            ')\n'
        )
        self.db.execute(ddl)
        self.db.commit()

    def get_topics(self):
        """Return a list holding the ``topic`` value of every ``stack`` row."""
        topics = []
        for record in self.db.execute("SELECT topic FROM stack"):
            topics.append(record[0])
        return topics
# override MutableSequence methods to persist changes
def __setitem__(self, *args, **kwargs):
    """Delegate to the parent ``__setitem__``, then call ``self.save()``."""
    super().__setitem__(*args, **kwargs)
    self.save()
def __delitem__(self, *args, **kwargs):
    """Delegate to the parent ``__delitem__``, then call ``self.save()``."""
    super().__delitem__(*args, **kwargs)
    self.save()
def insert(self, *args, **kwargs):
    """Delegate to the parent ``insert``, then call ``self.save()``."""
    super().insert(*args, **kwargs)
    self.save()
class SQLiteChannels(ChannelList, storage.SQLiteStorage):
def init_tables(self):
CREATE_SQL = '''
CREATE TABLE IF NOT EXISTS channels (
aspect VARCHAR NOT NULL,
items VARCHAR NOT NULL,
PRIMARY KEY (aspect) )
'''
self.db.execute(CREATE_SQL)
self.db.commit()
def load(self):
SQL = 'SELECT items FROM channels WHERE aspect = ?'
res = next(self.db.execute(SQL, [self.aspect]), None)
if not res:
return
del cls.store
def message(self, channel, nick, msg):
    """
    Hand a message to ``self._message`` under a normalised channel name.

    Every ``#`` is removed from ``channel`` and the result is lower-cased
    before the call; ``nick`` and ``msg`` pass through unchanged.
    """
    normalised = channel.replace('#', '').lower()
    self._message(normalised, nick, msg)
def list_channels(self):
    """Return whatever ``self._list_channels()`` returns."""
    return self._list_channels()
def clear(self):
    """
    Remove all messages from the database.

    NOTE(review): no implementation is visible here beyond this docstring;
    presumably storage-specific subclasses override it -- confirm.
    """
class SQLiteLogger(Logger, storage.SQLiteStorage):
def init_tables(self):
LOG_CREATE_SQL = '''
CREATE TABLE IF NOT EXISTS logs (
id INTEGER NOT NULL,
datetime DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
channel VARCHAR NOT NULL,
nick VARCHAR NOT NULL,
message TEXT,
PRIMARY KEY (id) )
'''
INDEX_DTC_CREATE_SQL = """
CREATE INDEX
IF NOT EXISTS ix_logs_datetime_channel
ON logs (datetime, channel)
"""
INDEX_DT_CREATE_SQL = """
def link(self, thing1, thing2):
    """
    Link thing1 and thing2, adding the karma of each into
    a single entry.

    Both names are stripped and lower-cased first. Each is then passed to
    ``self.change`` with a delta of 0, so a thing that does not exist is
    created, before the result of ``self._link`` is returned.

    Raises ``SameName`` when the two normalised names are equal.
    """
    first, second = (name.strip().lower() for name in (thing1, thing2))
    if first == second:
        raise SameName("Attempted to link two of the same name")
    for name in (first, second):
        self.change(name, 0)
    return self._link(first, second)
class SQLiteKarma(Karma, storage.SQLiteStorage):
def init_tables(self):
CREATE_KARMA_VALUES_TABLE = '''
CREATE TABLE IF NOT EXISTS karma_values (
karmaid INTEGER NOT NULL,
karmavalue INTEGER,
primary key (karmaid)
)
'''
CREATE_KARMA_KEYS_TABLE = '''
CREATE TABLE IF NOT EXISTS karma_keys (
karmakey varchar,
karmaid INTEGER,
primary key (karmakey)
)
'''
CREATE_KARMA_LOG_TABLE = '''
if channel not in pmxbot.config.log_channels:
return
ParticipantLogger.store.log_join(nick, channel)
@on_leave()
def log_leave(event, nick, channel):
    """
    Log a quit or part event.

    Nothing is recorded unless ``channel`` is listed in
    ``pmxbot.config.log_channels``; otherwise the nick, channel and
    ``event.type`` are passed to ``ParticipantLogger.store.log``.
    """
    channel_is_logged = channel in pmxbot.config.log_channels
    if channel_is_logged:
        ParticipantLogger.store.log(nick, channel, event.type)
class SQLiteLogger(ParticipantLogger, storage.SQLiteStorage):
def init_tables(self):
LOG_CREATE_SQL = '''
CREATE TABLE IF NOT EXISTS rolls (
id INTEGER NOT NULL,
datetime DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
channel VARCHAR NOT NULL,
nick VARCHAR NOT NULL,
change TEXT,
PRIMARY KEY (id) )
'''
INDEX_DTC_CREATE_SQL = """
CREATE INDEX IF NOT EXISTS ix_rolls_datetime_channel
ON rolls (datetime, channel)
"""
INDEX_DT_CREATE_SQL = """
CREATE INDEX IF NOT EXISTS ix_rolls_datetime