001/*
002 * (C) Copyright 2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.io.download;
020
021import java.io.BufferedOutputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.FileOutputStream;
026import java.io.IOException;
027import java.io.OutputStream;
028import java.io.OutputStreamWriter;
029import java.io.PrintWriter;
030
031import javax.servlet.ServletOutputStream;
032
033import org.apache.commons.io.IOUtils;
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036
037import org.nuxeo.runtime.api.Framework;
038
039/**
040 * A {@link ServletOutputStream} that buffers everything until {@link #stopBuffering()} is called.
041 * <p>
042 * There may only be one such instance per thread.
043 * <p>
044 * Buffering is done first in memory, then on disk if the size exceeds a limit.
045 */
046public class BufferingServletOutputStream extends ServletOutputStream {
047
048    private static final Log log = LogFactory.getLog(BufferingServletOutputStream.class);
049
050    /** Initial memory buffer size. */
051    public static final int INITIAL = 4 * 1024; // 4 KB
052
053    /** Maximum memory buffer size, after this a file is used. */
054    public static final int MAX = 64 * 1024; // 64 KB
055
056    /** Used for 0-length writes. */
057    private final static OutputStream EMPTY = new ByteArrayOutputStream(0);
058
059    protected static ThreadLocal<BufferingServletOutputStream> threadLocal = new ThreadLocal<>();
060
061    /** Have we stopped buffering to pass writes directly to the output stream. */
062    protected boolean streaming;
063
064    protected boolean needsFlush;
065
066    protected boolean needsClose;
067
068    protected final OutputStream outputStream;
069
070    protected PrintWriter writer;
071
072    protected ByteArrayOutputStream memory;
073
074    protected OutputStream file;
075
076    protected File tmp;
077
078    /**
079     * A {@link ServletOutputStream} wrapper that buffers everything until {@link #stopBuffering()} is called.
080     * <p>
081     * {@link #stopBuffering()} <b>MUST</b> be called in a {@code finally} statement in order for resources to be closed
082     * properly.
083     *
084     * @param outputStream the underlying output stream
085     */
086    public BufferingServletOutputStream(OutputStream outputStream) {
087        this.outputStream = outputStream;
088        threadLocal.set(this);
089    }
090
091    public PrintWriter getWriter() {
092        if (writer == null) {
093            writer = new PrintWriter(new OutputStreamWriter(this));
094        }
095        return writer;
096    }
097
098    /**
099     * Finds the proper output stream where we can write {@code len} bytes.
100     */
101    protected OutputStream getOutputStream(int len) throws IOException {
102        if (streaming) {
103            return outputStream;
104        }
105        if (len == 0) {
106            return EMPTY;
107        }
108        if (file != null) {
109            // already to file
110            return file;
111        }
112        int total;
113        if (memory == null) {
114            // no buffer yet
115            if (len <= MAX) {
116                memory = new ByteArrayOutputStream(Math.max(INITIAL, len));
117                return memory;
118            }
119            total = len;
120        } else {
121            total = memory.size() + len;
122        }
123        if (total <= MAX) {
124            return memory;
125        } else {
126            // switch to a file
127            createTempFile();
128            file = new BufferedOutputStream(new FileOutputStream(tmp));
129            if (memory != null) {
130                memory.writeTo(file);
131                memory = null;
132            }
133            return file;
134        }
135    }
136
137    protected void createTempFile() throws IOException {
138        tmp = Framework.createTempFile("nxout", null);
139    }
140
141    @Override
142    public void write(int b) throws IOException {
143        getOutputStream(1).write(b);
144    }
145
146    @Override
147    public void write(byte[] b) throws IOException {
148        getOutputStream(b.length).write(b);
149    }
150
151    @Override
152    public void write(byte[] b, int off, int len) throws IOException {
153        getOutputStream(len).write(b, off, len);
154    }
155
156    /**
157     * This implementation does nothing, we still want to keep buffering and not flush.
158     * <p>
159     * {@inheritDoc}
160     */
161    @Override
162    public void flush() throws IOException {
163        if (streaming) {
164            outputStream.flush();
165        } else {
166            needsFlush = true;
167        }
168    }
169
170    /**
171     * This implementation does nothing, we still want to keep the buffer until {@link #stopBuffering()} time.
172     * <p>
173     * {@inheritDoc}
174     */
175    @Override
176    public void close() throws IOException {
177        if (streaming) {
178            outputStream.close();
179        } else {
180            needsClose = true;
181        }
182
183    }
184
185    /**
186     * Writes any buffered data to the underlying {@link OutputStream} and from now on don't buffer anymore.
187     */
188    public void stopBuffering() throws IOException {
189        threadLocal.remove();
190        if (streaming) {
191            return;
192        }
193        if (writer != null) {
194            writer.flush(); // don't close, streaming needs it
195        }
196        streaming = true;
197        if (log.isDebugEnabled()) {
198            long len;
199            if (memory != null) {
200                len = memory.size();
201            } else if (file != null) {
202                len = tmp.length();
203            } else {
204                len = 0;
205            }
206            log.debug("buffered bytes: " + len);
207        }
208        boolean clientAbort = false;
209        try {
210            if (memory != null) {
211                memory.writeTo(outputStream);
212            } else if (file != null) {
213                try {
214                    try {
215                        file.flush();
216                    } finally {
217                        file.close();
218                    }
219                    FileInputStream in = new FileInputStream(tmp);
220                    try {
221                        IOUtils.copy(in, outputStream);
222                    } catch (IOException e) {
223                        if (DownloadHelper.isClientAbortError(e)) {
224                            DownloadHelper.logClientAbort(e);
225                            clientAbort = true;
226                        } else {
227                            throw e;
228                        }
229                    } finally {
230                        in.close();
231                    }
232                } finally {
233                    tmp.delete();
234                }
235            }
236        } catch (IOException e) {
237            if (DownloadHelper.isClientAbortError(e)) {
238                if (!clientAbort) {
239                    DownloadHelper.logClientAbort(e);
240                    clientAbort = true;
241                }
242            } else {
243                throw e;
244            }
245        } finally {
246            memory = null;
247            file = null;
248            tmp = null;
249            try {
250                if (needsFlush) {
251                    outputStream.flush();
252                }
253            } catch (IOException e) {
254                if (DownloadHelper.isClientAbortError(e)) {
255                    if (!clientAbort) {
256                        DownloadHelper.logClientAbort(e);
257                    }
258                } else {
259                    throw e;
260                }
261            } finally {
262                if (needsClose) {
263                    outputStream.close();
264                }
265            }
266        }
267    }
268
269    /**
270     * Tells the given {@link OutputStream} to stop buffering (if it was).
271     */
272    public static void stopBuffering(OutputStream out) throws IOException {
273        if (out instanceof BufferingServletOutputStream) {
274            ((BufferingServletOutputStream) out).stopBuffering();
275        }
276    }
277
278    /**
279     * Stop buffering the {@link OutputStream} for this thread (if it was).
280     *
281     * @since 5.5 (HF01)
282     */
283    public static void stopBufferingThread() throws IOException {
284        @SuppressWarnings("resource")
285        BufferingServletOutputStream out = threadLocal.get();
286        if (out != null) {
287            out.stopBuffering();
288        }
289    }
290
291}