001/* 002 * (C) Copyright 2011 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Florent Guillaume 016 */ 017package org.nuxeo.ecm.core.io.download; 018 019import java.io.BufferedOutputStream; 020import java.io.ByteArrayOutputStream; 021import java.io.File; 022import java.io.FileInputStream; 023import java.io.FileOutputStream; 024import java.io.IOException; 025import java.io.OutputStream; 026import java.io.OutputStreamWriter; 027import java.io.PrintWriter; 028 029import javax.servlet.ServletOutputStream; 030 031import org.apache.commons.io.IOUtils; 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034 035/** 036 * A {@link ServletOutputStream} that buffers everything until {@link #stopBuffering()} is called. 037 * <p> 038 * There may only be one such instance per thread. 039 * <p> 040 * Buffering is done first in memory, then on disk if the size exceeds a limit. 041 */ 042public class BufferingServletOutputStream extends ServletOutputStream { 043 044 private static final Log log = LogFactory.getLog(BufferingServletOutputStream.class); 045 046 /** Initial memory buffer size. */ 047 public static final int INITIAL = 4 * 1024; // 4 KB 048 049 /** Maximum memory buffer size, after this a file is used. */ 050 public static final int MAX = 64 * 1024; // 64 KB 051 052 /** Used for 0-length writes. */ 053 private final static OutputStream EMPTY = new ByteArrayOutputStream(0); 054 055 protected static ThreadLocal<BufferingServletOutputStream> threadLocal = new ThreadLocal<BufferingServletOutputStream>(); 056 057 /** Have we stopped buffering to pass writes directly to the output stream. */ 058 protected boolean streaming; 059 060 protected boolean needsFlush; 061 062 protected boolean needsClose; 063 064 protected final OutputStream outputStream; 065 066 protected PrintWriter writer; 067 068 protected ByteArrayOutputStream memory; 069 070 protected OutputStream file; 071 072 protected File tmp; 073 074 /** 075 * A {@link ServletOutputStream} wrapper that buffers everything until {@link #stopBuffering()} is called. 076 * <p> 077 * {@link #stopBuffering()} <b>MUST</b> be called in a {@code finally} statement in order for resources to be closed 078 * properly. 079 * 080 * @param outputStream the underlying output stream 081 */ 082 public BufferingServletOutputStream(OutputStream outputStream) { 083 this.outputStream = outputStream; 084 threadLocal.set(this); 085 } 086 087 public PrintWriter getWriter() { 088 if (writer == null) { 089 writer = new PrintWriter(new OutputStreamWriter(this)); 090 } 091 return writer; 092 } 093 094 /** 095 * Finds the proper output stream where we can write {@code len} bytes. 096 */ 097 protected OutputStream getOutputStream(int len) throws IOException { 098 if (streaming) { 099 return outputStream; 100 } 101 if (len == 0) { 102 return EMPTY; 103 } 104 if (file != null) { 105 // already to file 106 return file; 107 } 108 int total; 109 if (memory == null) { 110 // no buffer yet 111 if (len <= MAX) { 112 memory = new ByteArrayOutputStream(Math.max(INITIAL, len)); 113 return memory; 114 } 115 total = len; 116 } else { 117 total = memory.size() + len; 118 } 119 if (total <= MAX) { 120 return memory; 121 } else { 122 // switch to a file 123 createTempFile(); 124 file = new BufferedOutputStream(new FileOutputStream(tmp)); 125 if (memory != null) { 126 memory.writeTo(file); 127 memory = null; 128 } 129 return file; 130 } 131 } 132 133 protected void createTempFile() throws IOException { 134 tmp = File.createTempFile("nxout", null); 135 } 136 137 @Override 138 public void write(int b) throws IOException { 139 getOutputStream(1).write(b); 140 } 141 142 @Override 143 public void write(byte[] b) throws IOException { 144 getOutputStream(b.length).write(b); 145 } 146 147 @Override 148 public void write(byte[] b, int off, int len) throws IOException { 149 getOutputStream(len).write(b, off, len); 150 } 151 152 /** 153 * This implementation does nothing, we still want to keep buffering and not flush. 154 * <p> 155 * {@inheritDoc} 156 */ 157 @Override 158 public void flush() throws IOException { 159 if (streaming) { 160 outputStream.flush(); 161 } else { 162 needsFlush = true; 163 } 164 } 165 166 /** 167 * This implementation does nothing, we still want to keep the buffer until {@link #stopBuffering()} time. 168 * <p> 169 * {@inheritDoc} 170 */ 171 @Override 172 public void close() throws IOException { 173 if (streaming) { 174 outputStream.close(); 175 } else { 176 needsClose = true; 177 } 178 179 } 180 181 /** 182 * Writes any buffered data to the underlying {@link OutputStream} and from now on don't buffer anymore. 183 */ 184 public void stopBuffering() throws IOException { 185 threadLocal.remove(); 186 if (streaming) { 187 return; 188 } 189 if (writer != null) { 190 writer.flush(); // don't close, streaming needs it 191 } 192 streaming = true; 193 if (log.isDebugEnabled()) { 194 long len; 195 if (memory != null) { 196 len = memory.size(); 197 } else if (file != null) { 198 len = tmp.length(); 199 } else { 200 len = 0; 201 } 202 log.debug("buffered bytes: " + len); 203 } 204 boolean clientAbort = false; 205 try { 206 if (memory != null) { 207 memory.writeTo(outputStream); 208 } else if (file != null) { 209 try { 210 try { 211 file.flush(); 212 } finally { 213 file.close(); 214 } 215 FileInputStream in = new FileInputStream(tmp); 216 try { 217 IOUtils.copy(in, outputStream); 218 } catch (IOException e) { 219 if (DownloadHelper.isClientAbortError(e)) { 220 DownloadHelper.logClientAbort(e); 221 clientAbort = true; 222 } else { 223 throw e; 224 } 225 } finally { 226 in.close(); 227 } 228 } finally { 229 tmp.delete(); 230 } 231 } 232 } catch (IOException e) { 233 if (DownloadHelper.isClientAbortError(e)) { 234 if (!clientAbort) { 235 DownloadHelper.logClientAbort(e); 236 clientAbort = true; 237 } 238 } else { 239 throw e; 240 } 241 } finally { 242 memory = null; 243 file = null; 244 tmp = null; 245 try { 246 if (needsFlush) { 247 outputStream.flush(); 248 } 249 } catch (IOException e) { 250 if (DownloadHelper.isClientAbortError(e)) { 251 if (!clientAbort) { 252 DownloadHelper.logClientAbort(e); 253 } 254 } else { 255 throw e; 256 } 257 } finally { 258 if (needsClose) { 259 outputStream.close(); 260 } 261 } 262 } 263 } 264 265 /** 266 * Tells the given {@link OutputStream} to stop buffering (if it was). 267 */ 268 public static void stopBuffering(OutputStream out) throws IOException { 269 if (out instanceof BufferingServletOutputStream) { 270 ((BufferingServletOutputStream) out).stopBuffering(); 271 } 272 } 273 274 /** 275 * Stop buffering the {@link OutputStream} for this thread (if it was). 276 * 277 * @since 5.5 (HF01) 278 */ 279 public static void stopBufferingThread() throws IOException { 280 @SuppressWarnings("resource") 281 BufferingServletOutputStream out = threadLocal.get(); 282 if (out != null) { 283 out.stopBuffering(); 284 } 285 } 286 287}