# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): fragment — the enclosing scope and the definitions of
# `element`, `fileregex`, `debug`, `tag`, `id`, `defaultfregex`,
# `fileregexlist` and `defaultexclude` are not visible here, and the
# original indentation appears to have been stripped. TODO: confirm
# structure against the full file.
# Optional 6th field: either an "@NAME" reference into the `fileregex`
# table, or a literal regex that must contain a named group "(?P<...)".
if len(element) > 5:
r = element[5]
if r.startswith('@'):
r = r.split('@')[1].upper().strip()
if r in fileregex:
fregex = fileregex[r]
if (debug >= 3): log_info(tag + 'Using \"@' + r + '\" regex/filter for \"' + id + '\" (' + fregex + ')')
else:
log_err(tag + 'Regex \"@' + r + '\" does not exist in \"' + fileregexlist + '\" using default \"' + defaultfregex +'\"')
elif r.find('(?P<') == -1:
log_err(tag + 'Regex \"' + r + '\" does not contain placeholder (e.g: \"(?P< ... )\")')
else:
fregex = r
# Default exclusion filter; may be overridden by the optional 7th field below.
exclude = regex.compile(defaultexclude, regex.I)
# Optional 7th field: exclusion regex, again either "@NAME" or a literal.
if len(element) > 6:
r = element[6]
if r.startswith('@'):
r = r.split('@')[1].upper().strip()
if r in fileregex:
exclude = regex.compile(fileregex[r], regex.I)
if (debug >= 3): log_info(tag + 'Using \"@' + r + '\" exclude regex/filter for \"' + id + '\" (' + r + ')')
else:
log_err(tag + 'Regex \"@' + r + '\" does not exist in \"' + fileregexlist + '\" using default \"' + defaultexclude +'\"')
else:
exclude = regex.compile(r, regex.I)
#if len(element) > 6:
# exclude = regex.compile('(' + element[6] + '|' + defaultexclude + ')', regex.I)
# if (debug >= 3): log_info(tag + id + ': Using \"' + element[6] + '\" exclude-regex/filter')
# NOTE(review): fragment of a time-parsing method (`self` is in scope but the
# enclosing `def` is not visible; indentation appears stripped). `rule` is a
# regex string and `self.exp_time` presumably the text being parsed — TODO
# confirm against the enclosing class.
pattern = re.compile(rule)
match = pattern.search(self.exp_time)
tmp_target = match.group()
# Matched text is "HH:MM"-shaped; split into hour/minute parts.
tmp_parser = tmp_target.split(":")
# Hours 0-11 are shifted to the afternoon (+12) — presumably this rule only
# fires in a PM context; verify against the caller.
if 0 <= int(tmp_parser[0]) <= 11:
self.tp.tunit[3] = int(tmp_parser[0]) + 12
else:
self.tp.tunit[3] = int(tmp_parser[0])
self.tp.tunit[4] = int(tmp_parser[1])
# Handle the case where times should be biased toward the future.
self.preferFuture(3)
self.isAllDayTime = False
if match is None:
# NOTE(review): the next line is truncated in this copy — 'u"(?' is an
# unterminated string literal; the original rule text is missing.
rule = u"(?
# NOTE(review): fragment — `search_name`, `regex` and `pynab` come from the
# enclosing (not visible) scope, and the final dict literal below is cut off
# mid-expression in this copy.
show = {}
match = pynab.util.Match()
# "name.s01e01-02" / "name.s01e01-e03": multi-episode range.
if match.match('^(.*?)[\. \-]s(\d{1,2})\.?e(\d{1,3})(?:\-e?|\-?e)(\d{1,3})\.', search_name, regex.I):
show = {
'name': match.match_obj.group(1),
'season': int(match.match_obj.group(2)),
'episode': [int(match.match_obj.group(3)), int(match.match_obj.group(4))],
}
# "name.s01e0102": two concatenated two-digit episode numbers.
elif match.match('^(.*?)[\. \-]s(\d{2})\.?e(\d{2})(\d{2})\.', search_name, regex.I):
show = {
'name': match.match_obj.group(1),
'season': int(match.match_obj.group(2)),
'episode': [int(match.match_obj.group(3)), int(match.match_obj.group(4))],
}
# "name.s01e01": single episode.
elif match.match('^(.*?)[\. \-]s(\d{1,2})\.?e(\d{1,3})\.?', search_name, regex.I):
show = {
'name': match.match_obj.group(1),
'season': int(match.match_obj.group(2)),
'episode': int(match.match_obj.group(3)),
}
# "name.s01.": whole-season pack.
elif match.match('^(.*?)[\. \-]s(\d{1,2})\.', search_name, regex.I):
show = {
'name': match.match_obj.group(1),
'season': int(match.match_obj.group(2)),
'episode': 'all',
}
# "name.s01d1.": season pack identified by disc number.
elif match.match('^(.*?)[\. \-]s(\d{1,2})d\d{1}\.', search_name, regex.I):
show = {
'name': match.match_obj.group(1),
'season': int(match.match_obj.group(2)),
'episode': 'all',
# NOTE(review): truncated here — the closing brace(s) are missing in this copy.
@classmethod
def is_valid(cls, s):
    """Return whether *s* is a plausible message id: its length must lie
    strictly between cls.MIN_LENGTH and cls.MAX_LENGTH and it must parse
    as an email address."""
    # Cheap length gate first; bounds are exclusive on both ends.
    if not cls.MIN_LENGTH < len(s) < cls.MAX_LENGTH:
        return False
    # Delegate the syntactic check to flanker's address parser.
    return flanker.addresslib.address.is_email(s)
@classmethod
def scan(cls, string):
    """Yield an instance for every valid message id found in *string*.

    Candidates come from group 1 of each cls.RE_ID match; invalid ones
    (per cls.is_valid) are skipped silently.
    """
    for found in cls.RE_ID.finditer(string):
        candidate = found.group(1)
        if not cls.is_valid(candidate):
            continue
        yield cls(candidate)
class Subject(six.text_type):
    """Text subclass for e-mail subject lines.

    Behaves exactly like a unicode string, with one extra helper to drop
    leading reply/forward markers.
    """

    # Matches any leading run of RE:/FW:/FWD:/HA: prefixes, each optionally
    # carrying a bracketed counter such as "[2]".
    RE_RE = re.compile("((RE|FW|FWD|HA)([[]\d])*:\s*)*", re.I)

    def __new__(cls, *args, **kw):
        # Construct via the unicode base type so slicing etc. keep working.
        return six.text_type.__new__(cls, *args, **kw)

    def strip_replies(self):
        """Return the subject with all reply/forward prefixes removed."""
        return self.RE_RE.sub('', self)
import regex
import roman
import datetime
import pytz
import time
from pynab import log
import pynab.util
from pynab.interfaces.movie import INTERFACES as MOVIE_INTERFACES
from pynab.interfaces.tv import INTERFACES as TV_INTERFACES
from pynab.db import db_session, windowed_query, Release, MetaBlack, Category, Movie, TvShow, DBID, DataLog, Episode
import config
# Strips common release-name noise (codecs, sources, resolutions) so titles
# can be matched against movie/TV databases.
CLEANING_REGEX = regex.compile(r'\b(hdtv|dvd|divx|xvid|mpeg2|x264|aac|flac|bd|dvdrip|10 bit|264|720p|1080p\d+x\d+)\b', regex.I)
def process(type, interfaces=None, limit=None, online=True):
"""
Process ID fetching for releases.
:param type: tv/movie
:param interfaces: interfaces to use or None will use all
:param limit: optional limit
:param online: whether to check online apis
:return:
"""
# NOTE(review): the body is truncated in this copy — only the expiry-window
# computation and the session entry are visible; indentation appears stripped.
# Blacklist entries older than this window (default 7 days) become eligible again.
expiry = datetime.datetime.now(pytz.utc) - datetime.timedelta(config.postprocess.get('fetch_blacklist_duration', 7))
with db_session() as db:
# noinspection PyComparisonWithNone,PyComparisonWithNone
# NOTE(review): fragment of a (description, compiled-regex) tuple list whose
# opening bracket is not visible here; the final line is cut off mid-tuple and
# contains stray markup ("</title>"). TODO: restore from the original file.
# Episodes with a title, 4 digit season number, Single episodes (2016x05, etc) & Multi-episode (2016x05x06, 2016x05-06, 2016x05 x06, etc)
(
'Episodes with a title, 4 digit season number, Single episodes (2016x05, etc) & Multi-episode (2016x05x06, 2016x05-06, 2016x05 x06, etc)',
regex.compile(r"^(?<title>.+?)(?:(?:[-_\W](?<![()\[!]))+(?<season>(?<!\d+)(?:\d{4})(?!\d+))(?:x|\Wx){1,2}(?<episode>\d{2,3}(?!\d+))(?:(?:\-|x|\Wx|_){1,2}(?<episode>\d{2,3}(?!\d+)))*)\W?(?!\\)", regex.I),
),
# Partial season pack
(
'Partial season pack',
regex.compile(r"^(?<title>.+?)(?:\W+S(?<season>(?<!\d+)(?:\d{1,2})(?!\d+))\W+(?:(?:Part\W?|(?<!\d+\W+)e)(?<seasonpart>\d{1,2}(?!\d+)))+)", regex.I),
),
# Mini-Series with year in title, treated as season 1, episodes are labelled as Part01, Part 01, Part.1
(
'Mini-Series with year in title, treated as season 1, episodes are labelled as Part01, Part 01, Part.1',
regex.compile(r"^(?<title>.+?\d{4})(?:\W+(?:(?:Part\W?|e)(?<episode>\d{1,2}(?!\d+)))+)", regex.I),
),
# Mini-Series, treated as season 1, multi episodes are labelled as E1-E2
(
'Mini-Series, treated as season 1, multi episodes are labelled as E1-E2',
regex.compile(r"^(?<title>.+?)(?:[-._ ][e])(?<episode>\d{2,3}(?!\d+))(?:(?:\-?[e])(?<episode>\d{2,3}(?!\d+)))+", regex.I),
),
# Mini-Series, treated as season 1, episodes are labelled as Part01, Part 01, Part.1
(
'Mini-Series, treated as season 1, episodes are labelled as Part01, Part 01, Part.1',
regex.compile(r"^(?<title>.+?)(?:\W+(?:(?:Part\W?|(?<!\d+\W+)e)(?<episode>\d{1,2}(?!\d+)))+)", regex.I),
),
# Mini-Series, treated as season 1, episodes are labelled as Part One/Two/Three/...Nine, Part.One, Part_One
(</title>
def character_count(text: str) -> int:
    """Count how many characters an ASS line contains.

    Doesn't take into account effects such as text invisibility etc.

    :param text: input ASS line
    :return: number of characters
    """
    # Strip ASS markup first, then drop every non-word character before counting.
    plain = ass_to_plaintext(text)
    visible = regex.sub(r"\W+", "", plain, flags=regex.I | regex.U)
    return len(visible)
# NOTE(review): `etree` (presumably lxml) is imported outside this view.
# Number of releases handled per batch by the TVRage post-processor.
PROCESS_CHUNK_SIZE = 500
TVRAGE_FULL_SEARCH_URL = 'http://services.tvrage.com/feeds/full_search.php'
# use compiled xpaths and regex for speedup
XPATH_SHOW = etree.XPath('//show')
XPATH_NAME = etree.XPath('name/text()')
XPATH_AKA = etree.XPath('akas/aka/text()')
XPATH_LINK = etree.XPath('link/text()')
XPATH_COUNTRY = etree.XPath('country/text()')
# Extracts the show slug from a tvrage.com link (anything after the last "/"
# that is not the literal "shows" segment).
RE_LINK = regex.compile('tvrage\.com\/((?!shows)[^\/]*)$', regex.I)
def process(limit=None, online=True):
"""Processes [limit] releases to add TVRage information."""
# NOTE(review): body truncated in this copy right after the `if online:` guard;
# indentation appears stripped.
# Blacklist entries older than this window (default 7 days) are purged below.
expiry = datetime.datetime.now(pytz.utc) - datetime.timedelta(config.postprocess.get('fetch_blacklist_duration', 7))
api_session = requests.Session()
with db_session() as db:
# clear expired metablacks
db.query(MetaBlack).filter(MetaBlack.tvshow != None).filter(MetaBlack.time <= expiry).delete(
synchronize_session='fetch')
# Target releases still missing tvshow/episode links, restricted to the TV
# category tree (parent_id 5000).
query = db.query(Release).filter((Release.tvshow == None) | (Release.episode == None)).join(Category).filter(
Category.parent_id == 5000)
if online:
def extract(s: str, entities: Iterable[str], useregex=False, ignorecase=True) -> Iterable[str]:
    """Yield every occurrence of any of *entities* found in *s*.

    Each entity is matched as a whole word (``\\b`` boundaries on both sides).
    Literal entities match flexibly across whitespace: any single space in the
    entity matches one or more whitespace characters in *s*.

    :param s: text to scan
    :param entities: literal strings, or regex fragments when ``useregex``
    :param useregex: treat entities as regex fragments instead of literals
    :param ignorecase: match case-insensitively (default True)
    :return: matched substrings, in order of appearance

    BUGFIX: the original built literal patterns with
    ``re.escape(e).replace(' ', r"s+")`` — the replacement string lost its
    backslash (``s+`` instead of ``\\s+``), and on Python 3.7+ ``re.escape``
    escapes spaces anyway, so the replace never produced the intended
    whitespace-flexible pattern. We now escape per-token and join with
    ``\\s+`` explicitly.
    """
    def _literal_pattern(e: str) -> str:
        # Escape each whitespace-separated token, then let any whitespace
        # run match between tokens.
        return r"\s+".join(re.escape(tok) for tok in e.split())

    # Materialize so a generator argument is only consumed once and an empty
    # input yields nothing (the original compiled a degenerate pattern that
    # matched empty strings at every word boundary).
    patterns = [e if useregex else _literal_pattern(e) for e in entities]
    if not patterns:
        return
    compiled = re.compile(
        r"\b(?:{})\b".format(r"|".join(patterns)),
        re.I if ignorecase else 0,
    )
    for m in compiled.finditer(s):
        yield m.group(0)
# NOTE(review): the lines below are the tail of a list (its opening bracket
# and name are not visible here); judging by RE_SANITIZE_APOSTROPHE below it
# is presumably APOSTROPHE_LOOK_ALIKE_CHARS — TODO confirm.
u'\N{MODIFIER LETTER TURNED COMMA}', # u'\u02bb'
u'\N{ARMENIAN APOSTROPHE}', # u'\u055a'
u'\N{LATIN SMALL LETTER SALTILLO}', # u'\ua78c'
u'\N{PRIME}', # u'\u2032'
u'\N{REVERSED PRIME}', # u'\u2035'
u'\N{MODIFIER LETTER PRIME}', # u'\u02b9'
u'\N{FULLWIDTH APOSTROPHE}', # u'\uff07'
]
# Precompiled patterns used by the date-string sanitizers in this module.
# Non-breaking space.
RE_NBSP = re.compile(u'\xa0', flags=re.UNICODE)
# Any whitespace run.
RE_SPACES = re.compile(r'\s+')
# Captures the inner text of a string that has BOTH leading and trailing whitespace.
RE_TRIM_SPACES = re.compile(r'^\s+(\S.*?)\s+$')
# Captures text up to (excluding) any trailing colons.
RE_TRIM_COLONS = re.compile(r'(\S.*?):*$')
RE_SANITIZE_SKIP = re.compile(r'\t|\n|\r|\u00bb|,\s\u0432|\u200e|\xb7|\u200f|\u064e|\u064f', flags=re.M)
# Russian year abbreviation ("г.") following a non-word char or digit.
RE_SANITIZE_RUSSIAN = re.compile(r'([\W\d])\u0433\.', flags=re.I | re.U)
# A period preceded by non-digit characters.
RE_SANITIZE_PERIOD = re.compile(r'(?<=\D+)\.', flags=re.U)
# Text after an "on:" prefix.
RE_SANITIZE_ON = re.compile(r'^.*?on:\s+(.*)')
RE_SANITIZE_APOSTROPHE = re.compile(u'|'.join(APOSTROPHE_LOOK_ALIKE_CHARS))
# A 10-digit unix timestamp not followed by another digit or a dot.
RE_SEARCH_TIMESTAMP = re.compile(r'^\d{10}(?![^\d.])')
def sanitize_spaces(date_string):
    """Normalize whitespace in *date_string*.

    Non-breaking spaces become plain spaces, whitespace runs collapse to a
    single space, and (when both ends are padded) surrounding whitespace is
    trimmed via RE_TRIM_SPACES.
    """
    normalized = RE_NBSP.sub(' ', date_string)
    normalized = RE_SPACES.sub(' ', normalized)
    # RE_TRIM_SPACES only substitutes when leading AND trailing whitespace
    # are both present, keeping the inner text via the \1 backreference.
    normalized = RE_TRIM_SPACES.sub(r'\1', normalized)
    return normalized
def date_range(begin, end, **kwargs):
dateutil_error_prone_args = ['year', 'month', 'week', 'day', 'hour',