Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _parse_segy(self):
    """
    Read the extents of the three axes of ``self.segy_file`` and store them on ``self``.

    The file is opened read-only through segyio. For the inline, crossline
    and sample axes the first value, the last value and the number of entries
    are recorded, and the step is derived by floor-dividing the span by the
    number of intervals (entries minus one).

    Attributes written: ``startInline``, ``endInline``, ``nEast``,
    ``stepInline``, ``startCrline``, ``endCrline``, ``nNorth``,
    ``stepCrline``, ``startDepth``, ``endDepth``, ``nDepth``, ``stepDepth``.
    """
    with segyio.open(self.segy_file, 'r') as segyfile:
        segyfile.mmap()

        # Inline axis
        inlines = segyfile.ilines
        self.startInline = inlines[0]
        self.endInline = inlines[-1]
        self.nEast = len(inlines)
        inline_span = self.endInline - self.startInline
        self.stepInline = inline_span // (self.nEast - 1)

        # Crossline axis
        crosslines = segyfile.xlines
        self.startCrline = crosslines[0]
        self.endCrline = crosslines[-1]
        self.nNorth = len(crosslines)
        crossline_span = self.endCrline - self.startCrline
        self.stepCrline = crossline_span // (self.nNorth - 1)

        # Sample (depth) axis
        samples = segyfile.samples
        self.startDepth = samples[0]
        self.endDepth = samples[-1]
        self.nDepth = len(samples)
        depth_span = self.endDepth - self.startDepth
        self.stepDepth = depth_span // (self.nDepth - 1)
# NOTE(review): fragment - the enclosing test method header and the code that
# opens `src` and builds `spec`/`dstfile` are outside the visible source, and
# the original indentation has been lost.
# Keep every second inline and every second crossline of the source.
spec.ilines = src.ilines[::2]
spec.xlines = src.xlines[::2]
with segyio.create(dstfile, spec) as dst:
# use the inline headers as base
dst.header.iline = src.header.iline[::2]
# then update crossline numbers from the crossline headers
for xl in dst.xlines:
# Take CROSSLINE_3D from the first header yielded for this crossline and
# write it back through the destination's crossline header view.
f = next(src.header.xline[xl])[TraceField.CROSSLINE_3D]
dst.header.xline[xl] = { TraceField.CROSSLINE_3D: f }
# but we override the last xline to be 6, not 5
dst.header.xline[5] = { TraceField.CROSSLINE_3D: 6 }
# Copy the trace data of every second source inline.
dst.iline = src.iline[::2]
# Re-open the written file read-only and check the line numbers and a few
# individual sample values.
with segyio.open(dstfile, "r") as f:
self.assertListEqual(list(f.ilines), list(spec.ilines))
self.assertListEqual(list(f.xlines), [1, 3, 6])
self.assertAlmostEqual(1, f.iline[1][0][0], places = 4)
self.assertAlmostEqual(3.004, f.iline[3][0][4], places = 4)
self.assertAlmostEqual(3.014, f.iline[3][1][4], places = 4)
self.assertAlmostEqual(7.023, f.iline[7][2][3], places = 4)
def test_create_sgy_shorter_traces(self):
    """
    Create a copy of the test file whose traces hold only 20 samples.

    The destination spec reuses the source format, sorting, inlines and
    crosslines. Every source trace header is copied and then its INLINE_3D
    field is rewritten to the source value plus 100; afterwards the inline
    data is copied line by line.
    """
    dstfile = self.filename.replace(".sgy", "-shorter.sgy")

    with segyio.open(self.filename, "r") as src:
        spec = segyio.spec()
        spec.format = int(src.format)
        spec.sorting = int(src.sorting)
        # fewer samples per trace than the source file
        spec.samples = 20
        spec.ilines = src.ilines
        spec.xlines = src.xlines

        with segyio.create(dstfile, spec) as dst:
            for traceno, src_header in enumerate(src.header):
                # full header first, then the shifted inline number on top
                dst.header[traceno] = src_header
                dst.header[traceno] = {
                    TraceField.INLINE_3D: src_header[TraceField.INLINE_3D] + 100
                }

            for inline_no in dst.ilines:
                dst.iline[inline_no] = src.iline[inline_no]
def test_dt_no_fallback(self):
    """
    Check ``segyio.dt`` against an interval written into a copy of the test file.

    A value of 6000 is stored in the binary header interval field and in the
    sample-interval field of the first trace header; after flushing,
    ``segyio.dt`` is expected to be almost equal to that value divided by 1000.
    """
    interval_us = 6000
    copy_path = self.filename.replace( ".sgy", "_dt_test.sgy")
    shutil.copyfile(self.filename, copy_path)

    with segyio.open(copy_path, "r+") as segy:
        segy.bin[BinField.Interval] = interval_us
        first_header = segy.header[0]
        first_header[TraceField.TRACE_SAMPLE_INTERVAL] = interval_us
        segy.flush()

        np.testing.assert_almost_equal(segyio.dt(segy), interval_us / 1000)
# Script entry point: copies the source file once per name in the list below
# and then patches trace headers of individual copies.
# NOTE(review): the original indentation is lost and the function is cut off
# after the last `for` line, so the nesting of the blocks below cannot be
# confirmed from here. `pathjoin`, `product` and `su` are defined elsewhere.
def main():
# Require at least the source file argument; otherwise exit with usage text.
if len(sys.argv) < 2:
sys.exit("Usage: {} [source-file] [destination-file] [prefix]".format(sys.argv[0]))
srcfile = sys.argv[1]
# Destination defaults to the source file name.
dstfile = sys.argv[2] if len(sys.argv) > 2 else srcfile
# NOTE(review): os.path.split returns a (head, tail) pair, so the default
# prefix is a tuple while an explicit argv[3] is a string - confirm that
# pathjoin accepts both.
prefix = sys.argv[3] if len(sys.argv) > 3 else os.path.split(dstfile)
for pre in ['normal', 'acute', 'right', 'obtuse', 'straight', 'reflex', 'left', 'inv-acute']:
fname = pathjoin(pre, dstfile, prefix)
shutil.copyfile(srcfile, fname)
# Re-write text headers, binary header, traces and trace headers of the copy
# from the source through segyio.
with segyio.open(srcfile) as src, segyio.open(fname, 'r+') as dst:
for i in range(1 + src.ext_headers):
dst.text[i] = src.text[i]
dst.bin = src.bin
dst.trace = src.trace
dst.header = src.header
# Patch the 'normal' copy: set cdpx/cdpy from the pairs yielded by
# product(src) and set scalco to 10 on each trace header.
# NOTE(review): `src` is bound by the `with` statement above; whether it is
# still open at this point depends on the lost nesting - verify.
with segyio.open(pathjoin('normal', dstfile, prefix), 'r+') as dst:
for i, (x, y) in enumerate(product(src)):
trh = dst.header[i]
trh[su.cdpx] = x
trh[su.cdpy] = y
trh[su.scalco] = 10
# The 'acute' copy is opened the same way; the loop body is not visible.
with segyio.open(pathjoin('acute', dstfile, prefix), 'r+') as dst:
for i, (x, y) in enumerate(product(src)):
"""
Loads segy file and uses the input inline and crossline byte values to load
the trace headers. It determines which inline or crossline the traces
start with. SEGY files can be non-standard and use other byte locations for
these values. In that case, the data from this method will be erroneous. It
is up to the user to figure out which numbers to use by reading the SEGY text
header and finding the byte offsets visually.
:param str input_file: path to segy file
:param int iline: inline byte position
:param int xline: crossline byte position
:returns: fast_distinct, slow_distinct, trace_headers, samplesize
:rtype: list, DataFrame, DataFrame, int
"""
# NOTE(review): fragment - the `def` line that owns this body is not visible
# and the original indentation has been lost. FAST, SLOW,
# _identify_fast_direction and _remove_duplicates are defined elsewhere.
# Open with ignore_geometry=True so segyio does not build inline/crossline
# geometry; the header values are read per trace below instead.
with segyio.open(input_file, ignore_geometry=True) as segy_file:
segy_file.mmap()
# Initialize df with trace id as index and headers as columns
trace_headers = pd.DataFrame(index=range(0, segy_file.tracecount), columns=["i", "j"])
# Fill dataframe with all trace headers values
trace_headers["i"] = segy_file.attributes(iline)
trace_headers["j"] = segy_file.attributes(xline)
# The return value of the helper is discarded; presumably it updates
# trace_headers in place - verify against its definition.
_identify_fast_direction(trace_headers, FAST, SLOW)
samplesize = len(segy_file.samples)
fast_distinct = _remove_duplicates(trace_headers[FAST])
# NOTE(review): np.unique returns an ndarray, while the docstring lists
# DataFrame as the type of the second return value - confirm which is intended.
slow_distinct = np.unique(trace_headers[SLOW])
return fast_distinct, slow_distinct, trace_headers, samplesize
"""
# NOTE(review): fragment - the `def` line and the start of the docstring that
# own this body are not visible, and the original indentation has been lost.
# `out_cube`, `in_filename` and `out_filename` come from that missing header.
# Select last channel
if type(out_cube) is list:
out_cube = out_cube[-1]
print("Writing interpretation to " + out_filename)
# Copy segy file
from shutil import copyfile
copyfile(in_filename, out_filename)
# Moving temporal axis back again
out_cube = np.moveaxis(out_cube, 0, -1)
# Open out-file
with segyio.open(out_filename, "r+") as src:
iline_start = src.ilines[0]
# Use the dtype of the existing inline data for the values written below.
dtype = src.iline[iline_start].dtype
# loop through inlines and insert output
for i in src.ilines:
# Index the cube by offset from the first inline number; assumes inline
# numbers increase in steps of 1 - TODO confirm.
iline = out_cube[i - iline_start, :, :]
src.iline[i] = np.ascontiguousarray(iline.astype(dtype))
# TODO: rewrite this whole function
# Moving temporal axis first again - just in case the user want to keep working on it
# NOTE(review): this only rebinds the name out_cube; nothing visible below
# uses or returns the result, so the caller would not see it - verify intent.
out_cube = np.moveaxis(out_cube, -1, 0)
print("Writing interpretation - Finished")
return
def load_segy_with_geometry(segyfile):
    """
    Try to open a SEG-Y file with geometry and print a short summary.

    The file is opened with ``ignore_geometry=False`` and memory-mapped, then
    the number of samples per trace and the trace count are printed. If
    ``ValueError`` is raised while doing so, a failure message and the
    exception are printed instead. The file handle is closed before the
    function returns.

    :param segyfile: path handed to ``segyio.open``
    :return: None
    """
    try:
        # The context manager closes the handle on every exit path; the file
        # was previously left open after the summary had been printed.
        with segyio.open(segyfile, ignore_geometry=False) as segy:
            segy.mmap()
            print(f"Loaded with geometry: {segyfile} :")
            print(f"\tNum samples per trace: {len(segy.samples)}")
            print(f"\tNum traces in file: {segy.tracecount}")
    except ValueError as ex:
        print(f"Load failed with geometry: {segyfile} :")
        print(ex)
def open_file_and_wrap(cls, file_name, file_activity_monitor=None):
    """
    Build an instance of ``cls`` for a file and attach an opened segyio handle to it.

    :param file_name: file to open; converted with ``str()`` before it is passed to ``segyio.open``
    :param file_activity_monitor: forwarded unchanged to the ``cls`` constructor
    :return: the new instance, after its ``_wrap_segyio_slices()`` has been called
    """
    instance = cls(
        file_name=file_name,
        file_activity_monitor=file_activity_monitor,
    )

    segy_path = str(file_name)
    instance.segy = segyio.open(segy_path)

    instance._wrap_segyio_slices()
    return instance
def segy_read(segy_path, out_path, out_name):
def write(chunk, segy_file, dset):
for i in chunk:
dset[i[0], i[1], :] = segy_file.trace.raw[i[2]]
return(chunk)
segy_file = segyio.open(segy_path)
trace_inlines = segy_file.attributes(segyio.TraceField.INLINE_3D)[:]
trace_xlines = segy_file.attributes(segyio.TraceField.CROSSLINE_3D)[:]
trace_inlines_unique = np.unique(trace_inlines)
trace_xlines_unique = np.unique(trace_xlines)
num_inline = trace_inlines_unique.size
num_xline = trace_xlines_unique.size
num_zsamples = len(segy_file.samples)
min_inline = trace_inlines_unique.min()
min_xline = trace_xlines_unique.min()
min_zsample = segy_file.samples.min()
max_inline = trace_inlines_unique.max()
max_xline = trace_xlines_unique.max()