        # Download from gcs
        for blob in gcs_bucket.list_blobs(prefix=prefix):
            dest = os.path.join(self._dest_dir, os.path.basename(blob.name))
            blob.download_to_filename(dest)

        # Cleanup temporary table
        if self._query:
            gbq_client.delete_table(table_ref)

        # Cleanup temporary files
        for blob in gcs_bucket.list_blobs(prefix=prefix):
            blob.delete()
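# Hedged sketch: the snippet above relies on objects created earlier in the same
# method (gcs_bucket, prefix, gbq_client, table_ref). A self-contained equivalent
# using the official Google clients might look roughly like the helper below; the
# parameter names are assumptions for illustration, not the class's actual code.
import os

from google.cloud import bigquery, storage


def download_and_cleanup(bucket_name, prefix, dest_dir, temp_table_id=None):
    gcs_bucket = storage.Client().get_bucket(bucket_name)

    # Download every staged object under the prefix into dest_dir
    for blob in gcs_bucket.list_blobs(prefix=prefix):
        blob.download_to_filename(os.path.join(dest_dir, os.path.basename(blob.name)))

    # Drop the temporary BigQuery table, if one was created for the query
    if temp_table_id:
        bigquery.Client().delete_table(temp_table_id, not_found_ok=True)

    # Remove the staged objects from GCS
    for blob in gcs_bucket.list_blobs(prefix=prefix):
        blob.delete()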
class BigQueryReadCache(BaseBigQuery):
    """
    @deprecated
    Get data from BigQuery and cache it as a pandas.DataFrame.
    Use {BigQueryFileDownload} if the query result is expected to be large.
    """

    def __init__(self):
        super().__init__()
        self._key = None
        self._query = None

    def key(self, key):
        self._key = key

    def query(self, query):
        self._query = query
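# Hedged usage sketch for BigQueryReadCache: in cliboa these setters are normally
# driven by the scenario YAML, but calling them directly shows what the step needs.
# The key and query values are illustrative assumptions, and the retrieval line
# assumes ObjectStore exposes a get() counterpart to the put() used in execute().
step = BigQueryReadCache()
step.key("bq_cache")  # ObjectStore key the resulting DataFrame is stored under
step.query("SELECT id, name FROM `my_project.my_dataset.my_table`")
# ...after the step has executed, a later step could read the cached DataFrame:
# df = ObjectStore.get("bq_cache")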
        valid = EssentialParameters(
            self.__class__.__name__, [self._src_dir, self._src_pattern]
        )
        valid()

        gcs_client = Gcs.get_gcs_client(self._credentials)
        bucket = gcs_client.get_bucket(self._bucket)
        files = super().get_target_files(self._src_dir, self._src_pattern)
        self._logger.info("Upload files %s" % files)
        for file in files:
            self._logger.info("Start upload %s" % file)
            blob = bucket.blob(os.path.join(self._dest_dir, os.path.basename(file)))
            blob.upload_from_filename(file)
            self._logger.info("Finish upload %s" % file)
class CsvReadBigQueryCreate(BaseBigQuery, FileWrite):
    """
    Read a csv file and insert the data into a BigQuery table.
    """

    # default number of lines to accumulate before converting to a DataFrame
    BULK_LINE_CNT = 10000

    # BigQuery insert mode
    REPLACE = "replace"
    APPEND = "append"

    def __init__(self):
        super().__init__()
        self._table_schema = None
        self._replace = True
        self.__columns = []
    def execute(self, *args):
        super().execute()
        valid = EssentialParameters(self.__class__.__name__, [self._key])
        valid()

        df = pandas.read_gbq(
            query=self._get_query(),
            dialect="standard",
            location=self._location,
            project_id=self._project_id,
            credentials=ServiceAccount.auth(self._credentials),
        )
        ObjectStore.put(self._key, df)
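# Hedged sketch: ServiceAccount.auth() above is a cliboa helper; with the plain
# Google libraries the same read can be expressed as below. The key file path,
# project id and query are illustrative assumptions.
import pandas
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file("/path/to/key.json")
df = pandas.read_gbq(
    query="SELECT id, name FROM `my_project.my_dataset.my_table`",
    project_id="my_project",
    dialect="standard",
    credentials=credentials,
)
print(df.head())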
class BigQueryFileDownload(BaseBigQuery):
    """
    @deprecated
    Download a query result as csv files.
    This class saves the BigQuery result to a temporary file in GCS and then downloads it.
    The download may be split into multiple files if the result exceeds 1 GB.
    """

    def __init__(self):
        super().__init__()
        self._bucket = None
        self._dest_dir = None
        self._filename = None

    def bucket(self, bucket):
        self._bucket = bucket
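# Hedged sketch of the export path the docstring describes: BigQuery can only
# extract a result larger than 1 GB to GCS when the destination URI contains a
# wildcard, which is why the download may end up as several files. Bucket, prefix
# and table names here are illustrative assumptions.
from google.cloud import bigquery

client = bigquery.Client()
destination_uri = "gs://my-staging-bucket/tmp/result-*.csv"  # wildcard => may split
extract_job = client.extract_table("my_project.my_dataset.my_table", destination_uri)
extract_job.result()  # wait for the export to finish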
    def execute(self, *args):
        BaseBigQuery.execute(self)
        param_valid = EssentialParameters(self.__class__.__name__, [self._table_schema])
        param_valid()

        files = super().get_target_files(self._src_dir, self._src_pattern)
        if len(files) > 1:
            raise InvalidFileCount("Input file must be only one.")
        if len(files) == 0:
            raise FileNotFound("The specified csv file not found.")

        insert_rows = []
        is_inserted = False
        # initial if_exists
        if_exists = self.REPLACE if self._replace is True else self.APPEND
        self.__columns = [name_and_type["name"] for name_and_type in self._table_schema]
        with open(files[0], "r", encoding=self._encoding) as f:
    def _format_insert_data(self, insert_rows):
        """
        Format the input rows into a column-wise dict for a DataFrame.

        Args:
            insert_rows: dictionary list of input cache
        """
        insert_data = {}
        for c in self._columns:
            v_list = [d.get(c) for d in insert_rows]
            if not v_list:
                raise InvalidFormat(
                    "Specified column %s does not exist in an input file." % c
                )
            insert_data[c] = v_list
        return insert_data
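# Hedged illustration of what the method above produces: rows arrive as a list of
# dicts and leave as a column-wise dict, which pandas turns into a DataFrame
# directly. The sample rows and column names are made up for illustration.
import pandas

insert_rows = [{"id": 1, "name": "foo"}, {"id": 2, "name": "bar"}]
insert_data = {c: [d.get(c) for d in insert_rows] for c in ["id", "name"]}
df = pandas.DataFrame(insert_data)  # columns: id, name; two rows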
class BigQueryCreate(BaseBigQuery):
    """
    @deprecated
    Insert data into a BigQuery table.
    """

    # default number of lines to accumulate before converting to a DataFrame
    BULK_LINE_CNT = 10000

    # BigQuery insert mode
    REPLACE = "replace"
    APPEND = "append"

    def __init__(self):
        super().__init__()
        self._table_schema = None
        self._replace = True
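# Hedged sketch of where the REPLACE/APPEND constants end up: they match the
# if_exists values used by pandas-gbq, so a bulk insert of accumulated rows
# reduces to something like the call below. Table and project names are
# illustrative assumptions, not values from this repository.
import pandas

df = pandas.DataFrame({"id": [1, 2], "name": ["foo", "bar"]})
df.to_gbq(
    "my_dataset.my_table",
    project_id="my_project",
    if_exists="append",  # or "replace" to overwrite the table on the first chunk
)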
import re
import string
from datetime import datetime

import pandas
from google.cloud import bigquery

from cliboa.scenario.gcp import BaseBigQuery, BaseFirestore, BaseGcs
from cliboa.scenario.validator import EssentialParameters
from cliboa.util.cache import ObjectStore
from cliboa.util.exception import InvalidParameter
from cliboa.util.gcp import BigQuery, Firestore, Gcs, ServiceAccount
from cliboa.util.string import StringUtil
class BigQueryRead(BaseBigQuery):
    """
    Read data from BigQuery and either hold the result in memory or export it to a file via GCS.
    """

    _RANDOM_STR_LENGTH = 8

    def __init__(self):
        super().__init__()
        self._key = None
        self._bucket = None
        self._dest_dir = None
        self._filename = None
        self._query = None

    def key(self, key):
        self._key = key
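# Hedged usage sketch for BigQueryRead: setter names below mirror the attributes
# initialised above and are normally populated from the scenario YAML; only key()
# appears in this excerpt, so the remaining setters are assumptions. With key set
# the result stays in memory (ObjectStore); with bucket/dest_dir/filename set it
# is staged in GCS and downloaded as a file instead, as the docstring describes.
step = BigQueryRead()
step.key("bq_result")  # in-memory mode: DataFrame goes into ObjectStore under this key
# step.query("SELECT id, name FROM `my_project.my_dataset.my_table`")
# step.bucket("my-staging-bucket")   # file mode instead: stage the query result in GCS...
# step.dest_dir("/tmp/bq")           # ...then download it under this directory
# step.filename("result.csv")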
    def execute(self, *args):
        BaseBigQuery.execute(self)
        param_valid = EssentialParameters(self.__class__.__name__, [self._table_schema])
        param_valid()

        files = super().get_target_files(self._src_dir, self._src_pattern)
        if len(files) == 0:
            raise FileNotFound("The specified csv file not found.")
        self._logger.info("insert target files %s" % files)

        is_inserted = False
        # initial if_exists
        if_exists = self._REPLACE if self._replace is True else self._APPEND
        self._columns = [name_and_type["name"] for name_and_type in self._table_schema]
        for file in files:
            insert_rows = []
            with open(file, "r", encoding=self._encoding) as f:
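# Hedged sketch of the chunked-insert pattern the loop above is building towards:
# rows are accumulated from csv.DictReader and flushed once a bulk line count has
# been collected, so very large files never have to fit in memory at once.
# flush_to_bigquery is a hypothetical stand-in for the actual insert call.
import csv


def load_in_chunks(path, encoding="utf-8", bulk_line_cnt=10000):
    insert_rows = []
    with open(path, "r", encoding=encoding) as f:
        for row in csv.DictReader(f):
            insert_rows.append(row)
            if len(insert_rows) >= bulk_line_cnt:
                flush_to_bigquery(insert_rows)  # hypothetical bulk insert helper
                insert_rows = []
    if insert_rows:
        flush_to_bigquery(insert_rows)  # flush the remainder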
import ast
import csv
import json
import os

import pandas

from cliboa.core.validator import EssentialParameters
from cliboa.scenario.gcp import BaseBigQuery, BaseFirestore, BaseGcs
from cliboa.scenario.load.file import FileWrite
from cliboa.util.exception import FileNotFound, InvalidFileCount, InvalidFormat
from cliboa.util.gcp import Firestore, Gcs, ServiceAccount
class BigQueryWrite(BaseBigQuery, FileWrite):
    """
    Read a csv file and insert the data into a BigQuery table.
    """

    # default number of lines to accumulate before converting to a DataFrame
    _BULK_LINE_CNT = 10000

    # BigQuery insert mode
    _REPLACE = "replace"
    _APPEND = "append"

    def __init__(self):
        super().__init__()
        self._table_schema = None
        self._replace = True
        self._columns = []
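# Hedged sketch of the table_schema shape implied by the code above: a list of
# name/type dicts, from which only the "name" keys are pulled into _columns.
# The concrete columns and types are illustrative assumptions.
table_schema = [
    {"name": "id", "type": "INTEGER"},
    {"name": "name", "type": "STRING"},
]
columns = [name_and_type["name"] for name_and_type in table_schema]  # ["id", "name"]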