`
zhanghongliang_cyj
  • 浏览: 49154 次
  • 性别: Icon_minigender_1
  • 来自: 邯郸
社区版块
存档分类
最新评论

java多线程复制

 
阅读更多
java 实现多线程复制功能,支持中断后继续。

CopyTask.java

package copyFileTest;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

public class CopyTask implements Callable<String> {
	// 缓冲区大小
	private int bufferSize = 10240;

	private CountDownLatch latch;
	private RandomAccessFile file = null;
	private RandomAccessFile tempFile = null;
	private RandomAccessFile inputFile = null;

	private int id;
	private long startPosition;
	private long endPosition;
	private long currentPosition;

	public CopyTask(File file, File tmpFile, File inputFile, int id,
			CountDownLatch latch, int bufferSize) {
		try {
			this.file = new RandomAccessFile(file, "rw");
			this.tempFile = new RandomAccessFile(tmpFile, "rw");
			this.inputFile = new RandomAccessFile(inputFile, "r");
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		}
		this.id = id;
		this.latch = latch;
		this.bufferSize = bufferSize;
	}

	public String call() throws Exception {

		try {
			tempFile.seek((id - 1) * 28);
			tempFile.readInt();
			this.startPosition = tempFile.readLong();
			this.endPosition = tempFile.readLong();
			this.currentPosition = tempFile.readLong();
		} catch (IOException e) {
			e.printStackTrace();
		}

		System.out.println("Thread " + id + " begin!");

		while (true) {
			try {
				tempFile.seek(id * 28 - 8);
				if (currentPosition < endPosition) {
					inputFile.seek(currentPosition);

					file.seek(currentPosition);

					int len = 0;
					byte[] b = new byte[bufferSize];
					while ((len = inputFile.read(b)) != -1) {
						file.write(b, 0, len);

						currentPosition += len;
						tempFile.seek(id * 28 - 8);
						tempFile.writeLong(currentPosition);

						if (currentPosition > endPosition) {
							break;
						}
					}
					System.out.println("Thread " + id + " startPosition="
							+ startPosition + ",endPosition=" + endPosition
							+ ",currentPosition=" + currentPosition);

				}
				System.out.println("Thread " + id + " finished.");
				break;
			} catch (IOException e) {
				e.printStackTrace();
			} finally {
				file.close();
				tempFile.close();
				inputFile.close();
			}
		}
		latch.countDown();
		return "finish";
	}
}



CopyFileTest.java
package copyFileTest;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CopyFileTest {
	// 线程数
	private int poolSize = 10;
	// 每个线程的缓冲区大小
	public int bufferSize = 10240;
	private int loopSize = 10000; // 每个线程循环读的次数

	/**
	 * 复制文件
	 * 
	 * @param fileSource
	 *            要复制的文件
	 * @param fileTarget
	 *            要复制到的文件路径
	 */
	public void copy(String fileSource, String fileTarget) {
		long startTime = System.currentTimeMillis();
		File file = null;
		File tempFile = null;
		File targetFile = null;
		CountDownLatch latch;
		ExecutorService pool = Executors.newCachedThreadPool();
		long contentLength = 0;
		long threadLength = 0;
		try {
			new File(fileTarget).mkdirs();
			file = new File(fileSource);
			targetFile = new File(fileTarget + File.separator + file.getName());
			tempFile = new File(fileTarget + File.separator + "_temp");

			// 得到content的长度
			contentLength = file.length();
			poolSize = this.getPoolSize(contentLength);
			System.out.println("total length=" + contentLength
					+ " use ThreadSize:" + poolSize);

			threadLength = loopSize * bufferSize;

			if (file.exists() && tempFile.exists()) {
				// 如果文件已存在,根据临时文件中记载的线程数量,继续上次的任务
				int tmpFileLength = (int) tempFile.length() / 28 + 1;
				latch = new CountDownLatch(tmpFileLength);
				for (int i = 1; i < tmpFileLength; i++) {
					pool.submit(new CopyTask(targetFile, tempFile, file, i,
							latch, bufferSize));
				}
			} else {
				// 如果下载的目标文件不存在,则创建新文件
				latch = new CountDownLatch(poolSize);
				targetFile.createNewFile();
				tempFile.createNewFile();
				DataOutputStream os = new DataOutputStream(
						new FileOutputStream(tempFile));
				for (int i = 0; i < this.poolSize; i++) {
					os.writeInt(i + 1);
					os.writeLong(i * threadLength);
					if (i == this.poolSize - 1) {// 最后一个线程的结束位置应为文件末端
						os.writeLong(contentLength);
					} else {
						os.writeLong((i + 1) * threadLength);
					}
					os.writeLong(i * threadLength);
					pool.submit(new CopyTask(targetFile, tempFile, file, i + 1,
							latch, bufferSize));
				}
				os.close();
			}
			// 等待下载任务完成
			latch.await();
			// 删除临时文件
			System.out.println("删除临时文件" + tempFile.delete());
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			pool.shutdown();
			System.out.println("完成复制,所用时:"
					+ (System.currentTimeMillis() - startTime));
		}
	}

	/**
	 * 根据总大小 缓存区与每个线程默认循环次数,判断用几个线程
	 * 
	 * @param contentLength
	 * @return
	 * 
	 */
	public int getPoolSize(long contentLength) {
		int poolSize = 1;
		BigDecimal contentBig = new BigDecimal(contentLength);
		BigDecimal bufferBig = new BigDecimal(bufferSize);
		BigDecimal loopBig = new BigDecimal(loopSize);

		poolSize = contentBig.divide(bufferBig.multiply(loopBig), 0,
				BigDecimal.ROUND_HALF_UP).intValue();

		return poolSize;
	}

	public static void main(String[] args) {
		String fileSource = "D:\\add.rar"; // 要复制的文件
		String fileTarget = "D:\\copyTest";
		new CopyFileTest().copy(fileSource, fileTarget);
	}
}


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics