/*
 * (C) Copyright 2011 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.io.download;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;

import javax.servlet.ServletOutputStream;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.nuxeo.runtime.api.Framework;

/**
 * A {@link ServletOutputStream} that buffers everything until {@link #stopBuffering()} is called.
 * <p>
 * There may only be one such instance per thread.
 * <p>
 * Buffering is done first in memory, then on disk if the size exceeds a limit.
 */
public class BufferingServletOutputStream extends ServletOutputStream {

    private static final Log log = LogFactory.getLog(BufferingServletOutputStream.class);

    /** Initial memory buffer size. */
    public static final int INITIAL = 4 * 1024; // 4 KB

    /** Maximum memory buffer size, after this a file is used. */
    public static final int MAX = 64 * 1024; // 64 KB

    /** Used for 0-length writes. */
    private static final OutputStream EMPTY = new ByteArrayOutputStream(0);

    /** The instance registered by the constructor for the current thread. */
    protected static ThreadLocal<BufferingServletOutputStream> threadLocal = new ThreadLocal<>();

    /** Have we stopped buffering to pass writes directly to the output stream. */
    protected boolean streaming;

    /** Set when {@link #flush()} was requested while still buffering; honored by {@link #stopBuffering()}. */
    protected boolean needsFlush;

    /** Set when {@link #close()} was requested while still buffering; honored by {@link #stopBuffering()}. */
    protected boolean needsClose;

    /** The underlying stream that receives the data once buffering stops. */
    protected final OutputStream outputStream;

    /** Lazily created writer on top of this stream, see {@link #getWriter()}. */
    protected PrintWriter writer;

    /** In-memory buffer, or {@code null} if nothing was buffered in memory (or the buffer moved to a file). */
    protected ByteArrayOutputStream memory;

    /** Stream to the temporary file, or {@code null} while the data still fits in memory. */
    protected OutputStream file;

    /** The temporary file backing {@link #file}. */
    protected File tmp;

    /**
     * A {@link ServletOutputStream} wrapper that buffers everything until {@link #stopBuffering()} is called.
     * <p>
     * {@link #stopBuffering()} <b>MUST</b> be called in a {@code finally} statement in order for resources to be closed
     * properly.
     *
     * @param outputStream the underlying output stream
     */
    public BufferingServletOutputStream(OutputStream outputStream) {
        this.outputStream = outputStream;
        threadLocal.set(this);
    }

    /**
     * Gets a writer that writes to this (buffering) stream, creating it on first use.
     * <p>
     * The writer encodes characters with the platform default charset, as no charset is passed to the
     * {@link OutputStreamWriter}.
     *
     * @return the writer
     */
    public PrintWriter getWriter() {
        if (writer == null) {
            writer = new PrintWriter(new OutputStreamWriter(this));
        }
        return writer;
    }

    /**
     * Finds the proper output stream where we can write {@code len} bytes.
     * <p>
     * Once streaming this is the underlying stream. While buffering it is the memory buffer as long as the total stays
     * within {@link #MAX}, and a temporary file afterwards.
     *
     * @param len the number of bytes about to be written
     * @return the stream to write to
     * @throws IOException if the temporary file cannot be created or written
     */
    protected OutputStream getOutputStream(int len) throws IOException {
        if (streaming) {
            return outputStream;
        }
        if (len == 0) {
            return EMPTY;
        }
        if (file != null) {
            // already to file
            return file;
        }
        long total;
        if (memory == null) {
            // no buffer yet
            if (len <= MAX) {
                memory = new ByteArrayOutputStream(Math.max(INITIAL, len));
                return memory;
            }
            total = len;
        } else {
            // long arithmetic: size + len must not wrap around for a very large len
            total = (long) memory.size() + len;
        }
        if (total <= MAX) {
            return memory;
        }
        return switchToFile();
    }

    /**
     * Creates the temporary file, moves any in-memory data into it and makes it the current buffer.
     * <p>
     * The {@link #file} and {@link #memory} fields are only updated once the switch fully succeeded. On failure the
     * temporary file is closed and deleted so that it is not leaked, and the memory buffer is left untouched.
     */
    private OutputStream switchToFile() throws IOException {
        createTempFile();
        OutputStream out = null;
        try {
            out = new BufferedOutputStream(new FileOutputStream(tmp));
            if (memory != null) {
                memory.writeTo(out);
            }
        } catch (IOException e) {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException closeError) {
                    e.addSuppressed(closeError);
                }
            }
            deleteTempFile();
            throw e;
        }
        file = out;
        memory = null;
        return file;
    }

    /**
     * Creates the temporary file used when the buffered data exceeds {@link #MAX}, and stores it in {@link #tmp}.
     *
     * @throws IOException if the file cannot be created
     */
    protected void createTempFile() throws IOException {
        tmp = Framework.createTempFile("nxout", null);
    }

    /** Deletes the temporary file (if any), logging a warning if that fails, and forgets about it. */
    private void deleteTempFile() {
        if (tmp != null && tmp.exists() && !tmp.delete()) {
            log.warn("Could not delete temporary file: " + tmp);
        }
        tmp = null;
    }

    /** Logs the number of bytes that were buffered, at debug level. */
    private static void logBufferedBytes(long len) {
        if (log.isDebugEnabled()) {
            log.debug("buffered bytes: " + len);
        }
    }

    @Override
    public void write(int b) throws IOException {
        getOutputStream(1).write(b);
    }

    @Override
    public void write(byte[] b) throws IOException {
        getOutputStream(b.length).write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        getOutputStream(len).write(b, off, len);
    }

    /**
     * While buffering, this implementation only records that a flush was requested: we still want to keep buffering
     * and not flush. The flush is then done at {@link #stopBuffering()} time. Once streaming, the underlying stream is
     * flushed directly.
     * <p>
     * {@inheritDoc}
     */
    @Override
    public void flush() throws IOException {
        if (streaming) {
            outputStream.flush();
        } else {
            needsFlush = true;
        }
    }

    /**
     * While buffering, this implementation only records that a close was requested: we still want to keep the buffer
     * until {@link #stopBuffering()} time, where the close is then done. Once streaming, the underlying stream is
     * closed directly.
     * <p>
     * {@inheritDoc}
     */
    @Override
    public void close() throws IOException {
        if (streaming) {
            outputStream.close();
        } else {
            needsClose = true;
        }
    }

    /**
     * Writes any buffered data to the underlying {@link OutputStream} and from now on don't buffer anymore.
     * <p>
     * A flush or close requested while buffering is applied to the underlying stream afterwards. Client abort errors
     * (as detected by {@link DownloadHelper#isClientAbortError}) are logged once and not rethrown.
     *
     * @throws IOException if writing to, flushing or closing the underlying stream fails for another reason
     */
    public void stopBuffering() throws IOException {
        threadLocal.remove();
        if (streaming) {
            return;
        }
        if (writer != null) {
            // must happen before switching to streaming so that pending characters land in the buffer
            writer.flush(); // don't close, streaming needs it
        }
        streaming = true;
        boolean clientAbort = false;
        try {
            if (memory != null) {
                logBufferedBytes(memory.size());
                memory.writeTo(outputStream);
            } else if (file != null) {
                clientAbort = copyFileToOutput();
            } else {
                logBufferedBytes(0);
            }
        } catch (IOException e) {
            if (DownloadHelper.isClientAbortError(e)) {
                if (!clientAbort) {
                    DownloadHelper.logClientAbort(e);
                    clientAbort = true;
                }
            } else {
                throw e;
            }
        } finally {
            memory = null;
            file = null;
            tmp = null;
            try {
                if (needsFlush) {
                    outputStream.flush();
                }
            } catch (IOException e) {
                if (DownloadHelper.isClientAbortError(e)) {
                    if (!clientAbort) {
                        DownloadHelper.logClientAbort(e);
                    }
                } else {
                    throw e;
                }
            } finally {
                if (needsClose) {
                    outputStream.close();
                }
            }
        }
    }

    /**
     * Closes the temporary file stream, copies the file content to the underlying stream, then deletes the file.
     *
     * @return {@code true} if the copy was interrupted by a client abort (already logged), {@code false} otherwise
     */
    private boolean copyFileToOutput() throws IOException {
        try {
            try {
                file.flush();
            } finally {
                file.close();
            }
            // measured only now: before the flush the file may be missing the bytes still held by the stream buffer
            logBufferedBytes(tmp.length());
            try (FileInputStream in = new FileInputStream(tmp)) {
                IOUtils.copy(in, outputStream);
            } catch (IOException e) {
                if (!DownloadHelper.isClientAbortError(e)) {
                    throw e;
                }
                DownloadHelper.logClientAbort(e);
                return true;
            }
            return false;
        } finally {
            deleteTempFile();
        }
    }

    /**
     * Tells the given {@link OutputStream} to stop buffering (if it was).
     *
     * @param out the stream, which is left alone if it is not a {@link BufferingServletOutputStream}
     * @throws IOException if writing the buffered data fails
     */
    public static void stopBuffering(OutputStream out) throws IOException {
        if (out instanceof BufferingServletOutputStream) {
            ((BufferingServletOutputStream) out).stopBuffering();
        }
    }

    /**
     * Stop buffering the {@link OutputStream} for this thread (if it was).
     *
     * @throws IOException if writing the buffered data fails
     * @since 5.5 (HF01)
     */
    public static void stopBufferingThread() throws IOException {
        @SuppressWarnings("resource")
        BufferingServletOutputStream out = threadLocal.get();
        if (out != null) {
            out.stopBuffering();
        }
    }

}