001/*
002 * (C) Copyright 2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.io.download;
020
021import java.io.BufferedOutputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.FileOutputStream;
026import java.io.IOException;
027import java.io.OutputStream;
028import java.io.OutputStreamWriter;
029import java.io.PrintWriter;
030import java.nio.file.Files;
031
032import javax.servlet.ServletOutputStream;
033import javax.servlet.WriteListener;
034
035import org.apache.commons.io.IOUtils;
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038
039import org.nuxeo.runtime.api.Framework;
040
041/**
042 * A {@link ServletOutputStream} that buffers everything until {@link #stopBuffering()} is called.
043 * <p>
044 * There may only be one such instance per thread.
045 * <p>
046 * Buffering is done first in memory, then on disk if the size exceeds a limit.
047 */
048public class BufferingServletOutputStream extends ServletOutputStream {
049
050    private static final Log log = LogFactory.getLog(BufferingServletOutputStream.class);
051
052    /** Initial memory buffer size. */
053    public static final int INITIAL = 4 * 1024; // 4 KB
054
055    /** Maximum memory buffer size, after this a file is used. */
056    public static final int MAX = 64 * 1024; // 64 KB
057
058    /** Used for 0-length writes. */
059    private final static OutputStream EMPTY = new ByteArrayOutputStream(0);
060
061    /** Have we stopped buffering to pass writes directly to the output stream. */
062    protected boolean streaming;
063
064    protected boolean needsFlush;
065
066    protected boolean needsClose;
067
068    protected final ServletOutputStream outputStream;
069
070    protected PrintWriter writer;
071
072    protected ByteArrayOutputStream memory;
073
074    protected OutputStream file;
075
076    protected File tmp;
077
078    /**
079     * A {@link ServletOutputStream} wrapper that buffers everything until {@link #stopBuffering()} is called.
080     * <p>
081     * {@link #stopBuffering()} <b>MUST</b> be called in a {@code finally} statement in order for resources to be closed
082     * properly.
083     *
084     * @param outputStream the underlying output stream
085     */
086    public BufferingServletOutputStream(ServletOutputStream outputStream) {
087        this.outputStream = outputStream;
088    }
089
090    public PrintWriter getWriter() {
091        if (writer == null) {
092            writer = new PrintWriter(new OutputStreamWriter(this));
093        }
094        return writer;
095    }
096
097    /**
098     * Finds the proper output stream where we can write {@code len} bytes.
099     */
100    protected OutputStream getOutputStream(int len) throws IOException {
101        if (streaming) {
102            return outputStream;
103        }
104        if (len == 0) {
105            return EMPTY;
106        }
107        if (file != null) {
108            // already to file
109            return file;
110        }
111        int total;
112        if (memory == null) {
113            // no buffer yet
114            if (len <= MAX) {
115                memory = new ByteArrayOutputStream(Math.max(INITIAL, len));
116                return memory;
117            }
118            total = len;
119        } else {
120            total = memory.size() + len;
121        }
122        if (total <= MAX) {
123            return memory;
124        } else {
125            // switch to a file
126            createTempFile();
127            file = new BufferedOutputStream(new FileOutputStream(tmp));
128            if (memory != null) {
129                memory.writeTo(file);
130                memory = null;
131            }
132            return file;
133        }
134    }
135
136    protected void createTempFile() throws IOException {
137        tmp = Framework.createTempFile("nxout", null);
138    }
139
140    @Override
141    public void write(int b) throws IOException {
142        getOutputStream(1).write(b);
143    }
144
145    @Override
146    public void write(byte[] b) throws IOException {
147        getOutputStream(b.length).write(b);
148    }
149
150    @Override
151    public void write(byte[] b, int off, int len) throws IOException {
152        getOutputStream(len).write(b, off, len);
153    }
154
155    /**
156     * This implementation does nothing, we still want to keep buffering and not flush.
157     * <p>
158     * {@inheritDoc}
159     */
160    @Override
161    public void flush() throws IOException {
162        if (streaming) {
163            outputStream.flush();
164        } else {
165            needsFlush = true;
166        }
167    }
168
169    /**
170     * This implementation does nothing, we still want to keep the buffer until {@link #stopBuffering()} time.
171     * <p>
172     * {@inheritDoc}
173     */
174    @Override
175    public void close() throws IOException {
176        if (streaming) {
177            outputStream.close();
178        } else {
179            needsClose = true;
180        }
181
182    }
183
184    /**
185     * Writes any buffered data to the underlying {@link OutputStream} and from now on don't buffer anymore.
186     */
187    public void stopBuffering() throws IOException {
188        if (streaming) {
189            return;
190        }
191        if (writer != null) {
192            writer.flush(); // don't close, streaming needs it
193        }
194        streaming = true;
195        if (log.isDebugEnabled()) {
196            long len;
197            if (memory != null) {
198                len = memory.size();
199            } else if (file != null) {
200                len = tmp.length();
201            } else {
202                len = 0;
203            }
204            log.debug("buffered bytes: " + len);
205        }
206        boolean clientAbort = false;
207        try {
208            if (memory != null) {
209                memory.writeTo(outputStream);
210            } else if (file != null) {
211                try {
212                    try {
213                        file.flush();
214                    } finally {
215                        file.close();
216                    }
217                    try (FileInputStream in = new FileInputStream(tmp)) {
218                        IOUtils.copy(in, outputStream);
219                    } catch (IOException e) {
220                        if (DownloadHelper.isClientAbortError(e)) {
221                            DownloadHelper.logClientAbort(e);
222                            clientAbort = true;
223                        } else {
224                            throw e;
225                        }
226                    }
227                } finally {
228                    Files.delete(tmp.toPath());
229                }
230            }
231        } catch (IOException e) {
232            if (DownloadHelper.isClientAbortError(e)) {
233                if (!clientAbort) {
234                    DownloadHelper.logClientAbort(e);
235                    clientAbort = true;
236                }
237            } else {
238                throw e;
239            }
240        } finally {
241            memory = null;
242            file = null;
243            tmp = null;
244            try {
245                if (needsFlush) {
246                    outputStream.flush();
247                }
248            } catch (IOException e) {
249                if (DownloadHelper.isClientAbortError(e)) {
250                    if (!clientAbort) {
251                        DownloadHelper.logClientAbort(e);
252                    }
253                } else {
254                    log.error(e); // don't rethrow inside finally
255                }
256            } finally {
257                if (needsClose) {
258                    outputStream.close();
259                }
260            }
261        }
262    }
263
264    /**
265     * Tells the given {@link OutputStream} to stop buffering (if it was).
266     */
267    public static void stopBuffering(OutputStream out) throws IOException {
268        if (out instanceof BufferingServletOutputStream) {
269            ((BufferingServletOutputStream) out).stopBuffering();
270        }
271    }
272
273    @Override
274    public boolean isReady() {
275        if (streaming) {
276            return outputStream.isReady();
277        } else {
278            // memory or file
279            return true;
280        }
281    }
282
283    @Override
284    public void setWriteListener(WriteListener writeListener) {
285    }
286
287}