# Handle limiter written by Buys de Barbanson, Hubrecht 2017
# This class allows for writing many files at the same time without the
# hassle of thinking about handle limitations.
import gzip
import time
[docs]class HandleLimiter(object):
def __init__(self, maxHandles=32, pruneEvery=10000, compressionLevel=1):
self.openHandles = {}
self.seen = set() # Which files have been opened before
self.maxHandles = maxHandles
self.pruneEvery = pruneEvery
self.pruneIntervalCounter = 0
self.compressionLevel = compressionLevel
[docs] def write(self, path, string, method=None, forceAppend=False): # 0= plain, 1:gzip
if path not in self.openHandles:
self.openHandles[path] = {}
failedOpening = True
while failedOpening:
try:
if path in self.seen or forceAppend:
# Append when we already wrote to the file
if method == 1:
self.openHandles[path]['handle'] = gzip.open(
path, 'ab', self.compressionLevel)
else:
self.openHandles[path]['handle'] = open(path, 'a')
else:
# Open as new file when it is the first write
if method == 1:
self.openHandles[path]['handle'] = gzip.open(
path, 'wb', self.compressionLevel)
else:
self.openHandles[path]['handle'] = open(path, 'w')
# Remember that we accessed this file
self.seen.add(path)
failedOpening = False
except Exception as e:
failedOpening = True
if len(self.openHandles) > 1:
self.close()
else:
# This is mayorly bad...
print(
'Failed writing to %s, even after closing all other open file-handles. Out of options...' %
path)
print(e)
# Raise the error to the parent method
raise
if method == 0:
self.openHandles[path]['handle'].write(string)
else:
self.openHandles[path]['handle'].write(bytes(string, 'UTF-8'))
self.openHandles[path]['lastw'] = time.time()
self.pruneIntervalCounter += 1
if self.pruneIntervalCounter >= self.pruneEvery:
self.prune()
[docs] def prune(self):
if len(self.openHandles) > self.maxHandles:
toPrune = len(self.openHandles) - self.maxHandles
pathsToPrune = sorted(
self.openHandles.keys(), key=lambda path: (
self.openHandles[path]['lastw']))[
:toPrune] # ,reverse=True
for path in pathsToPrune:
if 'handle' in self.openHandles[path]:
try:
self.openHandles[path]['handle'].close()
except Exception as e:
pass
self.openHandles.pop(path)
self.pruneIntervalCounter = 0
[docs] def close(self):
k = self.openHandles.keys()
destroyed = []
for path in k:
if 'handle' in self.openHandles[path]:
try:
self.openHandles[path]['handle'].close()
except BaseException:
pass
else:
print('Closed broken file handle for %s' % path)
destroyed.append(path)
for delete in destroyed:
self.openHandles.pop(delete)