import argparse
import logging
import os

import ray
import modin.pandas as pd

from utils import time_logger

parser = argparse.ArgumentParser(description="groupby benchmark")
parser.add_argument("--path", dest="path", help="path to the csv data file")
parser.add_argument("--logfile", dest="logfile", help="path to the log file")
args = parser.parse_args()
file = args.path
file_size = os.path.getsize(file)
if not os.path.exists(os.path.split(args.logfile)[0]):
    os.makedirs(os.path.split(args.logfile)[0])
logging.basicConfig(filename=args.logfile, level=logging.INFO)
df = pd.read_csv(file)
blocks = df._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))
with time_logger(
    "Groupby + sum aggregation on axis=0: {}; Size: {} bytes".format(file, file_size)
):
    df_groupby = df.groupby("1")
    blocks = df_groupby.sum()._block_partitions.flatten().tolist()
    ray.wait(blocks, len(blocks))

with time_logger("Groupby mean on axis=0: {}; Size: {} bytes".format(file, file_size)):
    blocks = df_groupby.mean()._block_partitions.flatten().tolist()
    ray.wait(blocks, len(blocks))
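All of these benchmark scripts import a time_logger helper from a local utils module that is not shown here. A minimal sketch of what such a context manager could look like (the real utils.time_logger may differ) is:

# Hypothetical sketch of the time_logger context manager imported above;
# the actual utils.time_logger in the benchmark suite may differ.
import logging
import time
from contextlib import contextmanager


@contextmanager
def time_logger(message):
    # Log the elapsed wall-clock time for the wrapped block.
    start = time.time()
    yield
    logging.info("%s; Time: %.3f seconds", message, time.time() - start)
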
import argparse
import logging
import os

import numpy as np
import ray
import modin.pandas as pd

from utils import time_logger

parser = argparse.ArgumentParser(description="arithmetic benchmark")
parser.add_argument("--path", dest="path", help="path to the csv data file")
parser.add_argument("--logfile", dest="logfile", help="path to the log file")
args = parser.parse_args()
file = args.path
file_size = os.path.getsize(file)
logging.basicConfig(filename=args.logfile, level=logging.INFO)
df = pd.read_csv(file)
blocks = df._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))
num_rows, num_cols = df.shape
new_row = np.random.randint(0, 100, size=num_cols)
new_col = np.random.randint(0, 100, size=num_rows)
def rand_row_loc():
    return np.random.randint(0, num_rows)


def rand_col_loc():
    return np.random.randint(0, num_cols)
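The remainder of this arithmetic benchmark is not shown. A plausible (hypothetical) use of the helpers above, timing a random scalar access in the same style as the other scripts, might look like this:

# Hypothetical continuation; the specific timed operation is an assumption,
# not code from the original benchmark script.
with time_logger(
    "Random scalar access with iloc: {}; Size: {} bytes".format(file, file_size)
):
    df.iloc[rand_row_loc(), rand_col_loc()]
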
parser.add_argument("--left", dest="left", help="path to the left csv data " "file")
parser.add_argument("--right", dest="right", help="path to the right csv data " "file")
parser.add_argument("--logfile", dest="logfile", help="path to the log file")
args = parser.parse_args()
file_left = args.left
file_size_left = os.path.getsize(file_left)
file_right = args.right
file_size_right = os.path.getsize(file_right)
if not os.path.exists(os.path.split(args.logfile)[0]):
    os.makedirs(os.path.split(args.logfile)[0])
logging.basicConfig(filename=args.logfile, level=logging.INFO)
df_left = pd.read_csv(file_left)
df_right = pd.read_csv(file_right)
blocks = df_left._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))
blocks = df_right._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))
with time_logger(
    "Inner Join: {} & {}; Left Size: {} bytes; Right Size: {} "
    "bytes".format(file_left, file_right, file_size_left, file_size_right)
):
    result = df_left.join(df_right, how="inner", lsuffix="left_")
    ray.wait(result._block_partitions.flatten().tolist())

with time_logger(
    "Outer Join: {} & {}; Left Size: {} bytes; Right Size: {} "
    "bytes".format(file_left, file_right, file_size_left, file_size_right)
):
    result = df_left.join(df_right, how="outer", lsuffix="left_")
    ray.wait(result._block_partitions.flatten().tolist())
parser.add_argument("--right", dest="right", help="path to the right csv data " "file")
parser.add_argument("--logfile", dest="logfile", help="path to the log file")
args = parser.parse_args()
file_left = args.left
file_size_left = os.path.getsize(file_left)
file_right = args.right
file_size_right = os.path.getsize(file_right)
if not os.path.exists(os.path.split(args.logfile)[0]):
os.makedirs(os.path.split(args.logfile)[0])
logging.basicConfig(filename=args.logfile, level=logging.INFO)
df_left = pd.read_csv(file_left)
df_right = pd.read_csv(file_right)
blocks = df_left._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))
blocks = df_right._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))
with time_logger(
"Inner Join: {} & {}; Left Size: {} bytes; Right Size: {} "
"bytes".format(file_left, file_right, file_size_left, file_size_right)
):
result = df_left.join(df_right, how="inner", lsuffix="left_")
ray.wait(result._block_partitions.flatten().tolist())
with time_logger(
"Outer Join: {} & {}; Left Size: {} bytes; Right Size: {} "
"bytes".format(file_left, file_right, file_size_left, file_size_right)
import argparse
import logging
import os

import ray
import modin.pandas as pd

from utils import time_logger

parser = argparse.ArgumentParser(description="arithmetic benchmark")
parser.add_argument("--path", dest="path", help="path to the csv data file")
parser.add_argument("--logfile", dest="logfile", help="path to the log file")
args = parser.parse_args()
file = args.path
file_size = os.path.getsize(file)
if not os.path.exists(os.path.split(args.logfile)[0]):
    os.makedirs(os.path.split(args.logfile)[0])
logging.basicConfig(filename=args.logfile, level=logging.INFO)
df = pd.read_csv(file)
blocks = df._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))
with time_logger("Transpose: {}; Size: {} bytes".format(file, file_size)):
blocks = df.T.flatten().tolist()
ray.wait(blocks, len(blocks))
with time_logger("Sum on axis=0: {}; Size: {} bytes".format(file, file_size)):
df.sum()
with time_logger("Sum on axis=1: {}; Size: {} bytes".format(file, file_size)):
df.sum(axis=1)
with time_logger("Median on axis=0: {}; Size: {} bytes".format(file, file_size)):
df.median()
"""Apply some callable function to the data in this partition.
Note: It is up to the implementation how kwargs are handled. They are
an important part of many implementations. As of right now, they
are not serialized.
Args:
func: The lambda to apply (may already be correctly formatted)
Returns:
A new `BaseFramePartition` containing the object that has had `func`
applied to it.
"""
call_queue = self.call_queue + [[func, kwargs]]
future = self.client.submit(apply_list_of_funcs, call_queue, self.future)
return PandasOnDaskFramePartition(future)
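The apply_list_of_funcs helper submitted to the Dask client is not shown above. A minimal sketch of what it could look like, assuming each queued entry is a [func, kwargs] pair applied in order to the partition's data (the real helper in Modin's Dask backend may differ), is:

# Hypothetical sketch of apply_list_of_funcs, which drains a partition's call
# queue; the actual helper shipped with the Dask backend may differ.
def apply_list_of_funcs(call_queue, data):
    for func, kwargs in call_queue:
        data = func(data, **kwargs)
    return data
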
        raise ValueError(
            "Item wrong length {} instead of {}.".format(
                len(key), len(self.index)
            )
        )
    key = check_bool_indexer(self.index, key)
    # We convert to a RangeIndex because getitem_row_array is expecting a list
    # of indices, and RangeIndex will give us the exact indices of each boolean
    # requested (see the short example after this fragment).
    key = pandas.RangeIndex(len(self.index))[key]
    if len(key):
        return DataFrame(
            query_compiler=self._query_compiler.getitem_row_array(key)
        )
    else:
        return DataFrame(columns=self.columns)
else:
    if any(k not in self.columns for k in key):
        raise KeyError(
            "{} not index".format(
                str([k for k in key if k not in self.columns]).replace(",", "")
            )
        )
    return DataFrame(
        query_compiler=self._query_compiler.getitem_column_array(key)
    )
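A quick illustration of the RangeIndex conversion referenced in the comment above, using plain pandas and made-up data:

# Indexing a RangeIndex with a boolean mask yields the positional indices of
# the True entries, which is exactly what getitem_row_array expects.
import pandas

mask = [True, False, True, False, True]
positions = pandas.RangeIndex(len(mask))[mask]
print(list(positions))  # [0, 2, 4]
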
def read_parquet(path, engine="auto", columns=None, **kwargs):
"""Load a parquet object from the file path, returning a DataFrame.
Args:
path: The filepath of the parquet file.
We only support local files for now.
engine: This argument doesn't do anything for now.
kwargs: Pass into parquet's read_pandas function.
"""
return DataFrame(
query_compiler=BaseFactory.read_parquet(
path=path, columns=columns, engine=engine, **kwargs
)
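A short usage sketch of this reader; the file name and column names below are placeholders, not files from the original repository:

# Hypothetical usage of read_parquet; "example.parquet" and the column list
# are made up for illustration.
import modin.pandas as pd

df = pd.read_parquet("example.parquet", columns=["a", "b"])
print(df.shape)
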
if self._is_multi_by:
    return self._default_to_pandas(map_func, **kwargs)
if not isinstance(self._by, type(self._query_compiler)):
    return self._apply_agg_function(map_func, drop=drop, **kwargs)
# For aggregations, pandas drops the grouping column (here self._idx_name)
# from the result. For other operations it does not, so we wait until there
# is an aggregation before actually performing the drop.
if self._idx_name is not None and drop:
    groupby_qc = self._query_compiler.drop(columns=[self._idx_name])
else:
    groupby_qc = self._query_compiler
from .dataframe import DataFrame

return DataFrame(
    query_compiler=groupby_qc.groupby_reduce(
        self._by,
        self._axis,
        self._kwargs,
        map_func,
        kwargs,
        reduce_func=reduce_func,
        reduce_args=kwargs,
        numeric_only=numeric_only,
    )
)
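The groupby_reduce call above follows a map/reduce pattern: a map function runs on each partition independently, the partial results are combined, and a reduce function produces the final aggregate. A plain-pandas sketch of that pattern (an illustration, not Modin's actual implementation):

# Map/reduce groupby sketch: partial groupby-sum per chunk, then a second
# groupby-sum over the concatenated partials.
import pandas


def groupby_sum_map_reduce(chunks, by):
    # Map: compute a partial groupby-sum on every partition.
    partials = [chunk.groupby(by).sum() for chunk in chunks]
    # Reduce: combine the partial results and aggregate again.
    return pandas.concat(partials).groupby(level=0).sum()


chunks = [
    pandas.DataFrame({"key": ["a", "b"], "val": [1, 2]}),
    pandas.DataFrame({"key": ["a", "b"], "val": [3, 4]}),
]
print(groupby_sum_map_reduce(chunks, "key"))
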
    elif all(isinstance(o, Series) for o in to_append):
        self.name = None
        for i in range(len(to_append)):
            to_append[i].name = None
            to_append[i] = to_append[i]._query_compiler
    else:
        # Matching pandas behavior of naming the Series columns 0
        self.name = 0
        for i in range(len(to_append)):
            if isinstance(to_append[i], Series):
                to_append[i].name = 0
                to_append[i] = DataFrame(to_append[i])
    return DataFrame(self.copy()).append(
        to_append,
        ignore_index=ignore_index,
        verify_integrity=verify_integrity,
    )
elif isinstance(to_append, Series):
    self.name = None
    to_append.name = None
    to_append = [to_append._query_compiler]
elif isinstance(to_append, DataFrame):
    self.name = 0
    return DataFrame(self.copy()).append(
        to_append, ignore_index=ignore_index, verify_integrity=verify_integrity
    )
else:
    raise TypeError(bad_type_msg.format(type(to_append)))
# If ignore_index is False, by definition the Index will be correct.
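A short usage sketch of the append logic above; Modin's Series.append mirrors the pandas API, and the values here are made up for demonstration:

# Illustrative use of Series.append with a single Series and with a list of
# Series; the data is made up.
import modin.pandas as pd

s = pd.Series([1, 2])
print(s.append(pd.Series([3, 4]), ignore_index=True))
print(s.append([pd.Series([5]), pd.Series([6])], ignore_index=True))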