Skip to content

Commit

Permalink
Merge pull request #522 from qiniu/ys_dev
Browse files Browse the repository at this point in the history
支持多区域上传
  • Loading branch information
xwen-winnie authored May 28, 2021
2 parents c1ef0b1 + 4f5f606 commit 97b75df
Show file tree
Hide file tree
Showing 14 changed files with 1,012 additions and 285 deletions.
339 changes: 182 additions & 157 deletions src/main/java/com/qiniu/storage/AutoRegion.java

Large diffs are not rendered by default.

76 changes: 76 additions & 0 deletions src/main/java/com/qiniu/storage/BaseUploader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.qiniu.storage;

import com.qiniu.common.QiniuException;
import com.qiniu.http.Client;
import com.qiniu.http.Response;

public abstract class BaseUploader {

protected final Client client;
protected final String key;
protected final String upToken;
protected final ConfigHelper configHelper;
protected final Configuration config;

BaseUploader(Client client, String upToken, String key, Configuration config) {
this.client = client;
this.key = key;
this.upToken = upToken;
if (config == null) {
this.config = new Configuration();
} else {
this.config = config.clone();
}
this.configHelper = new ConfigHelper(this.config);
}

public Response upload() throws QiniuException {
if (this.config == null) {
throw QiniuException.unrecoverable("config can't be empty");
}
return uploadWithRegionRetry();
}

private Response uploadWithRegionRetry() throws QiniuException {
Response response = null;
while (true) {
try {
response = uploadFlows();
if (!couldSwitchRegionAndRetry(response, null)
|| !couldReloadSource() || !reloadSource()
|| config.region == null || !config.region.switchRegion(new UploadToken(upToken))) {
break;
}
} catch (QiniuException e) {
if (!couldSwitchRegionAndRetry(null, e)
|| !couldReloadSource() || !reloadSource()
|| config.region == null || !config.region.switchRegion(new UploadToken(upToken))) {
throw e;
}
}
}
return response;
}

abstract Response uploadFlows() throws QiniuException;

abstract boolean couldReloadSource();

abstract boolean reloadSource();

private boolean couldSwitchRegionAndRetry(Response response, QiniuException exception) {
Response checkResponse = response;
if (checkResponse == null && exception != null) {
checkResponse = exception.response;
}

if (checkResponse != null) {
int statusCode = checkResponse.statusCode;
return (statusCode > -2 && statusCode < 200) || (statusCode > 299
&& statusCode != 401 && statusCode != 413 && statusCode != 419
&& statusCode != 608 && statusCode != 614 && statusCode != 630);
}

return exception == null || !exception.isUnrecoverable();
}
}
2 changes: 2 additions & 0 deletions src/main/java/com/qiniu/storage/FixBlockUploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Deprecated
public class FixBlockUploader {
private final int blockSize;
private final ConfigHelper configHelper;
Expand Down Expand Up @@ -883,6 +884,7 @@ class Record {
long size;
long blockSize;
List<EtagIdx> etagIdxes;

// 用于区分记录是 V1 还是 V2
boolean isValid() {
return uploadId != null && etagIdxes != null && etagIdxes.size() > 0;
Expand Down
42 changes: 24 additions & 18 deletions src/main/java/com/qiniu/storage/FormUploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
* 该类封装了七牛提供的表单上传机制
* 参考文档:<a href="https://developer.qiniu.com/kodo/manual/form-upload">表单上传</a>
*/
public final class FormUploader {
public final class FormUploader extends BaseUploader {

private final String token;
private final String key;
private final File file;
private final byte[] data;
private final String mime;
Expand Down Expand Up @@ -45,9 +43,9 @@ public FormUploader(Client client, String upToken, String key, File file, String

private FormUploader(Client client, String upToken, String key, byte[] data, File file, StringMap params,
String mime, boolean checkCrc, Configuration configuration) {
super(client, upToken, key, configuration);

this.client = client;
token = upToken;
this.key = key;
this.file = file;
this.data = data;
this.params = params;
Expand All @@ -56,23 +54,21 @@ private FormUploader(Client client, String upToken, String key, byte[] data, Fil
this.configHelper = new ConfigHelper(configuration);
}

/**
* 同步上传文件
*/
public Response upload() throws QiniuException {
@Override
Response uploadFlows() throws QiniuException {
buildParams();
String host = configHelper.upHost(token);
String host = configHelper.upHost(upToken);
try {
if (data != null) {
return client.multipartPost(configHelper.upHost(token), params, "file", filename, data,
return client.multipartPost(configHelper.upHost(upToken), params, "file", filename, data,
mime, new StringMap());
} else {
return client.multipartPost(configHelper.upHost(token), params, "file", filename, file,
return client.multipartPost(configHelper.upHost(upToken), params, "file", filename, file,
mime, new StringMap());
}
} catch (QiniuException e) {
if (e.response == null || e.response.needSwitchServer()) {
changeHost(token, host);
changeHost(upToken, host);
}
throw e;
}
Expand All @@ -83,32 +79,42 @@ public Response upload() throws QiniuException {
*/
public void asyncUpload(final UpCompletionHandler handler) throws IOException {
buildParams();
final String host = configHelper.upHost(token);
final String host = configHelper.upHost(upToken);
if (data != null) {
client.asyncMultipartPost(host, params, "file", filename,
data, mime, new StringMap(), new AsyncCallback() {
@Override
public void complete(Response res) {
if (res != null && res.needSwitchServer()) {
changeHost(token, host);
changeHost(upToken, host);
}
handler.complete(key, res);
}
});
return;
}
client.asyncMultipartPost(configHelper.upHost(token), params, "file", filename,
client.asyncMultipartPost(configHelper.upHost(upToken), params, "file", filename,
file, mime, new StringMap(), new AsyncCallback() {
@Override
public void complete(Response res) {
if (res != null && res.needSwitchServer()) {
changeHost(token, host);
changeHost(upToken, host);
}
handler.complete(key, res);
}
});
}

@Override
boolean couldReloadSource() {
return true;
}

@Override
boolean reloadSource() {
return true;
}

private void changeHost(String upToken, String host) {
try {
configHelper.tryChangeUpHost(upToken, host);
Expand All @@ -120,7 +126,7 @@ private void changeHost(String upToken, String host) {

private void buildParams() throws QiniuException {
if (params == null) return;
params.put("token", token);
params.put("token", upToken);

if (key != null) {
params.put("key", key);
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/com/qiniu/storage/Region.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

public class Region {

// 有效时间戳,过了有效期,region会无效,此处只在取时缓存判断; -1 为无限期
private long timestamp = -1;
// 区域名称:z0 华东 z1 华北 z2 华南 na0 北美 as0 东南亚
private String region = "z0";

Expand All @@ -25,6 +27,23 @@ public class Region {
private String rsHost = "rs.qbox.me";
private String rsfHost = "rsf.qbox.me";
private String apiHost = "api.qiniu.com";
private String ucHost = "uc.qbox.me";

Region() {
}

Region(long timestamp, String region, List<String> srcUpHosts, List<String> accUpHosts, String iovipHost,
String rsHost, String rsfHost, String apiHost, String ucHost) {
this.timestamp = timestamp;
this.region = region;
this.srcUpHosts = srcUpHosts;
this.accUpHosts = accUpHosts;
this.iovipHost = iovipHost;
this.rsHost = rsHost;
this.rsfHost = rsfHost;
this.apiHost = apiHost;
this.ucHost = ucHost;
}

/**
* 华东机房相关域名
Expand Down Expand Up @@ -211,10 +230,18 @@ public static Region autoRegion(String ucServer) {
return new Builder().autoRegion(ucServer);
}

boolean switchRegion(RegionReqInfo regionReqInfo) {
return false;
}

String getRegion(RegionReqInfo regionReqInfo) {
return this.region;
}

Region getCurrentRegion(RegionReqInfo regionReqInfo) {
return this;
}

List<String> getSrcUpHost(RegionReqInfo regionReqInfo) throws QiniuException {
return this.srcUpHosts;
}
Expand All @@ -239,6 +266,14 @@ String getApiHost(RegionReqInfo regionReqInfo) throws QiniuException {
return apiHost;
}

boolean isValid() {
if (timestamp < 0) {
return true;
} else {
return System.currentTimeMillis() < timestamp * 1000;
}
}

/**
* 域名构造器
*/
Expand Down
128 changes: 128 additions & 0 deletions src/main/java/com/qiniu/storage/RegionGroup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package com.qiniu.storage;

import com.qiniu.common.QiniuException;

import java.util.ArrayList;
import java.util.List;

public class RegionGroup extends Region {

private Region currentRegion = null;
private int currentRegionIndex = 0;
private final List<Region> regionList = new ArrayList<>();


public boolean addRegion(Region region) {
if (region == null) {
return false;
}

regionList.add(region);

if (currentRegion == null) {
updateCurrentRegion();
}

return true;
}

@Override
boolean switchRegion(RegionReqInfo regionReqInfo) {
if (currentRegion != null && currentRegion.isValid() && currentRegion.switchRegion(regionReqInfo)) {
return true;
}

if ((currentRegionIndex + 1) < regionList.size()) {
currentRegionIndex += 1;
updateCurrentRegion();
return true;
} else {
return false;
}
}

String getRegion(RegionReqInfo regionReqInfo) {
if (currentRegion == null) {
return "";
} else {
return currentRegion.getRegion(regionReqInfo);
}
}

List<String> getSrcUpHost(RegionReqInfo regionReqInfo) throws QiniuException {
if (currentRegion == null) {
return null;
} else {
return currentRegion.getSrcUpHost(regionReqInfo);
}
}

List<String> getAccUpHost(RegionReqInfo regionReqInfo) throws QiniuException {
if (currentRegion == null) {
return null;
} else {
return currentRegion.getAccUpHost(regionReqInfo);
}
}

String getIovipHost(RegionReqInfo regionReqInfo) throws QiniuException {
if (currentRegion == null) {
return null;
} else {
return currentRegion.getIovipHost(regionReqInfo);
}
}

String getRsHost(RegionReqInfo regionReqInfo) throws QiniuException {
if (currentRegion == null) {
return null;
} else {
return currentRegion.getRsHost(regionReqInfo);
}
}

String getRsfHost(RegionReqInfo regionReqInfo) throws QiniuException {
if (currentRegion == null) {
return null;
} else {
return currentRegion.getRsfHost(regionReqInfo);
}
}

String getApiHost(RegionReqInfo regionReqInfo) throws QiniuException {
if (currentRegion == null) {
return null;
} else {
return currentRegion.getApiHost(regionReqInfo);
}
}

Region getCurrentRegion(RegionReqInfo regionReqInfo) {
if (currentRegion == null) {
return null;
} else if (currentRegion instanceof AutoRegion || currentRegion instanceof RegionGroup) {
return currentRegion.getCurrentRegion(regionReqInfo);
} else {
return currentRegion;
}
}

@Override
boolean isValid() {
if (currentRegion == null) {
return false;
}
// 只判断当前的
return currentRegion.isValid();
}

private void updateCurrentRegion() {
if (regionList.size() == 0) {
return;
}

if (currentRegionIndex < regionList.size()) {
currentRegion = regionList.get(currentRegionIndex);
}
}
}
Loading

0 comments on commit 97b75df

Please sign in to comment.