Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_cname_object(self):
bucket = oss2.Bucket(oss2.Auth(OSS_ID, OSS_SECRET), OSS_CNAME, OSS_BUCKET, is_cname=True)
bucket.put_object('hello.txt', 'hello world')
raise ValueError('No endpoint in oss_config.json')
if 'bucketName' not in oss_config:
raise ValueError('No bucketName in oss_config.json')
if 'ossDomain' not in oss_config:
raise ValueError('No ossDomain in oss_config.json')
if 'localDir' not in oss_config:
raise ValueError('No localDir in oss_config.json')
if not str(oss_config['localDir']).strip().endswith('/'):
raise ValueError('localDir must end with a slash, example: /Users/Poison/blog/public/')
is_windows = False
if os.name == 'nt':
is_windows = True
auth = oss2.Auth(oss_config['accessKeyId'], oss_config['accessKeySecret'])
bucket = oss2.Bucket(auth, oss_config['endpoint'], oss_config['bucketName'])
oss_domain = oss_config['ossDomain']
local_dir = oss_config['localDir']
with ThreadPoolExecutor() as executor:
for dirpath, dirnames, filenames in os.walk(local_dir):
for filename in filenames:
executor.submit(upload_file_to_aliyun_oss, os.path.join(dirpath, filename))
# http://oss-cn-hangzhou.aliyuncs.com
# https://oss-cn-hangzhou.aliyuncs.com
# 分别以HTTP、HTTPS协议访问。
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<你的Bucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<你的访问域名>')
# 确认上面的参数都填写正确了
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
assert '<' not in param, '请设置参数:' + param
# 创建Bucket对象,所有Object相关的接口都可以通过Bucket对象来进行
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)
key = 'motto.txt'
content = oss2.to_bytes('a' * 1024 * 1024)
filename = 'download.txt'
# 上传文件
bucket.put_object(key, content, headers={'content-length': str(1024 * 1024)})
"""
文件下载
"""
# 下载文件
result = bucket.get_object(key)
# 验证一下
def ali_upload_file(self,access_key,secret_key,bucket_name,domain,endpoint,source_file, filename):
auth = oss2.Auth(access_key, secret_key)
bucket = oss2.Bucket(auth, endpoint,bucket_name)
new_filename='%s/'%self.usr_id_p+filename
result = bucket.put_object(new_filename, source_file) # 上传
if result.status == 200:
return domain + new_filename
return None
def get_bucket(self):
oss_conf = self.__get_oss_config()
auth = oss2.Auth(oss_conf["OSS_ACCESS_KEY"], oss_conf["OSS_ACCESS_KEY_SECRET"])
bucket = oss2.Bucket(auth, oss_conf["OSS_ENDPOINT"], oss_conf["OSS_BUCKET"], is_cname=True)
return bucket
#encoding:utf-8
# __author__ = 'donghao'
# __time__ = 2019/3/7 11:08
import oss2
import os, sys
from datetime import datetime
auth = oss2.Auth('xxxx','secret_key')
# 配置请看oss 文档
bucket = oss2.Bucket(auth,'oss-cn-hangzhou.aliyuncs.com','secondbook')
base_avater_url = 'https://secondbook.oss-cn-hangzhou.aliyuncs.com/avater/'
base_books_url = 'https://secondbook.oss-cn-hangzhou.aliyuncs.com/books/'
def change_filename(filename):
dt = datetime.now()
time = dt.strftime('%Y%m%d%H%M%S')
filename = time+filename
return filename
def percentage(consumed_bytes, total_bytes,id=None):
if total_bytes:
print('我执行了')
rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
print('\r{0}% '.format(rate), end='')
sys.stdout.flush()
html = get_html(url)
img_list = get_img(html)
logger.info(img_list)
creds = context.credentials
if (local):
auth = oss2.Auth(creds.access_key_id,
creds.access_key_secret)
else:
auth = oss2.StsAuth(creds.access_key_id,
creds.access_key_secret,
creds.security_token)
bucket = oss2.Bucket(auth, endpoint, bucket_name)
count = 0
for item in img_list:
count += 1
logging.info(item)
# Get each picture
pic = urllib.urlopen(item)
# Store all the pictures in oss bucket, keyed by timestamp in microsecond unit
bucket.put_object(str(datetime.datetime.now().microsecond) + '.png', pic)
return 'Done!'
split_keys = evt['split_keys']
output_prefix = evt['output_prefix']
video_type = evt['target_type']
video_process_dir = evt['video_proc_dir']
transcoded_split_keys = []
for k in split_keys:
fileDir, shortname, extension = get_fileNameExt(k)
transcoded_filename = 'transcoded_%s.%s' % (shortname, video_type)
transcoded_filepath = os.path.join(fileDir, transcoded_filename)
transcoded_split_keys.append(transcoded_filepath)
creds = context.credentials
auth = oss2.StsAuth(creds.accessKeyId,
creds.accessKeySecret, creds.securityToken)
oss_client = oss2.Bucket(
auth, 'oss-%s-internal.aliyuncs.com' % context.region, oss_bucket_name)
if len(transcoded_split_keys) == 0:
raise Exception("no transcoded_split_keys")
LOGGER.info({
"target_type": video_type,
"transcoded_split_keys": transcoded_split_keys
})
_, shortname, extension = get_fileNameExt(video_key)
segs_filename = 'segs_%s.txt' % (shortname + video_type)
segs_filepath = os.path.join(video_process_dir, segs_filename)
if os.path.exists(segs_filepath):
os.remove(segs_filepath)
KEY = ''
KEYSECRET = ''
BUCKETNAME = ''
ENDPOINT = 'http://oss-cn-hangzhou.aliyuncs.com'
REDIS_HOST = "localhost"
REDIS_USER = "root"
REDIS_PASSWORD = ""
REDIS_DB_NAME = 1
REDIS_PORT = 6379
list_name = 'restaurant' # 列队名
# oss
auth = oss2.Auth(KEY, KEYSECRET)
bucket = oss2.Bucket(auth, ENDPOINT, BUCKETNAME)
# redis 池
pool = redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB_NAME, password=REDIS_PASSWORD,
decode_responses=True)
r = redis.Redis(connection_pool=pool)
def put_img():
"""上传逻辑,根据项目需求修改即可"""
url = r.rpop(list_name)
input = requests.get(url)
if input.status_code == 200:
file_name = url # this is file name
obj = bucket.put_object(file_name, input)
if obj.status == 200:
print('OK', file_name)
@staticmethod
def upload_data(file_name, data):
auth = oss2.Auth(SmsCodeConfig.access_key_id, SmsCodeConfig.access_key_secret)
bucket = oss2.Bucket(auth, MeFileConfig.endpoint, MeFileConfig.bucket_name)
bucket.put_object(file_name, data)