001/*
002 * (C) Copyright 2011 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Florent Guillaume
016 */
017package org.nuxeo.ecm.core.io.download;
018
019import java.io.BufferedOutputStream;
020import java.io.ByteArrayOutputStream;
021import java.io.File;
022import java.io.FileInputStream;
023import java.io.FileOutputStream;
024import java.io.IOException;
025import java.io.OutputStream;
026import java.io.OutputStreamWriter;
027import java.io.PrintWriter;
028
029import javax.servlet.ServletOutputStream;
030
031import org.apache.commons.io.IOUtils;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034
035/**
036 * A {@link ServletOutputStream} that buffers everything until {@link #stopBuffering()} is called.
037 * <p>
038 * There may only be one such instance per thread.
039 * <p>
040 * Buffering is done first in memory, then on disk if the size exceeds a limit.
041 */
042public class BufferingServletOutputStream extends ServletOutputStream {
043
044    private static final Log log = LogFactory.getLog(BufferingServletOutputStream.class);
045
046    /** Initial memory buffer size. */
047    public static final int INITIAL = 4 * 1024; // 4 KB
048
049    /** Maximum memory buffer size, after this a file is used. */
050    public static final int MAX = 64 * 1024; // 64 KB
051
052    /** Used for 0-length writes. */
053    private final static OutputStream EMPTY = new ByteArrayOutputStream(0);
054
055    protected static ThreadLocal<BufferingServletOutputStream> threadLocal = new ThreadLocal<BufferingServletOutputStream>();
056
057    /** Have we stopped buffering to pass writes directly to the output stream. */
058    protected boolean streaming;
059
060    protected boolean needsFlush;
061
062    protected boolean needsClose;
063
064    protected final OutputStream outputStream;
065
066    protected PrintWriter writer;
067
068    protected ByteArrayOutputStream memory;
069
070    protected OutputStream file;
071
072    protected File tmp;
073
074    /**
075     * A {@link ServletOutputStream} wrapper that buffers everything until {@link #stopBuffering()} is called.
076     * <p>
077     * {@link #stopBuffering()} <b>MUST</b> be called in a {@code finally} statement in order for resources to be closed
078     * properly.
079     *
080     * @param outputStream the underlying output stream
081     */
082    public BufferingServletOutputStream(OutputStream outputStream) {
083        this.outputStream = outputStream;
084        threadLocal.set(this);
085    }
086
087    public PrintWriter getWriter() {
088        if (writer == null) {
089            writer = new PrintWriter(new OutputStreamWriter(this));
090        }
091        return writer;
092    }
093
094    /**
095     * Finds the proper output stream where we can write {@code len} bytes.
096     */
097    protected OutputStream getOutputStream(int len) throws IOException {
098        if (streaming) {
099            return outputStream;
100        }
101        if (len == 0) {
102            return EMPTY;
103        }
104        if (file != null) {
105            // already to file
106            return file;
107        }
108        int total;
109        if (memory == null) {
110            // no buffer yet
111            if (len <= MAX) {
112                memory = new ByteArrayOutputStream(Math.max(INITIAL, len));
113                return memory;
114            }
115            total = len;
116        } else {
117            total = memory.size() + len;
118        }
119        if (total <= MAX) {
120            return memory;
121        } else {
122            // switch to a file
123            createTempFile();
124            file = new BufferedOutputStream(new FileOutputStream(tmp));
125            if (memory != null) {
126                memory.writeTo(file);
127                memory = null;
128            }
129            return file;
130        }
131    }
132
133    protected void createTempFile() throws IOException {
134        tmp = File.createTempFile("nxout", null);
135    }
136
137    @Override
138    public void write(int b) throws IOException {
139        getOutputStream(1).write(b);
140    }
141
142    @Override
143    public void write(byte[] b) throws IOException {
144        getOutputStream(b.length).write(b);
145    }
146
147    @Override
148    public void write(byte[] b, int off, int len) throws IOException {
149        getOutputStream(len).write(b, off, len);
150    }
151
152    /**
153     * This implementation does nothing, we still want to keep buffering and not flush.
154     * <p>
155     * {@inheritDoc}
156     */
157    @Override
158    public void flush() throws IOException {
159        if (streaming) {
160            outputStream.flush();
161        } else {
162            needsFlush = true;
163        }
164    }
165
166    /**
167     * This implementation does nothing, we still want to keep the buffer until {@link #stopBuffering()} time.
168     * <p>
169     * {@inheritDoc}
170     */
171    @Override
172    public void close() throws IOException {
173        if (streaming) {
174            outputStream.close();
175        } else {
176            needsClose = true;
177        }
178
179    }
180
181    /**
182     * Writes any buffered data to the underlying {@link OutputStream} and from now on don't buffer anymore.
183     */
184    public void stopBuffering() throws IOException {
185        threadLocal.remove();
186        if (streaming) {
187            return;
188        }
189        if (writer != null) {
190            writer.flush(); // don't close, streaming needs it
191        }
192        streaming = true;
193        if (log.isDebugEnabled()) {
194            long len;
195            if (memory != null) {
196                len = memory.size();
197            } else if (file != null) {
198                len = tmp.length();
199            } else {
200                len = 0;
201            }
202            log.debug("buffered bytes: " + len);
203        }
204        boolean clientAbort = false;
205        try {
206            if (memory != null) {
207                memory.writeTo(outputStream);
208            } else if (file != null) {
209                try {
210                    try {
211                        file.flush();
212                    } finally {
213                        file.close();
214                    }
215                    FileInputStream in = new FileInputStream(tmp);
216                    try {
217                        IOUtils.copy(in, outputStream);
218                    } catch (IOException e) {
219                        if (DownloadHelper.isClientAbortError(e)) {
220                            DownloadHelper.logClientAbort(e);
221                            clientAbort = true;
222                        } else {
223                            throw e;
224                        }
225                    } finally {
226                        in.close();
227                    }
228                } finally {
229                    tmp.delete();
230                }
231            }
232        } catch (IOException e) {
233            if (DownloadHelper.isClientAbortError(e)) {
234                if (!clientAbort) {
235                    DownloadHelper.logClientAbort(e);
236                    clientAbort = true;
237                }
238            } else {
239                throw e;
240            }
241        } finally {
242            memory = null;
243            file = null;
244            tmp = null;
245            try {
246                if (needsFlush) {
247                    outputStream.flush();
248                }
249            } catch (IOException e) {
250                if (DownloadHelper.isClientAbortError(e)) {
251                    if (!clientAbort) {
252                        DownloadHelper.logClientAbort(e);
253                    }
254                } else {
255                    throw e;
256                }
257            } finally {
258                if (needsClose) {
259                    outputStream.close();
260                }
261            }
262        }
263    }
264
265    /**
266     * Tells the given {@link OutputStream} to stop buffering (if it was).
267     */
268    public static void stopBuffering(OutputStream out) throws IOException {
269        if (out instanceof BufferingServletOutputStream) {
270            ((BufferingServletOutputStream) out).stopBuffering();
271        }
272    }
273
274    /**
275     * Stop buffering the {@link OutputStream} for this thread (if it was).
276     *
277     * @since 5.5 (HF01)
278     */
279    public static void stopBufferingThread() throws IOException {
280        @SuppressWarnings("resource")
281        BufferingServletOutputStream out = threadLocal.get();
282        if (out != null) {
283            out.stopBuffering();
284        }
285    }
286
287}