

Exporting Big-Data Reports with Hadoop HDFS and the Open-Source Excel Library POI (Part 1)

luffyZh / 1001 reads

Abstract: the Java projects used here are all based on spring + dubbo. HDFS has much in common with existing distributed file systems, but the differences are also clear: it provides high-throughput data access, making it well suited to applications over large data sets, and it relaxes some POSIX constraints to allow streaming reads of file system data. It is part of the Apache Hadoop Core project.

Keywords

Java, PHP, HDFS, RocketMQ, Excel, POI, reports

Requirement background

On the business side, almost every company has report-export jobs. For small volumes, writing an output stream or even a plain string is enough, as long as the response carries the right Content-Type. Large exports are another matter, especially order-style reports with complex business logic: the export has to apply all kinds of filters and permission checks, so the data handling alone is laborious. Worse, the request is rarely for a day or two of data but for half a month or a whole month, which even for a small company's business can reach well over a hundred thousand rows.
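For that small-volume case, the whole export can be a few dozen lines. A minimal sketch (in Java, matching the utilities later in the post; the CSV quoting rule here is standard RFC 4180 style, not taken from the original code):

```java
import java.util.List;

public class CsvExporter {
    // Build CSV text for a small export. The caller streams the result to the
    // HTTP response with Content-Type: text/csv and a Content-Disposition
    // header, which is all a low-volume export needs.
    public static String toCsv(List<List<String>> rows) {
        StringBuilder sb = new StringBuilder();
        for (List<String> row : rows) {
            for (int i = 0; i < row.size(); i++) {
                if (i > 0) sb.append(',');
                sb.append(escape(row.get(i)));
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    // RFC 4180-style quoting: wrap fields containing separators and double
    // any embedded quotes.
    static String escape(String field) {
        if (field.contains(",") || field.contains("\"") || field.contains("\n")) {
            return "\"" + field.replace("\"", "\"\"") + "\"";
        }
        return field;
    }
}
```

This is the streaming approach the paragraph above alludes to; the PHP version below does the same thing with an HTML table instead of CSV.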

function generateExcel($filename, $header, array &$data)
{
    generateDownHeader($filename);

    // Build an HTML table; Excel opens it when served as a download.
    // (The markup literals were stripped by the blog engine and are restored here.)
    $rs = "<table><tr>";
    if (is_string($header)) {
        $header = explode(",", $header);
    }
    foreach ($header as $v) {
        $rs .= "<th>" . $v . "</th>";
    }
    $rs .= "</tr>";
    foreach ($data as $coll) {
        $rs .= "<tr>";
        foreach ($coll as $v) {
            if (AppHelper::isDouble($v)) {
                // format as text so Excel does not mangle long numbers
                $rs .= "<td style=\"vnd.ms-excel.numberformat:@\">" . $v . "</td>";
            } else {
                $rs .= "<td>" . $v . "</td>";
            }
        }
        $rs .= "</tr>";
    }
    $rs .= "</table>";
    echo $rs;
    exit;
}

function generateDownHeader($filename)
{
    header("Content-Type: application/force-download");
    header("Content-Type: application/octet-stream");
    header("Content-Type: application/download");
    header("Content-Disposition: inline; filename=\"" . $filename . "\"");
    header("Content-Transfer-Encoding: binary");
    header("Last-Modified: " . gmdate("D, d M Y H:i:s") . " GMT");
    header("Cache-Control: must-revalidate, post-check=0, pre-check=0");
    header("Pragma: no-cache");
}

For a hundred thousand-plus rows, the straightforward approach above is probably not viable (I did not try every alternative). Our PHP layer fetches data from interfaces through curl, and both nginx and the curl calls time out at around 30 seconds; exporting 10k rows inside 30 seconds is tight. On a well-provisioned multi-core server you can parallelize with curl_multi, but on a weak server that approach may take even longer.

Here is the curl wrapper I use to call the interfaces:

function curl($url, $option = null, $method = "POST", $getCode = false, $header = [])
{
    $curl = curl_init();
    curl_setopt($curl, CURLOPT_URL, $url);
    curl_setopt($curl, CURLOPT_TIMEOUT, 30);
    if (!array_key_exists("Content-Type", $header)) {
        $header["Content-Type"] = "application/json;charset=UTF-8";
    }
    $headers = [];
    if ($header) {
        foreach ($header as $k=>$v) {
            $headers[] = $k.": ".$v;
        }
    }
    curl_setopt($curl, CURLOPT_HTTPHEADER, $headers);
    if ($option) {
        if (is_array($option)) {
            $option = json_encode($option);
        }
        curl_setopt($curl, CURLOPT_POSTFIELDS, $option);
    }
    curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
    curl_setopt($curl, CURLOPT_CUSTOMREQUEST, $method);
    $result = curl_exec($curl);
    if ($getCode) {
        $curl_code = curl_getinfo($curl, CURLINFO_HTTP_CODE);
        $message = self::isJson($result) ? json_decode($result, true) : $result;
        $result = ["code" => $curl_code];
        if (isset($message["exception"]) && count($message) == 1) {
            $result["exception"] = $message["exception"];
            $result["result"] = null;
        } else {
            $result["result"] = $message;
        }
    }
    curl_close($curl);
    return $result;
}

Because of the data volume, I later switched to curl_multi:

function curlMulti(array $urls, $options = null, $method = "POST",  $getCode = false, $header = []) 
{
    $mh = curl_multi_init();
    // add each request to the curl multi session
    $handles = $contents = [];
    foreach ($urls as $key => $url) {
        $handles[$key] = curl_init($url);
        curl_setopt($handles[$key], CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($handles[$key], CURLOPT_TIMEOUT, 30);
        curl_setopt($handles[$key], CURLOPT_CUSTOMREQUEST, $method);

        if (!array_key_exists("Content-Type", $header)) {
            $header["Content-Type"] = "application/json;charset=utf-8";
        }
        $headers = [];
        if ($header) {
            foreach ($header as $k => $val) {
                $headers[] = $k.": ".$val;
            }
        }
        curl_setopt($handles[$key], CURLOPT_HTTPHEADER, $headers);
        if ($options) {
            if (is_array($options)) {
                $options = json_encode($options);
            }
            curl_setopt($handles[$key], CURLOPT_POSTFIELDS, $options);
        }
        curl_multi_add_handle($mh, $handles[$key]);
    }
    // execute the batched handles
    /*$active = null;
    do{
        $mrc = curl_multi_exec($mh, $active);
    } while ($mrc == CURLM_CALL_MULTI_PERFORM);

    while ($active and $mrc == CURLM_OK) {
        if (curl_multi_select($mh) === -1) {
            usleep(100);
            do {
                $mrc = curl_multi_exec($mh, $active);
            }while($mrc == CURLM_CALL_MULTI_PERFORM);
        }
    }// collect the batch contents
    $errors = [];
    foreach ($handles as $k => $ch) {
        $errors[$k] = curl_error($ch);
        $content = curl_multi_getcontent($ch);
        if ($getCode) {
            $content = curl_errno($ch) == 0 && self::isJson($content)? json_decode($content,true) : [];
        }
        $contents = array_merge($contents,$content);

    }
    $info = curl_multi_info_read($mh);*/
    $output = $errors = $infos = [];
    do {
        while (($execrun =  curl_multi_exec($mh, $running)) == CURLM_CALL_MULTI_PERFORM);
        if ($execrun != CURLM_OK)
            break;
        while ($done = curl_multi_info_read($mh)) {
            $info= curl_getinfo($done["handle"]);
            $infos["http_code"][] = $info["http_code"];
            $result["code"] = $info["http_code"];
            $infos["url"][] = $info["url"];
            $errors[] = curl_error($done["handle"]);
            $output = self::isJson(curl_multi_getcontent($done["handle"])) ?
                array_merge($output, json_decode(curl_multi_getcontent($done["handle"]),true)) : $output;
            if ($running)
                curl_multi_select($mh, 30);
        }
    } while ($running);

    $result["result"] = $output;
    $result["exception"] = $errors;
    $result["info"] = $infos;
    foreach ($handles as $ch) {
        curl_multi_remove_handle($mh, $ch);
    }
    curl_multi_close($mh);
    return $result;
}

A block in the code above is commented out; in principle it should behave the same as the loop below it, and in practice the results were indeed the same. Three problems showed up. First, ordering: these are concurrent requests, so whichever handle wins the race returns first; the merged results came back shuffled, and the exported Excel rows were in no particular order. Second, data loss: with 500 rows assigned per request, validation showed only 300-odd rows actually coming back, and the shortfall varied between runs. Third, with enough data nginx still timed out, returning a 504.
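For contrast, the ordering problem disappears if results are merged by request index rather than by completion time. A sketch of that idea (illustrative only, not code from the project): run the page fetches through a thread pool and consume the Futures in submission order, so a slow page cannot shuffle the output and a failed page raises instead of silently dropping rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntFunction;

public class OrderedFetcher {
    // Fetch pages 1..pages concurrently, but merge results in page order:
    // Future.get() is consumed in submission order, so scheduling cannot
    // reorder the merged list, and any per-page failure surfaces here.
    public static List<String> fetchAll(int pages, IntFunction<String> fetchPage) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int p = 1; p <= pages; p++) {
                final int page = p;
                futures.add(pool.submit(() -> fetchPage.apply(page)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get()); // blocks per page, preserving order
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The fetchPage function stands in for one paginated interface call; nothing here fixes the nginx timeout, which is why the post moves to a queue next.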

PS: why not the phpexcel package? Simple: in testing, PHPExcel ate far too much memory for our machines to bear, so it was out.

A first solution

Since the PHP multi-handle route could not solve the problem, another approach was needed. The most reliable one, and the one everybody thinks of, is a queue: push the export request onto a queue, return to the client immediately with a "your export is being processed" response, let a consumer do the actual export, and report the result back to the client afterwards.
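The hand-off pattern just described can be sketched in miniature with an in-process queue; Gearman provides the same shape across processes. The class and method names below are made up for illustration:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ExportQueue {
    private final BlockingQueue<String> jobs = new LinkedBlockingQueue<>();

    // Web side: enqueue the export request and answer the client immediately
    // with an acknowledgement, instead of holding the connection open.
    public String submit(String jobPayload) {
        jobs.offer(jobPayload);
        return "accepted";
    }

    // Worker side: pick up the next job and run the slow export. A real
    // worker would block on take(); poll() keeps this sketch test-friendly.
    public String takeAndProcess() {
        String payload = jobs.poll();
        return payload == null ? null : "exported:" + payload;
    }
}
```

The web request finishes in milliseconds regardless of how long the export itself takes; that decoupling is the entire point of the queue.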

PHP has plenty of queue options: Swoole, Workerman, Gearman, and so on. I picked Gearman for convenience; Swoole used to be in our project but was dropped at some point, I never learned why.

A demo of the Gearman worker (server side):

    // NOTE: the opening lines (class header and worker construction) were
    // eaten by the blog engine; reconstructed here.
    public function run()
    {
        $worker = new GearmanWorker();
        $worker->addServer();
        $worker->addFunction("export", function (GearmanJob $job) {
            $workload = $job->workload();

            if (($data = $this->parseJson($workload)) == false) {
                return AppHelper::encodeJson(["code" => "-1", "result" => null, "exception" => "invalid parameters"]);
            }
            $user = isset($data["user"]) && !empty($data["user"]) ? $data["user"] : "guest";
            $path = dirname(Yii::$app->basePath) . "/backend/downloads/" . sha1($user) . "/" . date("Y-m-d") . "/";
            $filename = isset($data["filename"]) && !empty($data["filename"]) ? $data["filename"] : date("Y-m-d") . "-order.xls";
            $rs = $this->getData($data["type"]["data"], $data["type"]["count"], $data["api"], $data["params"]);
            $this->writeExcel($path, $filename, $rs, $data["header"]);
            return 200;
        });
        // loop forever; gearman handles idling internally, so the worker will not spin the CPU or die from overload
        while ($worker->work()) {
            if ($worker->returnCode() !== GEARMAN_SUCCESS) {
                echo "error" . PHP_EOL;
            }
        }
    }

    public function parseJson($str)
    {
        $data = json_decode($str, true);
        return (json_last_error() == JSON_ERROR_NONE) ? $data : false;
    }

    public function writeExcel($path, $filename, $data, $header)
    {
        if ($this->mkDir($path)) {
            $data = $this->assembleData($data);
            $rs = $this->generateExcel($header, $data);
            file_put_contents(rtrim($path, "/") . "/" . $filename, $rs);
        } else {
            echo "目錄不存在,寫文件錯誤!";
        }
        return;

    }

    public function getData($dataApi, $countApi, $api, $params)
    {
        $start = microtime(true);
        $count = AppHelper::getData($api . $countApi . "?" . http_build_query($params));
        echo $api . $countApi . "?" . http_build_query($params).PHP_EOL;
        echo "總條數:" . $count . PHP_EOL;
        $params["perpage"] = 500;
        $times = ceil($count / $params["perpage"]);
        $data = [];
        if ($count > 0) {
            for ($i = 0; $i < $times; $i++) {
                $params["page"] = $i + 1;
                $rs = AppHelper::getData($api . $dataApi . "?" . http_build_query($params));
                $data = array_merge($data, $rs);
            }
        }
        $end = microtime(true);
        echo "花費時間:" . ($end - $start) . PHP_EOL;
        return $data;
    }

    public function generateExcel($header, array &$data)
    {
        // Same HTML-table trick as before (markup literals restored; the blog
        // engine had stripped them).
        $rs = "<table><tr>";
        if (is_string($header)) {
            $header = explode(",", $header);
        }
        foreach ($header as $v) {
            $rs .= "<th>" . $v . "</th>";
        }
        $rs .= "</tr>";
        foreach ($data as $coll) {
            $rs .= "<tr>";
            foreach ($coll as $v) {
                if (AppHelper::isDouble($v)) {
                    $rs .= "<td style=\"vnd.ms-excel.numberformat:@\">" . $v . "</td>";
                } else {
                    $rs .= "<td>" . $v . "</td>";
                }
            }
            $rs .= "</tr>";
        }
        $rs .= "</table>";
        unset($data);
        return $rs;
    }

    public function assembleData($rs)
    {
        $users = [];
        if ($rs) {
            $uids = array_column($rs, "uid");
            $us = Yii::$app->get("db")->createCommand(
                "select uid,gender,adminflag,mobile,type from {{%user}} where uid in (" . implode(",", $uids) . ")"
            )->queryAll();
            if ($us && is_array($us)) {
                foreach ($us as $u) {
                    $users[$u["uid"]] = $u;
                }
            }
        }

        // Code-to-label maps, condensed from the original if/elseif chains.
        $statusMap = [
            0 => "closed", 1 => "placed", 2 => "payment confirming", 3 => "paid",
            4 => "shipped", 5 => "receipt confirmed", 6 => "reviewed",
            7 => "paid amount differs from order amount",
        ];
        $refundMap = [
            0 => "not requested", 1 => "refund requested", 2 => "refund rejected",
            3 => "refund approved", 4 => "seller confirmed, manual handling needed",
            5 => "refund credited", 6 => "return-and-refund requested",
            7 => "return approved", 8 => "return rejected", 9 => "buyer shipped return",
            10 => "seller received return", 11 => "received goods, refund refused",
            12 => "return refund credited",
        ];

        $content = [];
        foreach ($rs as $k => $v) {
            $data = AppHelper::decodeJson($v["data"], true);
            $status = isset($statusMap[$v["status"]]) ? $statusMap[$v["status"]] : "deleted";
            $refund = isset($v["refund"], $refundMap[$v["refund"]]) ? $refundMap[$v["refund"]] : "no refund requested";

            $u = isset($users[$v["uid"]]) ? $users[$v["uid"]] : null;
            $gender = $u ? ($u["gender"] == 1 ? "male" : ($u["gender"] == 2 ? "female" : "unknown")) : "unknown";
            $typeMap = [1 => "administrator", 2 => "seed user", 3 => "artisan under review"];
            $type = $u && isset($typeMap[$u["adminflag"]]) ? $typeMap[$u["adminflag"]] : "regular user";

            $itype = "unset/in stock";
            if (isset($data["type"])) {
                $itype = $data["type"] == 1 ? "in stock" : ($data["type"] == 2 ? "custom-made" : "auction");
            }
            $utype = isset($u["type"]) && $u["type"] == 4 ? "registered via WeChat purchase" : "registered in app";
            $otype = !$v["otype"] ? "bought in app" : "bought via WeChat";
            $paytype = !$v["prepaytype"] ? "paid in app" : "paid via WeChat";
            $snapshot = AppHelper::getData(Yii::$app->params["imageServer"] . $v["snapshot"]);

            $content[] = [
                date("Y/m/d H:i:s", floor($v["createtm"] / 1000)), $v["ooid"],
                isset($snapshot["item"]["pid"]) ? $snapshot["item"]["pid"] : "",
                $v["iid"], $data["title"], $itype,
                (isset($v["parentCategory"]) ? $v["parentCategory"] . "/" : "") . $v["category"],
                $v["craftsman"], $v["suid"], $v["quantity"], $v["username"], $utype,
                $v["uid"], $data["address"], $status, $refund, $data["price"], $v["realpay"],
                $otype, $paytype,
                isset($u["mobile"]) ? $u["mobile"] : "unknown", $gender, $type,
            ];
        }
        return $content;
    }

    public function mkDir($path)
    {
        if (is_dir($path)) {
            echo "directory " . $path . " already exists!";
            return true;
        }
        $res = mkdir($path, 0777, true);
        echo $res ? "directory created" : "failed to create directory";
        return $res;
    }
}

The Gearman client code:

addServer("127.0.0.1", 4730);

    $client->setCompleteCallback(completeCallBack);
    $result2 = $client->doBackground("export", $str);//異步進行,只返回處理句柄。

//        $result1 = $client->do("export", "do");//do是同步進行,進行處理并返回處理結果。
//        $result3 = $client->addTask("export", "addTask");//添加任務到隊列,同步進行?通過添加task可以設置回調函數。
//        $result4 = $client->addTaskBackground("export", "addTaskBackground");//添加后臺任務到隊列,異步進行?

    $client->runTasks();//運行隊列中的任務,只是do系列不需要runTask()
    return $result2;

}
//綁定回調函數,只對addTask有效
function completeCallBack($task)
{
    echo "CompleteCallback!handle result:".$task->data()."
"; }

PS: to run the code above you need the Gearman service installed on the server (or locally), plus the php gearman extension; search for an installation guide.

If your business logic is simple, this is already more than enough to export tens of thousands of rows. My problem, however, did not end there: my boss did not want a Gearman queue in the picture and preferred the processing be done in Java. Fine. I enjoy hopping between technologies to solve a problem, so since this did not satisfy him, on to another plan.

RocketMQ + HDFS + POI

Note: the Java projects here are all based on spring + dubbo/dubbox. All configuration and annotations mentioned fall under ordinary Spring configuration and annotations, except for the mapper configuration and annotations.

Three projects:

mq project: exposes a REST service and sends messages (@rxl)

biz project: exposes dubbo and REST interfaces and handles business logic (@lee)

data project: handles the data export

The three projects are written by different engineers; we do not care how each is implemented, only that we can use what each provides.

The REST interface exposed by mq:
@Path("/message")
@Produces({ContentType.APPLICATION_JSON_UTF_8})
@Component("sendMessageService")
public class SendMessageImpl implements SendMessageService{

    @Resource
    public IProducer producer;

    @PUT
    @Path("send")
    @Consumes({MediaType.APPLICATION_JSON})
    @Override
    public void sendMessage(Message message) {
        System.out.println("message" + message.getMessage());
        producer.send(message.getTopic(),message.getKey(),message.getMessage());
    }
}

From the PHP backend we call this interface with a PUT request, handing the data to be exported over to the export service. curl's custom-request support covers PUT:

 curl_setopt($curl, CURLOPT_CUSTOMREQUEST, "PUT");

Suppose the mq REST endpoint is http://localhost:8018/mq/message/send. We pass it a JSON string whose underlying structure is an associative array with the keys "topic", "key" and "message". topic is the message topic, which the designated mq consumer subscribes to; under one topic there are many keys, so the consumer (the export side) has to dispatch on the key. message carries the concrete parameters: which fields to export, the file upload server address, and so on.

$message = [
    "topic" => "order_export",
    "key" => "order_tag_" . $orderNo,
    "message" => [
        "params" => [
            ...
        ],
        "headers" => [
            ...
        ],
        "options" => [
            ...
        ],
    ],

];

The complete request:

http://localhost:8018/mq/message/send?{"topic":"order_export","key":"order_tag_","message":{"params":[],"header":[],"options":[]}}
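On the consuming side, the exporter then dispatches on topic and key. A minimal sketch of that routing (the "order_export" topic and "order_tag_" prefix come from the example above; the return values are made up for illustration):

```java
public class MessageRouter {
    // Route a consumed message to a handler by topic and key prefix;
    // anything this consumer does not recognise is skipped.
    public static String route(String topic, String key) {
        if (!"order_export".equals(topic)) {
            return "skip";
        }
        if (key != null && key.startsWith("order_tag_")) {
            // hand off to the order-export job, passing along the order number
            return "orderExport:" + key.substring("order_tag_".length());
        }
        return "skip";
    }
}
```

The message payload itself would be JSON-decoded by the handler; only the routing decision is shown here.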

Wrapping POI in a utility class

There are many Java Excel APIs, but Apache POI is the most convenient and flexible to use (perhaps I simply have not tried the others).

HSSF is the POI Project's pure Java implementation of the Excel '97(-2007) file format. XSSF is the POI Project's pure Java implementation of the Excel 2007 OOXML (.xlsx) file format.

HSSF and XSSF provide ways to create, modify, read and write XLS spreadsheets. They provide:

low level structures for those with special needs

an eventmodel api for efficient read-only access

a full usermodel api for creating, reading and modifying XLS files

Pull in the POI packages via gradle:

// java excel api
compile "org.apache.poi:poi:3.10.1"
compile "org.apache.poi:poi-ooxml:3.9" // note: ideally keep poi and poi-ooxml on the same version
package cn.test.web.utils;

import cn.test.util.Utils;
import org.apache.commons.io.FilenameUtils;
import org.apache.poi.hssf.record.crypto.Biff8EncryptionKey;
import org.apache.poi.hssf.usermodel.HSSFFont;
import org.apache.poi.hssf.usermodel.HSSFFooter;
import org.apache.poi.hssf.usermodel.HSSFHeader;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
import org.apache.poi.poifs.filesystem.POIFSFileSystem;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.CellStyle;
import org.apache.poi.ss.usermodel.Font;
import org.apache.poi.ss.usermodel.Footer;
import org.apache.poi.ss.usermodel.Header;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Properties;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/11
 * Time 17:02
 */
public class POIUtils {
    private static final short HEADER_FONT_SIZE = 16; // page-header font size
    private static final short FONT_HEIGHT_IN_POINTS = 14; // header-row font size

    public static Workbook createWorkbook(String file) {
        String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
        Workbook wb = null;
        switch (ext) {
            case "xls":
                wb = createHSSFWorkbook();
                break;
            case "xlsx":
                wb = createXSSFWorkbook();
                break;
            default:
                wb = createHSSFWorkbook();
        }
        return wb;
    }

    public static Workbook createWorkbookByIS(String file, InputStream inputStream) {
        String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
        Workbook wb = null;
        try {
            switch (ext) {
                case "xls":
                    wb = new HSSFWorkbook(inputStream);
                    break;
                case "xlsx":
                    wb = new XSSFWorkbook(inputStream);
                    break;
                default:
                    wb = new HSSFWorkbook(inputStream);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return wb;
    }

    public static Workbook writeFile(Workbook wb, String file) {
        if (wb == null || Utils.isEmpty(file)) {
            return null;
        }
        FileOutputStream out = null;
        try {
            out = new FileOutputStream(file);
            wb.write(out);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return wb;
    }

    public static Workbook createHSSFWorkbook() {
        // create the workbook
        HSSFWorkbook wb = new HSSFWorkbook();
        // add a worksheet (an xls file generated without a sheet fails to open)
        @SuppressWarnings("unused")
        Sheet sheet = wb.createSheet();
        return wb;
    }

    public static Workbook createXSSFWorkbook() {
        XSSFWorkbook wb = new XSSFWorkbook();
        @SuppressWarnings("unused")
        Sheet sheet = wb.createSheet();
        return wb;
    }

    public static Workbook openWorkbook(String file) {
        FileInputStream in = null;
        Workbook wb = null;

        try {
            in = new FileInputStream(file);
            wb = WorkbookFactory.create(in);
        } catch (InvalidFormatException | IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return wb;
    }

    public static Workbook openEncryptedWorkbook(String file, String password) {
        FileInputStream input = null;
        BufferedInputStream binput = null;
        POIFSFileSystem poifs = null;
        Workbook wb = null;
        try {
            input = new FileInputStream(file);
            binput = new BufferedInputStream(input);
            poifs = new POIFSFileSystem(binput);
            Biff8EncryptionKey.setCurrentUserPassword(password);
            String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
            switch (ext) {
                case "xls":
                    wb = new HSSFWorkbook(poifs);
                    break;
                case "xlsx":
                    wb = new XSSFWorkbook(input);
                    break;
                default:
                    wb = new HSSFWorkbook(poifs);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return wb;
    }

    /**
     * Append a sheet; if wb is null and isNew is true, create a new workbook.
     *
     * @param wb
     * @param isNew
     * @param type  workbook type, effective when isNew is true; 1: xls, 2: xlsx
     * @return
     */
    public static Workbook appendSheet(Workbook wb, boolean isNew, int type) {
        if (wb != null) {
            Sheet sheet = wb.createSheet();
        } else if (isNew) {
            if (type == 1) {
                wb = new HSSFWorkbook();
                wb.createSheet();
            } else {
                wb = new XSSFWorkbook();
                wb.createSheet();
            }
        }
        return wb;
    }


    public static Workbook setSheetName(Workbook wb, int index, String sheetName) {
        if (wb != null && wb.getSheetAt(index) != null) {
            wb.setSheetName(index, sheetName);
        }
        return wb;
    }

    public static Workbook removeSheet(Workbook wb, int index) {
        if (wb != null && wb.getSheetAt(index) != null) {
            wb.removeSheetAt(index);
        }
        return wb;
    }

    public static Workbook insert(Workbook wb, String sheetName, int row, int start,
                                  List columns) {
        if (row == 0 || wb == null) return wb;
        for (int i = start; i < (row + start); i++) {
            Row rows = wb.getSheet(sheetName).createRow(i);
            if (columns != null && columns.size() > 0) {
                for (int j = 0; j < columns.size(); j++) {
                    Cell ceil = rows.createCell(j);
                    ceil.setCellValue(String.valueOf(columns.get(j)));
                }
            }
        }
        return wb;
    }

    /**
     * Set the sheet header row.
     *
     * @param wb
     * @param sheetName
     * @param columns   e.g. ["country", "event type", "year"]
     * @return
     */
    public static Workbook setHeader(Workbook wb, String sheetName, List columns) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        return setHeaderStyle(insert(wb, sheetName, 1, 0, columns), sheetName);

    }

    /**
     * Insert data rows.
     *
     * @param wb        Workbook
     * @param sheetName sheet name; defaults to the first sheet
     * @param start     starting row
     * @param data      nested lists, e.g. [["China","Olympics",2008],["London","Olympics",2012]]
     * @return
     */
    public static Workbook setData(Workbook wb, String sheetName, int start,
                                   List data) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        if (data != null && data.size() > 0) {
            if (data instanceof List) {
                int s = start;
                for (Object columns : data) {
                    insert(wb, sheetName, data.size() - (s - 1), s, (List) columns);
                    s++;
                }
            }
        }
        return wb;
    }

    /**
     * Remove a row.
     *
     * @param wb
     * @param sheetName sheet name
     * @param row       row number
     * @return
     */
    public static Workbook delRow(Workbook wb, String sheetName, int row) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        Row r = wb.getSheet(sheetName).getRow(row);
        wb.getSheet(sheetName).removeRow(r);
        return wb;
    }

    /**
     * Move rows.
     *
     * @param wb
     * @param sheetName
     * @param start     first row to move
     * @param end       last row to move
     * @param step      how far to shift, in rows; negative moves up
     *                  moveRow(wb, null, 2, 3, 5): move rows 2-3 down past row 5
     *                  moveRow(wb, null, 2, 3, -1): move the 3rd and 4th rows up one row
     * @return
     */
    public static Workbook moveRow(Workbook wb, String sheetName, int start, int end, int step) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        wb.getSheet(sheetName).shiftRows(start, end, step);
        return wb;
    }

    public static Workbook setHeaderStyle(Workbook wb, String sheetName) {
        Font font = wb.createFont();
        CellStyle style = wb.createCellStyle();
        font.setBoldweight(HSSFFont.BOLDWEIGHT_BOLD);
        font.setFontHeightInPoints(FONT_HEIGHT_IN_POINTS);
        font.setFontName("黑體");
        style.setFont(font);
        if (Utils.isEmpty(sheetName)) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        int row = wb.getSheet(sheetName).getFirstRowNum();
        int cell = wb.getSheet(sheetName).getRow(row).getLastCellNum();
        for (int i = 0; i < cell; i++) {
            wb.getSheet(sheetName).getRow(row).getCell(i).setCellStyle(style);
        }
        return wb;
    }

    public static Workbook setHeaderOutline(Workbook wb, String sheetName, String title) {
        if (wb == null) return null;
        if (Utils.isEmpty(sheetName)) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        Header header = wb.getSheet(sheetName).getHeader();
        header.setLeft(HSSFHeader.startUnderline() +
                HSSFHeader.font("宋體", "Italic") +
                "your slogan here!" +  // e.g. a company motto
                HSSFHeader.endUnderline());
        header.setCenter(HSSFHeader.fontSize(HEADER_FONT_SIZE) +
                HSSFHeader.startDoubleUnderline() +
                HSSFHeader.startBold() +
                title +
                HSSFHeader.endBold() +
                HSSFHeader.endDoubleUnderline());
        header.setRight("時間:" + HSSFHeader.date() + " " + HSSFHeader.time());
        return wb;
    }

    public static Workbook setFooter(Workbook wb, String sheetName, String copyright) {
        if (wb == null) return null;
        if (Utils.isEmpty(sheetName)) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        Footer footer = wb.getSheet(sheetName).getFooter();
        if (Utils.isEmpty(copyright)) {
            copyright = "Your Company"; // copyright holder: your company or app name
        }
        footer.setLeft("Copyright @ " + copyright);
        footer.setCenter("Page:" + HSSFFooter.page() + " / " + HSSFFooter.numPages());
        footer.setRight("File:" + HSSFFooter.file());
        return wb;
    }

    public static String create(String sheetNm, String file, List header, List data, String title, String copyright) {
        Workbook wb = createWorkbook(file);
        if (Utils.isEmpty(sheetNm)) {
            sheetNm = wb.getSheetAt(0).getSheetName();
        }
        setHeaderOutline(wb, sheetNm, title);
        setHeader(wb, sheetNm, header);
        setData(wb, sheetNm, 1, data);
        setFooter(wb, sheetNm, copyright);
        writeFile(wb, file);
        if (wb != null) {
            return file;
        }
        return null;
    }

    public static String getSystemFileCharset() {
        Properties pro = System.getProperties();
        return pro.getProperty("file.encoding");
    }
    // TODO: add further settings later

}
Wrapping HDFS in a utility class

The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has much in common with existing distributed file systems, but the differences are also significant. HDFS is highly fault-tolerant and designed to be deployed on low-cost machines. It provides high-throughput access to data, which suits applications with large data sets. HDFS relaxes some POSIX constraints to enable streaming reads of file system data. It was originally built as infrastructure for the Apache Nutch search engine project and is part of the Apache Hadoop Core project.


Pull in the HDFS dependencies via gradle:

    // jersey
    compile "com.sun.jersey:jersey-core:1.19.1"
    compile "com.sun.jersey:jersey-server:1.19.1"
    compile "com.sun.jersey:jersey-client:1.19.1"
    compile "com.sun.jersey:jersey-json:1.19.1"

    // hadoop
    compile ("org.apache.hadoop:hadoop-common:2.7.2") {
        exclude(module: "jersey")
        exclude(module: "contribs")
    }
    compile ("org.apache.hadoop:hadoop-hdfs:2.7.2") {
        exclude(module: "jersey")
        exclude(module: "contribs")
    }
    compile ("org.apache.hadoop:hadoop-client:2.7.2") {
        exclude(module: "jersey")
        exclude(module: "contribs")
    }
package cn.test.web.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.poi.ss.usermodel.Workbook;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/11
 * Time 17:41
 */
public class HDFSUtils {
    private static FileSystem fs = null;

    public static FileSystem getFileSystem(Configuration conf) throws IOException,
            URISyntaxException {
        fs = FileSystem.get(conf);
        //fs = FileSystem.newInstance(conf);
        return fs;
    }

    /**
     * Check whether a path exists.
     *
     * @param conf
     * @param path
     * @return
     * @throws IOException
     */
    public static boolean exits(Configuration conf, String path) throws IOException,
            URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        return fs.exists(new Path(path));
    }

    /**
     * Create a file from bytes.
     *
     * @param conf
     * @param filePath
     * @param contents
     * @throws IOException
     */
    public static void createFile(Configuration conf, String filePath, byte[] contents)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path path = new Path(filePath);
        FSDataOutputStream outputStream = fs.create(path);
        outputStream.write(contents, 0, contents.length);
        outputStream.hflush();
        outputStream.close();
        fs.close();
    }

    /**
     * Create a file from a string.
     *
     * @param conf
     * @param filePath
     * @param fileContent
     * @throws IOException
     */
    public static void createFile(Configuration conf, String fileContent, String filePath)
            throws IOException, URISyntaxException {
        createFile(conf, filePath, fileContent.getBytes());
    }

    /**
     * Upload a local file to HDFS.
     *
     * @param conf
     * @param localFilePath
     * @param remoteFilePath
     * @throws IOException
     */
    public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path localPath = new Path(localFilePath);
        Path remotePath = new Path(remoteFilePath);
        fs.copyFromLocalFile(true, true, localPath, remotePath);
        fs.close();
    }

    /**
     * Delete a directory or file.
     *
     * @param conf
     * @param remoteFilePath
     * @param recursive
     * @return
     * @throws IOException
     */
    public static boolean deleteFile(Configuration conf, String remoteFilePath, boolean recursive)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        boolean result = fs.delete(new Path(remoteFilePath), recursive);
        fs.close();
        return result;
    }

    /**
     * Delete a file or directory (cascades into subdirectories)
     *
     * @param conf
     * @param remoteFilePath
     * @return
     * @throws IOException
     */
    public static boolean deleteFile(Configuration conf, String remoteFilePath)
            throws IOException, URISyntaxException {
        return deleteFile(conf, remoteFilePath, true);
    }

    /**
     * Rename a file
     *
     * @param conf
     * @param oldFileName
     * @param newFileName
     * @return
     * @throws IOException
     */
    public static boolean renameFile(Configuration conf, String oldFileName, String newFileName)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path oldPath = new Path(oldFileName);
        Path newPath = new Path(newFileName);
        boolean result = fs.rename(oldPath, newPath);
        fs.close();
        return result;
    }

    /**
     * Create a directory
     *
     * @param conf
     * @param dirName
     * @return
     * @throws IOException
     */
    public static boolean createDirectory(Configuration conf, String dirName)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path dir = new Path(dirName);
        boolean result = fs.mkdirs(dir);
        fs.close();
        return result;
    }

    /**
     * List all files (excluding directories) under the given path
     *
     * @param fs
     * @param basePath
     * @param recursive
     */
    public static RemoteIterator<LocatedFileStatus> listFiles(FileSystem fs, String basePath, boolean recursive)
            throws IOException {
        return fs.listFiles(new Path(basePath), recursive);
    }

    /**
     * List the files under the given path (non-recursive)
     *
     * @param conf
     * @param basePath
     * @return
     * @throws IOException
     */
    public static RemoteIterator<LocatedFileStatus> listFiles(Configuration conf, String basePath)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        // The FileSystem must stay open while the returned iterator is consumed,
        // so it is NOT closed here - closing it first would break the iteration.
        return fs.listFiles(new Path(basePath), false);
    }

    /**
     * List the files and subdirectories under the given directory (non-recursive)
     *
     * @param conf
     * @param dirPath
     * @return
     * @throws IOException
     */
    public static FileStatus[] listStatus(Configuration conf, String dirPath) throws IOException,
            URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        FileStatus[] fileStatuses = fs.listStatus(new Path(dirPath));
        fs.close();
        return fileStatuses;
    }


    /**
     * Read an Excel file from HDFS and write it into the given output stream
     *
     * @param conf     configuration
     * @param filePath file path
     * @param os       output stream
     * @throws IOException
     */
    public static void readFile(Configuration conf, String filePath, OutputStream os) throws IOException,
            URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path path = new Path(filePath);
        try (FSDataInputStream inputStream = fs.open(path)) {
            Workbook wb = POIUtils.createWorkbookByIS(filePath, inputStream);
            wb.write(os);
            // inputStream is closed automatically by try-with-resources
        } finally {
            fs.close();
        }
    }

    /**
     * Read the file content and return it as a string
     *
     * @param conf
     * @param filePath
     * @return
     * @throws IOException
     * @throws URISyntaxException
     */
    public static String readFile(Configuration conf, String filePath) throws IOException,
            URISyntaxException {
        String fileContent = null;
        FileSystem fs = getFileSystem(conf);
        Path path = new Path(filePath);
        InputStream inputStream = null;
        ByteArrayOutputStream outputStream = null;
        try {
            inputStream = fs.open(path);
            outputStream = new ByteArrayOutputStream(inputStream.available());
            IOUtils.copyBytes(inputStream, outputStream, conf);
            byte[] lens = outputStream.toByteArray();
            fileContent = new String(lens, "UTF-8");
        } finally {
            IOUtils.closeStream(inputStream);
            IOUtils.closeStream(outputStream);
            fs.close();
        }
        return fileContent;
    }
}
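The byte-draining pattern inside the string readFile above can be exercised with plain JDK streams, with no Hadoop cluster involved; a minimal self-contained sketch of the same pattern (class and method names are mine, not the project's):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class StreamCopySketch {
    // Drain the input stream into a byte buffer, then decode as UTF-8 -
    // the same shape as IOUtils.copyBytes + new String(bytes, "UTF-8") above.
    static String readAll(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = "order report".getBytes(StandardCharsets.UTF_8);
        System.out.println(readAll(new ByteArrayInputStream(data))); // prints "order report"
    }
}
```

Hadoop's IOUtils.copyBytes does essentially this, with the buffer size taken from the Configuration.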

For HDFS I wrote two separate classes: HDFSFileUploader and Configuration. As the names suggest, the former handles file uploads and the latter carries the HDFS configuration.

HDFSFileUploader
package cn.test.web.utils.hadoop;

import cn.test.common.log.Log;
import cn.test.common.log.LogFactory;
import cn.test.common.util.Utils;
import cn.test.web.utils.HDFSUtils;
import org.apache.commons.lang.NullArgumentException;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.UUID;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/11
 * Time 5:42 PM
 */
public class HDFSFileUploader {
    public static final byte FROM_LOCAL_COPY = 1; // upload by copying a local file
    public static final byte FROM_CONTENT_WRITE = 2; // create the file from string/byte content

    private static final Log LOGGER = LogFactory.getLog(HDFSFileUploader.class);
    private static final String HDFS_SCHEMA = "hdfs://";
    private static final String SEPARATOR = "/";
    private static final String SUFFIX_PREFIX = ".";

    private static final int BUFFER_SIZE = 1024;
    private static final Configuration CONF = new Configuration();


    /**
     * Upload binary content using the default domain and a random file name
     *
     * @param path
     * @param suffix
     * @param contents
     * @return
     */
    public static String upload(String path, String suffix, byte[] contents) {
        return upload(null, path, suffix, contents);
    }

    /**
     * Upload binary content with a random file name
     *
     * @param domain
     * @param path
     * @param suffix
     * @param contents
     * @return
     */
    public static String upload(String domain, String path, String suffix, byte[] contents) {
        return upload(domain, path, null, suffix, contents);
    }

    /**
     * Upload binary content under an explicit file name (written via stream)
     *
     * @param domain
     * @param path
     * @param filename
     * @param suffix
     * @param content
     * @return
     */
    public static String upload(String domain, String path, String filename, String suffix,
                                final byte[] content) {
        // the bytes are converted to a string here, so this path assumes text content
        return upload(domain, path, filename, suffix, new String(content), FROM_CONTENT_WRITE);
    }

    /**
     * Upload a file with the default domain and a random file name
     *
     * @param path
     * @param suffix
     * @param src
     * @param fromLocal
     * @return
     */
    public static String upload(String path, String suffix, String src, byte fromLocal) {
        return upload(null, path, suffix, src, fromLocal);
    }

    /**
     * Upload a file into the given directory on the given domain, with a random file name
     *
     * @param domain domain, e.g. 10.25.126.28:9000
     * @param path   file path, e.g. /usr/local/com.hd.test/2016-08-08/
     * @param suffix file suffix, e.g. .xls or xls
     * @param src    file content as a string, or a local file path
     * @return String the complete file name
     */
    public static String upload(String domain, String path, String suffix, String src, byte
            fromLocal) {
        return upload(domain, path, null, suffix, src, fromLocal);
    }

    /**
     * Upload a file with explicit domain, path, file name and suffix
     *
     * @param domain   domain
     * @param path     path
     * @param filename file name
     * @param suffix   suffix
     * @param src      content, or a local file path
     * @return
     */
    public static String upload(String domain, String path, String filename, String suffix, String
            src, byte fromLocal) {
        String filePath = getRealAddr(domain, path, suffix, filename);
        LOGGER.debug("upload target path: {}", filePath);
        try {
            switch (fromLocal) {
                case FROM_LOCAL_COPY:
                    HDFSUtils.copyFromLocalFile(CONF, src, filePath);
                    break;
                case FROM_CONTENT_WRITE:
                    HDFSUtils.createFile(CONF, src, filePath);
                    break;
            }
            return filePath;
        } catch (IOException | URISyntaxException e) {
            LOGGER.warn("file upload failed: {}", e.getMessage());
        }
        return null;
    }

    /**
     * Build the complete file address
     *
     * @param domain   domain
     * @param path     directory path
     * @param suffix   suffix
     * @param filename file name
     * @return
     */
    private static String getRealAddr(String domain, String path, String suffix, String filename) {
        if (Utils.isEmpty(domain)) {
            domain = "";
        } else if (!domain.startsWith(HDFS_SCHEMA)) {
            // prepend the schema only when a bare host:port is given; a domain
            // that already carries hdfs:// is kept as-is
            domain = HDFS_SCHEMA + domain;
        }
        path = getPath(path);
        filename = getFilename(filename, suffix);
        return String.format("%s%s%s", domain, path, filename);
    }

    /**
     * Normalize the directory path (ensure a leading and a trailing "/")
     *
     * @param path
     * @return
     */
    private static String getPath(String path) {
        if (Utils.isEmpty(path)) {
            throw new NullArgumentException("path is null");
        }
        if (!path.startsWith(SEPARATOR)) {
            path = SEPARATOR + path;
        }
        if (!path.endsWith(SEPARATOR)) {
            path = path + SEPARATOR;
        }
        return path;
    }

    /**
     * Build the file name
     *
     * @param filename
     * @param suffix
     * @return
     */
    private static String getFilename(String filename, String suffix) {
        if (Utils.isEmpty(filename)) {
            filename = generateFilename();
        }
        if (Utils.isEmpty(suffix) || suffix.equals(SEPARATOR) || filename.endsWith(suffix)) {
            return filename;
        }
        // join name and suffix with exactly one dot between them
        if (filename.endsWith(SUFFIX_PREFIX) || suffix.startsWith(SUFFIX_PREFIX)) {
            return filename + suffix;
        }
        return filename + SUFFIX_PREFIX + suffix;
    }

    /**
     * Generate a random file name
     *
     * @return
     */
    private static String generateFilename() {
        return getUuid(false);
    }

    /**
     * Generate a UUID string
     *
     * @param withoutHyphen strip the hyphens when true
     * @return
     */
    public static String getUuid(boolean withoutHyphen) {
        String str = UUID.randomUUID().toString();
        if (withoutHyphen) {
            str = str.replaceAll("-", "");
        }
        return str;
    }

    public static void setConfResource(final Configuration config) {
        CONF.addResource(config);
    }
}

HDFSFileUploader offers a family of upload methods for the different payloads (binary content, strings, local files), plus helpers for copying a local file into HDFS and for generating UUID-based file names.
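The trickiest part of the class is the suffix-joining rule in getFilename; isolated as a pure function it reads like this (a sketch with Utils and the constants inlined - names are hypothetical):

```java
public class FilenameSketch {
    static final String SEPARATOR = "/";
    static final String SUFFIX_PREFIX = ".";

    // Append the suffix so that exactly one dot separates name and extension;
    // a "/" suffix or a name already ending with the suffix is left alone.
    static String withSuffix(String filename, String suffix) {
        if (suffix == null || suffix.isEmpty()
                || suffix.equals(SEPARATOR) || filename.endsWith(suffix)) {
            return filename;
        }
        if (filename.endsWith(SUFFIX_PREFIX) || suffix.startsWith(SUFFIX_PREFIX)) {
            return filename + suffix;
        }
        return filename + SUFFIX_PREFIX + suffix;
    }

    public static void main(String[] args) {
        System.out.println(withSuffix("report", "xls"));  // prints "report.xls"
        System.out.println(withSuffix("report", ".xls")); // prints "report.xls"
    }
}
```

Both "xls" and ".xls" therefore produce the same result, which is why the suffix parameter can be passed in either form.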

Configuration
package cn.test.web.utils.hadoop;

import cn.test.web.utils.CommonUtils;
import org.apache.commons.io.FilenameUtils;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/9
 * Time 9:30 AM
 * Suggested usage:
 *
 * <bean id="hadoopConfig" class="cn.test.web.utils.hadoop.Configuration">
 *     <property name="resources">
 *         <list>
 *             <value>classpath:/spring/core-site.xml</value>
 *         </list>
 *     </property>
 * </bean>
 *
 * Then inject hadoopConfig wherever it is needed:
 *
 * @Resource private Configuration hadoopConfig;
 */
public class Configuration extends org.apache.hadoop.conf.Configuration {
    private Resource[] resources;

    public void setResources(List<String> filenames) throws IOException {
        List<Resource> resources = new ArrayList<>();
        if (filenames != null && filenames.size() > 0) {
            for (String filename : filenames) {
                filename = filename.trim();
                String realName = getFileName(filename);
                String ext = FilenameUtils.getExtension(realName);
                if (ext.equals("xml")) {
                    PathMatchingResourcePatternResolver pathMatchingResourcePatternResolver =
                            new PathMatchingResourcePatternResolver();
                    try {
                        Resource[] resourceList = pathMatchingResourcePatternResolver.getResources(filename);
                        Collections.addAll(resources, resourceList);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        for (Resource resource : resources) {
            this.addResource(resource.getURL());
        }
    }

    private String getFileName(String fileName) {
        return CommonUtils.getFileName(fileName);
    }
}

This class is simple: it extends Hadoop's org.apache.hadoop.conf.Configuration so that the Hadoop configuration files can be listed flexibly in the Spring context; under the hood it just calls org.apache.hadoop.conf.Configuration's addResource(String name) method. The Spring XML wiring is along these lines:

    <bean id="hadoopConfig" class="cn.test.web.utils.hadoop.Configuration">
        <property name="resources">
            <list>
                <value>classpath:META-INF/hadoop/*.xml</value>
            </list>
        </property>
    </bean>

Order export handling (the MQ consumer side)
package cn.test.web.mq.consumer;
... // many imports omitted

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/9
 * Time 2:14 PM
 */
public class OrderExportHandler implements IMessageHandler {

    private static final Log LOGGER = LogFactory.getLog(OrderExportHandler.class);
    private static final int MUL_SEC = 1000;
    private static final Gson GSON = new Gson();
    
    @Value("${image_server}") 
    private String imageServer;
    @Autowired
    private DataManager manager;
 
    @Override
    public void handle(final String key, final String message) {
        LOGGER.debug("message: {}", message);
        Pattern p = Pattern.compile("-");
        String[] skey = p.split(key);
        if (skey.length < 3) {
            return;
        }
        int res = insert(skey[1], skey[0], skey[2]);
        LOGGER.debug("primary key of the export log: {}", res);
        if (res > 0) {
            // insert succeeded - run the export logic
            Map<String, Object> data = manager.parseData(message);
            List<String> header = null;
            List<List<String>> content = null;
            List<Map<String, Object>> orders = null;

            DataExportLog log = new DataExportLog();
            log.setDelid(res);
            log.setUid(Integer.valueOf(skey[2]));

            if (data.containsKey("params")) {
                LOGGER.debug("params:{}", data.get("params"));
                orders = manager.getOrders(data.get("params"));
                LOGGER.debug("number of rows to export: {}", orders.size());
            }
            if (orders == null || orders.size() == 0) {
                log.setStatus((byte) 4); // no rows matched - nothing to export
            } else if (data.containsKey("header") && (data.get("header") instanceof Map)) {
                // the instanceof check above already guarantees a Map here
                Map map = manager.parseHeader((Map) data.get("header"));

                if (map != null && map.size() > 0) {
                    if (map.containsKey("header")) {
                        header = getHeader(map.get("header"));
                    }
                    if (map.containsKey("key")) {
                        content = getContent(orders, map.get("key"));
                    }
                }
                // call the HDFS interface to upload the file
                if (!Utils.isEmpty(header) || !Utils.isEmpty(content)) {
                    // generate the Excel file locally first
                    String fName = getFilename(data);
                    String localFile = manager.writeExecelFile(fName, header, content, null, null);
                    String file = manager.copyFileFromLocal(skey[0], localFile);

                    if (Utils.isEmpty(localFile) || Utils.isEmpty(file)) {
                        log.setStatus((byte) 3); // generation or upload failed
                    } else {
                        log.setStatus((byte) 1); // success
                        log.setLink(file);
                    }
                    LOGGER.info("local temp file: {}", localFile);
                    LOGGER.info("file uploaded to the hadoop cluster: {}", file);
                }

            }
            update(log);
        }
    }
    
    // TODO
    // Data handling: this calls the dubbo interfaces of the biz project;
    // the concrete operations are not shown here.
    
}

The order-export logic lives in the class above together with DataManager; the data itself comes from the biz project's dubbo interfaces, whose business logic is not covered here.

Below is the code of manager.writeExecelFile(fName, header, content, null, null) and manager.copyFileFromLocal(skey[0], localFile):

public String writeExecelFile(String filename, List header, List datas, String title, String copyright) {
    SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd");
    String date = sd.format(new Date());
    if (Utils.isEmpty(filename)) {
        filename = HDFSFileUploader.getUuid(true) + this.ext;
    }
    String filePath = this.tmpDir + "/" + date + "/" + filename;
    filePath = filePath.replaceAll("//", "/"); // collapse accidental double slashes
    File f = new File(CommonUtils.getFilePath(filePath));
    if (!f.exists()) {
        // create the whole date directory chain, not just the last segment
        f.mkdirs();
    }
    if (Utils.isEmpty(title)) {
        title = DEFAULT_TITLE;
    }
    if (Utils.isEmpty(copyright)) {
        copyright = this.copyright;
    }
    return POIUtils.create(null, filePath, header, datas, title, copyright);
}

writeExecelFile delegates to POI's create method, and at this point the temporary file has already been generated.
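The date-partitioned temp path built at the top of writeExecelFile can also be checked in isolation (hypothetical helper; it assumes the replaceAll was meant to collapse accidental double slashes):

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;

public class TmpPathSketch {
    // Build tmpDir/yyyy-MM-dd/filename and collapse duplicate slashes,
    // mirroring the path logic of writeExecelFile.
    static String tmpPath(String tmpDir, String filename, Date day) {
        String date = new SimpleDateFormat("yyyy-MM-dd").format(day);
        return (tmpDir + "/" + date + "/" + filename).replaceAll("//", "/");
    }

    public static void main(String[] args) {
        Date d = new GregorianCalendar(2016, Calendar.AUGUST, 11).getTime();
        System.out.println(tmpPath("/tmp/export/", "a.xls", d)); // prints "/tmp/export/2016-08-11/a.xls"
    }
}
```

A tmpDir configured with or without a trailing slash thus produces the same normalized path.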
One more point: values such as the temp directory, the HDFS upload path and the copyright line should be configurable. Spring's org.springframework.beans.factory.config.PropertyPlaceholderConfigurer takes care of that; in the code we just declare the fields below and put the matching entries into a properties file:

    @Value("${hdfs_upload_dir}")
    private String uploadDir;

    @Value("${file_tmp_dir}")
    private String tmpDir;

    @Value("${copyright}")
    private String copyright;

    @Value("${default_file_ext}")
    private String ext;
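For reference, the placeholder wiring and the properties entries behind those @Value annotations might look like this (file name and values are illustrative, not the project's real configuration):

```xml
<!-- in the Spring context -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <value>classpath:config.properties</value>
    </property>
</bean>

<!-- config.properties (illustrative values):
     hdfs_upload_dir=/cn/test/
     file_tmp_dir=/tmp/export
     copyright=test-data
     default_file_ext=.xls
-->
```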

Now look at the copyFileFromLocal method:

    /**
     * Copy a local file into HDFS
     *
     * @param type
     * @param file
     * @return
     */
    public String copyFileFromLocal(String type, String file) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        String date = format.format(new Date());
        String path = this.uploadDir + type + "/" + date + "/";
        HDFSFileUploader.setConfResource(hadoopConfig);
        return HDFSFileUploader.upload(path, this.ext, file, HDFSFileUploader.FROM_LOCAL_COPY);
    }

This method calls HDFSFileUploader.upload, i.e. the wrapper shown above. Note that the Hadoop configuration is injected here via HDFSFileUploader.setConfResource(hadoopConfig); the hadoop Configuration itself is wired into the DataManager class like this:

@Resource
private Configuration hadoopConfig;

At this point the generated Excel file has been uploaded to the configured HDFS path, which can be verified with the Hadoop client command:

hadoop fs -ls /cn/test/order/    (this is the upload path used above)
Order export (download)

For the download itself, the Java backend exposes a REST interface directly; the third-party PHP HDFS package phdfs (on GitHub) proved awkward to use and failed to compile.

Let's look at how this interface is written:

package cn.test.web.impl;

import cn.test.common.log.Log;
import cn.test.common.log.LogFactory;
import cn.test.util.Utils;
import cn.test.web.manager.DataManager;
import cn.test.web.service.DownloadService;
import cn.test.web.utils.CommonUtils;
import com.alibaba.dubbo.rpc.protocol.rest.support.ContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.net.URISyntaxException;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/16
 * Time 5:21 PM
 */
@Path("download")
@Component("downloads")
@Produces({ContentType.APPLICATION_JSON_UTF_8})
public class DownloadServiceImpl implements DownloadService {
    private static final Log LOGGER = LogFactory.getLog(DownloadServiceImpl.class);
    @Autowired
    private DataManager manager;
    @Override
    @GET
    @Path("order")
    public void down(@Context HttpServletResponse response, @QueryParam("url") String url,
                             @QueryParam("uid") Integer uid) {
        LOGGER.debug("download url: {}", url);
        if (Utils.isEmpty(url)) {
            return;
        }
        String filename = CommonUtils.getFileName(url);
        // set the response headers; the later setContentType call wins,
        // so the file goes out as application/vnd.ms-excel
        response.setContentType(MediaType.APPLICATION_OCTET_STREAM);
        response.setContentType("application/vnd.ms-excel;charset=gb2312");
        response.setHeader("Content-Disposition", "attachment;filename=" + filename);
        try {
            // read the file from HDFS and stream it into the response
            manager.readFile(url, response.getOutputStream());
            response.flushBuffer();
        } catch (IOException | URISyntaxException e) {
            LOGGER.error(e.getMessage());
        }
    }
}

The PHP page then only needs a plain hyperlink. One refinement: in production every interface is reached over the intranet, so the backend IP must not be exposed in the a tag; instead nginx proxies the request, and the page simply links to downloads/order?url=xxx&uid=xxx:

location /downloads/ {
    proxy_pass http://127.0.0.1:8086/presentation/download/;
}
Pitfalls: fetching data from the biz interface with multiple threads
public List getOrders(Object params) {
        OrderSearch search = null;
        if (params != null && (params instanceof Map)) {
            System.out.println("params:" + params);
            search = GSON.fromJson(GSON.toJson(params), OrderSearch.class);
            System.out.println("title:" + search.getTitle());
        } else {
            search = new OrderSearch();
        }
        int count = orderService.searchCount(search);
        int cycleTimes = (int) Math.ceil(count * 1.0 / TIMES_IN_SIGNEL_PROCESSOR);
        LOGGER.debug("total rows: {}, outer loop iterations: {}", count, cycleTimes);
        // collect the results of all concurrent tasks here
        List orders = new ArrayList<>();
        int page = 0;
        for (int j = 0; j < cycleTimes; j++) {
            int signel = (count > TIMES_IN_SIGNEL_PROCESSOR) ? TIMES_IN_SIGNEL_PROCESSOR : count;
            count = count - signel;
            int poolNum = (int) Math.ceil(signel * 1.0 / LIMIT);
            LOGGER.debug("thread pool size: {}", poolNum);
            // create a thread pool for this batch
            ExecutorService pool = Executors.newFixedThreadPool(poolNum);
            // submit one Callable per page and keep the Future handles
            List<Future> list = new ArrayList<>();
            for (int i = 0; i < poolNum; i++) {
                Callable c = new OrderExportCallable(i + "", ++page, LIMIT, orderService, search);
                Future f = pool.submit(c);
                list.add(f);
            }
            }
            // shut the pool down; no new tasks will be accepted
            pool.shutdown();
            try {
                Thread.sleep(THREAD_SLEEP);
            } catch (InterruptedException e) {
                LOGGER.debug("interrupted while waiting for the batch: {}", e.getMessage());
            }
            for (Future f : list) {
                // collect each task's return value from its Future
                try {
                    Collection part = (Collection) f.get();
                    orders.addAll(part);
                    LOGGER.debug(">>> thread {} returned {} rows", f.toString(), part.size());
                } catch (InterruptedException | ExecutionException e) {
                    LOGGER.warn("calling OrderService.search failed: {}", e.getMessage());
                    return null;
                }
            }
        }

        return orders;
}
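The batch arithmetic above - the total row count carved into outer batches, each batch split across LIMIT-sized pages with one page per thread - is easy to get wrong, so here it is isolated in a self-contained sketch (BATCH and LIMIT are hypothetical stand-ins for TIMES_IN_SIGNEL_PROCESSOR and LIMIT):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitSketch {
    static final int BATCH = 10000; // rows handled per outer iteration
    static final int LIMIT = 2000;  // rows fetched per thread/page

    // For each outer batch, compute how many threads (pages) it needs.
    static List<Integer> threadsPerBatch(int count) {
        List<Integer> pools = new ArrayList<>();
        int cycles = (int) Math.ceil(count * 1.0 / BATCH);
        for (int j = 0; j < cycles; j++) {
            int single = Math.min(count, BATCH);
            count -= single;
            pools.add((int) Math.ceil(single * 1.0 / LIMIT));
        }
        return pools;
    }

    public static void main(String[] args) {
        System.out.println(threadsPerBatch(23000)); // prints [5, 5, 2]
    }
}
```

With these constants, 23000 rows become two full batches of five threads each plus a final batch of two threads; the last batch is always allowed to be smaller.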

The copyright of this article belongs to its author. Please do not reproduce it without permission; if it violates any rules, you may contact the administrators to have it removed.

When reposting, please credit the original address: http://specialneedsforspecialkids.com/yun/65066.html
