Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def MaintainCache( self ):
with self._lock:
while True:
if len( self._keys_fifo ) == 0:
break
else:
( key, last_access_time ) = next( iter( self._keys_fifo.items() ) )
if HydrusData.TimeHasPassed( last_access_time + self._timeout ):
self._DeleteItem()
else:
break
def _CheckShareAuthorised( self, share_key ):
self._CheckDataUsage()
info = self._GetInfo( share_key )
timeout = info[ 'timeout' ]
if timeout is not None and HydrusData.TimeHasPassed( timeout ):
raise HydrusExceptions.NotFoundException( 'This share has expired.' )
def GetGalleryStatus( self ):
with self._lock:
if HydrusData.TimeHasPassed( self._no_work_until ):
gallery_status = self._gallery_status
else:
no_work_text = HydrusData.ConvertTimestampToPrettyExpires( self._no_work_until ) + ': ' + self._no_work_until_reason
gallery_status = no_work_text
return gallery_status
def _CleanCache( self ):
if HydrusData.TimeHasPassed( self._next_clean_cache_time ):
for cache in ( self._html_to_soups, self._json_to_jsons ):
dead_datas = set()
for ( data, ( last_accessed, parsed_object ) ) in cache.items():
if HydrusData.TimeHasPassed( last_accessed + 10 ):
dead_datas.add( data )
for dead_data in dead_datas:
del cache[ dead_data ]
self._next_clean_cache_time = HydrusData.GetNow() + 5
if HG.client_controller.ShouldStopThisWork( self._maintenance_mode, self._stop_time ):
should_cancel = True
if should_cancel:
self.Cancel()
if not self._deleted.is_set():
if self._deletion_time is not None:
if HydrusData.TimeHasPassed( self._deletion_time ):
self.Finish()
self._deleted.set()
def _NoDelays( self ):
return HydrusData.TimeHasPassed( self._no_work_until )
if self._banned_info is None:
return False
else:
( reason, created, expires ) = self._banned_info
if expires is None:
return True
else:
if HydrusData.TimeHasPassed( expires ):
self._banned_info = None
else:
return True
def CleanUp( self ):
self._htpa.CommitBigJob()
if HydrusData.TimeHasPassed( self._time_started + 120 ):
self._htpa.Optimise()
self._htpa.Close()
self._htpa = None
number_of_errors = HG.client_controller.new_options.GetInteger( 'domain_network_infrastructure_error_number' )
error_time_delta = HG.client_controller.new_options.GetInteger( 'domain_network_infrastructure_error_time_delta' )
if number_of_errors == 0:
return True
# this will become flexible and customisable when I have domain profiles/status/ui
# also should extend it to 'global', so if multiple domains are having trouble, we maybe assume the whole connection is down? it would really be nicer to have a better sockets-level check there
if domain in self._second_level_domains_to_network_infrastructure_errors:
network_infrastructure_errors = self._second_level_domains_to_network_infrastructure_errors[ domain ]
network_infrastructure_errors = [ timestamp for timestamp in network_infrastructure_errors if not HydrusData.TimeHasPassed( timestamp + error_time_delta ) ]
self._second_level_domains_to_network_infrastructure_errors[ domain ] = network_infrastructure_errors
if len( network_infrastructure_errors ) >= number_of_errors:
return False
elif len( network_infrastructure_errors ) == 0:
del self._second_level_domains_to_network_infrastructure_errors[ domain ]
return True
def _MaintainCache( self ):
if HydrusData.TimeHasPassed( self._next_cache_maintenance_timestamp ):
now = HydrusData.GetNow()
oldest_second = now - self.MAX_SECONDS_TIME_DELTA
oldest_minute = now - self.MAX_MINUTES_TIME_DELTA
oldest_hour = now - self.MAX_HOURS_TIME_DELTA
oldest_day = now - self.MAX_DAYS_TIME_DELTA
def clear_counter( counter, timestamp ):
bad_keys = [ key for key in list(counter.keys()) if key < timestamp ]
for bad_key in bad_keys:
del counter[ bad_key ]