Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #929 implement a utility class to save large downloads to a file #12292

Closed
Closed
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.client;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.eclipse.jetty.client.Response.CompleteListener;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of {@link Response.ContentListener} that produces an {@link Path}
* that allows applications to save a file from a response {@link Response}
* like curl <URL> -o file.bin does.
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
* <p>
* Typical usage is:
* <pre>
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
* httpClient.newRequest(host, port)
* .send(new PathResponseListener(Path.of("/tmp/file.bin"));
*
* var request = httpClient.newRequest(host, port);
* CompletableFuture&gt;Path&gt; completable = PathResponseListener.write(request, Path.of("/tmp/file.bin"));
* </pre>
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
*/
public class PathResponseListener implements CompleteListener, Response.ContentListener
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
{
private static final Logger LOG = LoggerFactory.getLogger(InputStreamResponseListener.class);

private CompletableFuture<Path> completable = new CompletableFuture<>();
private final AutoLock.WithCondition lock = new AutoLock.WithCondition();
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
private final CountDownLatch responseLatch = new CountDownLatch(1);
private Path path;
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
private Response response;
private Throwable failure;
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
private FileOutputStream fileOut;
private FileLock fileLock;
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved

public PathResponseListener(Path path) throws FileNotFoundException, IOException
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
{
if (!path.isAbsolute())
{
throw new FileNotFoundException();
}
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved

this.path = path;

try
{
fileOut = new FileOutputStream(this.path.toFile(), true);
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
fileOut.getChannel().truncate(0);
fileLock = fileOut.getChannel().lock();
}
catch (IOException e)
{
if (LOG.isDebugEnabled())
LOG.debug("Unable to instantiate object", e);
else
throw e;
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Override
public void onContent(Response response, ByteBuffer content) throws IOException
{
if (response.getStatus() != HttpStatus.OK_200)
{
throw new HttpResponseException(String.format("HTTP status code of this response %d", response.getStatus()), response);
}
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved

if (!fileLock.isValid())
{
throw new IOException("File lock is not valid");
}
try
{
fileOut.getChannel().write(content);
}
catch (IOException e)
{
if (LOG.isDebugEnabled())
LOG.debug("Unable to write file", e);
else
throw e;
}
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void onComplete(Result result)
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
{
if (result.isFailed() && this.failure == null)
{
if (LOG.isDebugEnabled())
LOG.debug("Result failure", failure);
}

this.response = result.getResponse();
this.completable.complete(this.path);

try
{
fileLock.close();
}
catch (IOException e)
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
{
if (LOG.isDebugEnabled())
LOG.debug("Unable to close file", e);
}
}

public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
{
boolean expired = !responseLatch.await(timeout, unit);
if (expired && this.response == null)
throw new TimeoutException();
try (AutoLock ignored = lock.lock())
{
// If the request failed there is no response.
if (response == null)
throw new ExecutionException(failure);
return response;
}
}

public Response get() throws InterruptedException, ExecutionException
{
this.completable.get();
return this.response;
}
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved

public static CompletableFuture<Path> write(Request request, Path path)
{
return CompletableFuture.supplyAsync(() ->
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
{
InputStreamResponseListener listener = new InputStreamResponseListener();

try (BufferedInputStream contentStream = new BufferedInputStream(listener.getInputStream(), 1048576);
FileOutputStream file = new FileOutputStream(path.toFile(), true);
BufferedOutputStream fileStream = new BufferedOutputStream(file, 1048576);
FileLock fileLock = file.getChannel().lock();)
{
request.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);

if (response.getStatus() == HttpStatus.OK_200)
{
if (LOG.isDebugEnabled())
LOG.debug("Start writing a file");

fileStream.write(contentStream.readAllBytes());
}
else
{ if (LOG.isDebugEnabled())
LOG.debug("Unable to proceed with request");
else
throw new HttpResponseException(Integer.toString(response.getStatus()), response);
}
}
catch (InterruptedException | TimeoutException | ExecutionException | IOException | HttpResponseException e)
{
throw new CompletionException(e);
}

return path;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,15 @@ default Transport getTransport()
* @param listener the listener that receives response events
*/
void send(Response.CompleteListener listener);


arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
/**
* <p>Sends this request and asynchronously notifies the given listener for response events
* parse content with requested file and save it into {@link Path}</p>
*
* @param listener the listener that receives response events
*/
void send(PathResponseListener listener);
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved

/**
* Attempts to abort the send of this request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.PathRequestContent;
import org.eclipse.jetty.client.PathResponseListener;
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.RequestListeners;
import org.eclipse.jetty.client.Response;
Expand Down Expand Up @@ -743,7 +744,14 @@ public void send(Response.CompleteListener listener)
Destination destination = client.resolveDestination(this);
destination.send(this, listener);
}


arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void send(PathResponseListener listener)
arsenalzp marked this conversation as resolved.
Show resolved Hide resolved
{
Destination destination = client.resolveDestination(this);
destination.send(this, listener);
}

void sendAsync(HttpDestination destination, Response.CompleteListener listener)
{
if (listener != null)
Expand Down
Loading
Loading