Skip to content

Commit

Permalink
feat: add idle-time fop support and get fop status
Browse files Browse the repository at this point in the history
  • Loading branch information
lihsai0 committed Sep 4, 2024
1 parent 908228c commit 1587507
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 71 deletions.
3 changes: 2 additions & 1 deletion qiniu/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
str('persistentOps'), # 持久化处理操作
str('persistentNotifyUrl'), # 持久化处理结果通知URL
str('persistentPipeline'), # 持久化处理独享队列
str('persistentType'), # 指定是否开始闲时任务
str('persistentType'), # 为 `1` 时,开启闲时任务,必须是 int 类型

str('deleteAfterDays'), # 文件多少天后自动删除
str('fileType'), # 文件的存储类型,0为标准存储,1为低频存储,2为归档存储,3为深度归档存储,4为归档直读存储
str('isPrefixalScope'), # 指定上传文件必须使用的前缀
Expand Down
54 changes: 42 additions & 12 deletions qiniu/services/processing/pfop.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,56 @@ def __init__(self, auth, bucket, pipeline=None, notify_url=None):
self.pipeline = pipeline
self.notify_url = notify_url

def execute(self, key, fops, force=None):
"""执行持久化处理:
Args:
key: 待处理的源文件
fops: 处理详细操作,规格详见 https://developer.qiniu.com/dora/manual/1291/persistent-data-processing-pfop
force: 强制执行持久化处理开关
def execute(self, key, fops, force=None, persistent_type=None):
"""
执行持久化处理
Returns:
一个dict变量,返回持久化处理的persistentId,类似{"persistentId": 5476bedf7823de4068253bae};
一个ResponseInfo对象
Parameters
----------
key: str
待处理的源文件
fops: list[str]
处理详细操作,规格详见 https://developer.qiniu.com/dora/manual/1291/persistent-data-processing-pfop
force: int or str, optional
强制执行持久化处理开关
persistent_type: int or str, optional
持久化处理类型,为 '1' 时开启闲时任务
Returns
-------
ret: dict
持久化处理的 persistentId,类似 {"persistentId": 5476bedf7823de4068253bae};
resp: ResponseInfo
"""
ops = ';'.join(fops)
data = {'bucket': self.bucket, 'key': key, 'fops': ops}
if self.pipeline:
data['pipeline'] = self.pipeline
if self.notify_url:
data['notifyURL'] = self.notify_url
if force == 1:
data['force'] = 1
if force == 1 or force == '1':
data['force'] = str(force)
if persistent_type and type(int(persistent_type)) is int:
data['type'] = str(persistent_type)

url = '{0}/pfop'.format(config.get_default('default_api_host'))
return http._post_with_auth(url, data, self.auth)

def get_status(self, persistent_id):
"""
获取持久化处理状态
Parameters
----------
persistent_id: str
Returns
-------
ret: dict
持久化处理的状态,详见 https://developer.qiniu.com/dora/1294/persistent-processing-status-query-prefop
resp: ResponseInfo
"""
url = '{0}/status/get/prefop'.format(config.get_default('default_api_host'))
data = {
'id': persistent_id
}
return http._get_with_auth(url, data, self.auth)
12 changes: 0 additions & 12 deletions test_qiniu.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,18 +434,6 @@ def test_private_url(self):
assert r.status_code == 200


class MediaTestCase(unittest.TestCase):
def test_pfop(self):
q = Auth(access_key, secret_key)
pfop = PersistentFop(q, 'testres', 'sdktest')
op = op_save('avthumb/m3u8/segtime/10/vcodec/libx264/s/320x240', 'pythonsdk', 'pfoptest')
ops = []
ops.append(op)
ret, info = pfop.execute('sintel_trailer.mp4', ops, 1)
print(info)
assert ret['persistentId'] is not None


class EtagTestCase(unittest.TestCase):
def test_zero_size(self):
open("x", 'a').close()
Expand Down
Empty file.
49 changes: 49 additions & 0 deletions tests/cases/test_services/test_processing/test_pfop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pytest


from qiniu import PersistentFop, op_save


persistent_id = None


class TestPersistentFop:
def test_pfop_execute(self, qn_auth):
pfop = PersistentFop(qn_auth, 'testres', 'sdktest')
op = op_save('avthumb/m3u8/segtime/10/vcodec/libx264/s/320x240', 'pythonsdk', 'pfoptest')
ops = [
op
]
ret, resp = pfop.execute('sintel_trailer.mp4', ops, 1)
assert resp.status_code == 200, resp
assert ret['persistentId'] is not None, resp
global persistent_id
persistent_id = ret['persistentId']

def test_pfop_get_status(self, qn_auth):
assert persistent_id is not None
pfop = PersistentFop(qn_auth, 'testres', 'sdktest')
ret, resp = pfop.get_status(persistent_id)
assert resp.status_code == 200, resp
assert ret is not None, resp

def test_pfop_idle_time_task(self, set_conf_default, qn_auth):
persistence_key = 'python-sdk-pfop-test/test-pfop-by-api'

key = 'sintel_trailer.mp4'
pfop = PersistentFop(qn_auth, 'testres')
ops = [
op_save(
op='avthumb/m3u8/segtime/10/vcodec/libx264/s/320x240',
bucket='pythonsdk',
key=persistence_key
)
]
ret, resp = pfop.execute(key, ops, force=1, persistent_type=1)
assert resp.status_code == 200, resp
assert 'persistentId' in ret, resp

ret, resp = pfop.get_status(ret['persistentId'])
assert resp.status_code == 200, resp
assert ret['type'] == 1, resp
assert ret['creationDate'] is not None, resp
46 changes: 46 additions & 0 deletions tests/cases/test_services/test_storage/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import os
from collections import namedtuple
from hashlib import new as hashlib_new
import tempfile

import pytest

import requests
Expand Down Expand Up @@ -92,3 +97,44 @@ def regions_with_fake_endpoints(regions_with_real_endpoints):

yield regions


TempFile = namedtuple(
'TempFile',
[
'path',
'md5',
'name',
'size'
]
)


@pytest.fixture(scope='function')
def temp_file(request):
size = 4 * 1024
if hasattr(request, 'param'):
size = request.param

tmp_file_path = tempfile.mktemp()
chunk_size = 4 * 1024

md5_hasher = hashlib_new('md5')
with open(tmp_file_path, 'wb') as f:
remaining_bytes = size
while remaining_bytes > 0:
chunk = os.urandom(min(chunk_size, remaining_bytes))
f.write(chunk)
md5_hasher.update(chunk)
remaining_bytes -= len(chunk)

yield TempFile(
path=tmp_file_path,
md5=md5_hasher.hexdigest(),
name=os.path.basename(tmp_file_path),
size=size
)

try:
os.remove(tmp_file_path)
except Exception:
pass
66 changes: 66 additions & 0 deletions tests/cases/test_services/test_storage/test_upload_pfop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import pytest

import qiniu


KB = 1024
MB = 1024 * KB
GB = 1024 * MB


# set a bucket lifecycle manually to delete prefix `test-pfop`!
# or this test will continue to occupy bucket space.
class TestPersistentFopByUpload:
@pytest.mark.parametrize('temp_file', [10 * MB], indirect=True)
@pytest.mark.parametrize('persistent_type', [None, 0, 1])
def test_pfop_with_upload(
self,
set_conf_default,
qn_auth,
bucket_name,
temp_file,
persistent_type
):
key = 'test-pfop-upload-file'
persistent_key = '_'.join([
'test-pfop-by-upload',
'type',
str(persistent_type)
])
persistent_ops = ';'.join([
qiniu.op_save(
op='avthumb/m3u8/segtime/10/vcodec/libx264/s/320x240',
bucket=bucket_name,
key=persistent_key
)
])

upload_policy = {
'persistentOps': persistent_ops
}

if persistent_type is not None:
upload_policy['persistentType'] = persistent_type

token = qn_auth.upload_token(
bucket_name,
key,
policy=upload_policy
)
ret, resp = qiniu.put_file(
token,
key,
temp_file.path,
check_crc=True
)

assert ret is not None, resp
assert ret['key'] == key, resp
assert 'persistentId' in ret, resp

pfop = qiniu.PersistentFop(qn_auth, bucket_name)
ret, resp = pfop.get_status(ret['persistentId'])
assert resp.status_code == 200, resp
if persistent_type == 1:
assert ret['type'] == 1, resp
assert ret['creationDate'] is not None, resp
47 changes: 1 addition & 46 deletions tests/cases/test_services/test_storage/test_uploader.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import os
from collections import namedtuple
from hashlib import new as hashlib_new

import tempfile
import pytest

from qiniu.compat import json, is_py2
Expand All @@ -16,7 +13,7 @@
build_batch_delete
)
from qiniu.http.endpoint import Endpoint
from qiniu.http.region import Region, ServiceName
from qiniu.http.region import ServiceName
from qiniu.services.storage.uploader import _form_put

KB = 1024
Expand Down Expand Up @@ -112,48 +109,6 @@ def set_default_up_host_zone(request, valid_up_host):
qn_config._is_customized_default['default_zone'] = False


TempFile = namedtuple(
'TempFile',
[
'path',
'md5',
'name',
'size'
]
)


@pytest.fixture(scope='function')
def temp_file(request):
size = 4 * KB
if hasattr(request, 'param'):
size = request.param

tmp_file_path = tempfile.mktemp()
chunk_size = 4 * KB

md5_hasher = hashlib_new('md5')
with open(tmp_file_path, 'wb') as f:
remaining_bytes = size
while remaining_bytes > 0:
chunk = os.urandom(min(chunk_size, remaining_bytes))
f.write(chunk)
md5_hasher.update(chunk)
remaining_bytes -= len(chunk)

yield TempFile(
path=tmp_file_path,
md5=md5_hasher.hexdigest(),
name=os.path.basename(tmp_file_path),
size=size
)

try:
os.remove(tmp_file_path)
except Exception:
pass


class TestUploadFuncs:
def test_put(self, qn_auth, bucket_name, get_key):
key = get_key('a\\b\\c"hello', no_rand_trail=True)
Expand Down

0 comments on commit 1587507

Please sign in to comment.