netty 文件上传
Contents
netty 文件上传
http://blog.csdn.net/mcpang/article/details/41140409
【初学与研发之NETTY】netty4之文件上传
标签: netty4文件上传
2014-11-15 04:35 6005人阅读 评论(29) 收藏 举报
分类:
JAVA (48) Netty (6)
版权声明: 本文为博主原创文章,未经博主允许不得转载。
客户端:
print?
public class UpLoadClient {
private StringBuffer resultBuffer = new StringBuffer();
private EventLoopGroup group = null;
private HttpDataFactory factory = null;
private Object waitObject = new Object();
private ChannelFuture future = null;
public UpLoadClient(String host, int port) throws Exception {
this.group = new NioEventLoopGroup();
this.factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE);
Bootstrap b = new Bootstrap();
b.option(ChannelOption.TCP_NODELAY, true);
b.option(ChannelOption.SO_SNDBUF, 1048576*200);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.group(group).channel(NioSocketChannel.class);
b.handler(new UpLoadClientIntializer());
this.future = b.connect(host, port).sync();
}
public void uploadFile(String path) {
if(path == null) {
System.out.println(“上传文件的路径不能为null…");
return;
}
File file = new File(path);
if (!file.canRead()) {
System.out.println(file.getName() + “不可读…");
return;
}
if (file.isHidden() || !file.isFile()) {
System.out.println(file.getName() + “不存在…");
return;
}
try {
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, “");
HttpPostRequestEncoder bodyRequestEncoder = new HttpPostRequestEncoder(factory, request, false);
bodyRequestEncoder.addBodyAttribute(“getform”, “POST”);
bodyRequestEncoder.addBodyFileUpload(“myfile”, file, “application/x-zip-compressed”, false);
List bodylist = bodyRequestEncoder.getBodyListAttributes();
if (bodylist == null) {
System.out.println(“请求体不存在…");
return;
}
HttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, file.getName());
HttpPostRequestEncoder bodyRequestEncoder2 = new HttpPostRequestEncoder(factory, request2, true);
bodyRequestEncoder2.setBodyHttpDatas(bodylist);
bodyRequestEncoder2.finalizeRequest();
Channel channel = this.future.channel();
if(channel.isActive() && channel.isWritable()) {
channel.writeAndFlush(request2);
if (bodyRequestEncoder2.isChunked()) {
channel.writeAndFlush(bodyRequestEncoder2).awaitUninterruptibly();
}
bodyRequestEncoder2.cleanFiles();
}
channel.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public void shutdownClient() {
// 等待数据的传输通道关闭
group.shutdownGracefully();
factory.cleanAllHttpDatas();
}
public boolean isCompleted() {
while(waitObject != null) {
//当通道处于开通和活动时,处于等待
}
if(resultBuffer.length() > 0) {
if(“200”.equals(resultBuffer.toString())) {
resultBuffer.setLength(0);
return true;
}
}
return false;
}
private class UpLoadClientIntializer extends ChannelInitializer {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(“decoder”, new HttpResponseDecoder());
pipeline.addLast(“encoder”, new HttpRequestEncoder());
pipeline.addLast(“chunkedWriter”, new ChunkedWriteHandler());
pipeline.addLast(“dispatcher”, new UpLoadClientHandler());
}
}
private class UpLoadClientHandler extends SimpleChannelInboundHandler {
private boolean readingChunks = false;
private int succCode = 200;
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
succCode = response.getStatus().code();
if (succCode == 200 && HttpHeaders.isTransferEncodingChunked(response)) {
readingChunks = true;
}
}
if (msg instanceof HttpContent) {
HttpContent chunk = (HttpContent) msg;
System.out.println("【响应】“+succCode+"»"+chunk.content().toString(CharsetUtil.UTF_8));
if (chunk instanceof LastHttpContent) {
readingChunks = false;
}
}
if (!readingChunks) {
resultBuffer.append(succCode);
ctx.channel().close();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
waitObject = null;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
resultBuffer.setLength(0);
resultBuffer.append(500);
System.out.println(“管道异常: " + cause.getMessage());
cause.printStackTrace();
ctx.channel().close();
}
}
}
服务端:
print?
public class DBServer extends Thread {
//单实例
private static DBServer dbServer = null;
//定时调度的周期实例
private static Scheduler sched = null;
private EventLoopGroup bossGroup = null;
private EventLoopGroup workerGroup = null;
//创建实例
public static DBServer newBuild() {
if(dbServer == null) {
dbServer = new DBServer();
}
return dbServer;
}
public void run() {
try {
startServer();
} catch(Exception e) {
System.out.println(“数据服务启动出现异常: “+e.toString());
e.printStackTrace();
}
}
private void startServer() throws Exception {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.option(ChannelOption.TCP_NODELAY, true);
b.option(ChannelOption.SO_TIMEOUT, 60000);
b.option(ChannelOption.SO_SNDBUF, 1048576*200);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.channel(NioServerSocketChannel.class);
b.childHandler(new DBServerInitializer());
// 服务器绑定端口监听
ChannelFuture f = b.bind(DBConfig.curHost.getIp(), DBConfig.curHost.getPort()).sync();
System.out.println(“数据服务: “+DBConfig.curHost.getServerHost()+"启动完成…");
// 监听服务器关闭监听
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
print?
public class DBServerHandler extends SimpleChannelInboundHandler {
private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE);
private String uri = null;
private HttpRequest request = null;
private HttpPostRequestDecoder decoder;
//message、download、upload
private String type = “message”;
public static final String HTTP_DATE_FORMAT = “EEE, dd MMM yyyy HH:mm:ss zzz”;
public static final String HTTP_DATE_GMT_TIMEZONE = “GMT”;
public static final int HTTP_CACHE_SECONDS = 60;
static {
DiskFileUpload.baseDirectory = DBConfig.curHost.getZipPath();
}
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
request = (HttpRequest) msg;
uri = sanitizeUri(request.getUri());
if (request.getMethod() == HttpMethod.POST) {
if (decoder != null) {
decoder.cleanFiles();
decoder = null;
}
try {
decoder = new HttpPostRequestDecoder(factory, request);
} catch (Exception e) {
e.printStackTrace();
writeResponse(ctx.channel(), HttpResponseStatus.INTERNAL_SERVER_ERROR, e.toString());
ctx.channel().close();
return;
}
}
}
if (decoder != null && msg instanceof HttpContent) {
HttpContent chunk = (HttpContent) msg;
try {
decoder.offer(chunk);
} catch (Exception e) {
e.printStackTrace();
writeResponse(ctx.channel(), HttpResponseStatus.INTERNAL_SERVER_ERROR, e.toString());
ctx.channel().close();
return;
}
readHttpDataChunkByChunk();
if (chunk instanceof LastHttpContent) {
writeResponse(ctx.channel(), HttpResponseStatus.OK, “");
reset();
return;
}
}
}
private String sanitizeUri(String uri) {
try {
uri = URLDecoder.decode(uri, “UTF-8”);
} catch(UnsupportedEncodingException e) {
try {
uri = URLDecoder.decode(uri, “ISO-8859-1”);
} catch(UnsupportedEncodingException e1) {
throw new Error();
}
}
return uri;
}
private void reset() {
request = null;
//销毁decoder释放所有的资源
decoder.destroy();
decoder = null;
}
/**
通过chunk读取request,获取chunk数据
@throws IOException
*/
private void readHttpDataChunkByChunk() throws IOException {
try {
while (decoder.hasNext()) {
InterfaceHttpData data = decoder.next();
if (data != null) {
try {
writeHttpData(data);
} finally {
data.release();
}
}
}
} catch (EndOfDataDecoderException e1) {
System.out.println(“end chunk”);
}
}
private void writeHttpData(InterfaceHttpData data) throws IOException {
if (data.getHttpDataType() == HttpDataType.FileUpload) {
FileUpload fileUpload = (FileUpload) data;
if (fileUpload.isCompleted()) {
StringBuffer fileNameBuf = new StringBuffer();
fileNameBuf.append(DiskFileUpload.baseDirectory)
.append(uri);
fileUpload.renameTo(new File(fileNameBuf.toString()));
}
} else if (data.getHttpDataType() == HttpDataType.Attribute) {
Attribute attribute = (Attribute) data;
if(CommonParam.DOWNLOAD_COLLECTION.equals(attribute.getName())) {
SynchMessageWatcher.newBuild().getMsgQueue().add(attribute.getValue());
}
}
}
private void writeDownLoadResponse(ChannelHandlerContext ctx, RandomAccessFile raf, File file) throws Exception {
long fileLength = raf.length();
//判断是否关闭请求响应连接
boolean close = HttpHeaders.Values.CLOSE.equalsIgnoreCase(request.headers().get(CONNECTION))
|| request.getProtocolVersion().equals(HttpVersion.HTTP_1_0)
&& !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(request.headers().get(CONNECTION));
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
HttpHeaders.setContentLength(response, fileLength);
setContentHeader(response, file);
if (!close) {
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
ctx.write(response);
System.out.println(“读取大小: “+fileLength);
final FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, 1000);
ChannelFuture writeFuture = ctx.write(region, ctx.newProgressivePromise());
writeFuture.addListener(new ChannelProgressiveFutureListener() {
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
if (total < 0) {
System.err.println(future.channel() + " Transfer progress: " + progress);
} else {
System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);
}
}
public void operationComplete(ChannelProgressiveFuture future) {
}
});
ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if(close) {
raf.close();
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
}
private static void setContentHeader(HttpResponse response, File file) {
MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));
SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));
// Date header
Calendar time = new GregorianCalendar();
response.headers().set(DATE, dateFormatter.format(time.getTime()));
// Add cache headers
time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
response.headers().set(EXPIRES, dateFormatter.format(time.getTime()));
response.headers().set(CACHE_CONTROL, “private, max-age=” + HTTP_CACHE_SECONDS);
response.headers().set(LAST_MODIFIED, dateFormatter.format(new Date(file.lastModified())));
}
private void writeResponse(Channel channel, HttpResponseStatus httpResponseStatus, String returnMsg) {
String resultStr = “节点【“+DBConfig.curHost.getServerHost()+”】";
if(httpResponseStatus.code() == HttpResponseStatus.OK.code()) {
resultStr += “正常接收”;
if(“message”.equals(type)) {
resultStr += “字符串。";
} else if(“upload”.equals(type)) {
resultStr += “上传文件。";
} else if(“download”.equals(type)) {
resultStr += “下载文件名。";
}
} else if(httpResponseStatus.code() == HttpResponseStatus.INTERNAL_SERVER_ERROR.code()) {
resultStr += “接收”;
if(“message”.equals(type)) {
resultStr += “字符串”;
} else if(“upload”.equals(type)) {
resultStr += “上传文件”;
} else if(“download”.equals(type)) {
resultStr += “下载文件名”;
}
resultStr += “的过程中出现异常: “+returnMsg;
}
//将请求响应的内容转换成ChannelBuffer.
ByteBuf buf = copiedBuffer(resultStr, CharsetUtil.UTF_8);
//判断是否关闭请求响应连接
boolean close = HttpHeaders.Values.CLOSE.equalsIgnoreCase(request.headers().get(CONNECTION))
|| request.getProtocolVersion().equals(HttpVersion.HTTP_1_0)
&& !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(request.headers().get(CONNECTION));
//构建请求响应对象
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, httpResponseStatus, buf);
response.headers().set(CONTENT_TYPE, “text/plain; charset=UTF-8”);
if (!close) {
//若该请求响应是最后的响应,则在响应头中没有必要添加’Content-Length’
response.headers().set(CONTENT_LENGTH, buf.readableBytes());
}
//发送请求响应
ChannelFuture future = channel.writeAndFlush(response);
//发送请求响应操作结束后关闭连接
if (close) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.getCause().printStackTrace();
writeResponse(ctx.channel(), HttpResponseStatus.INTERNAL_SERVER_ERROR, “数据文件通过过程中出现异常: “+cause.getMessage().toString());
ctx.channel().close();
}
}
print?
public class DBServerInitializer extends ChannelInitializer {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(“decoder”, new HttpRequestDecoder());
pipeline.addLast(“encoder”, new HttpResponseEncoder());
pipeline.addLast(“deflater”, new HttpContentCompressor());
pipeline.addLast(“handler”, new DBServerHandler());
}
}
Author -
LastMod 2016-05-24