"use strict"; var __extends = (this && this.__extends) || (function () { var extendStatics = Object.setPrototypeOf || ({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) || function (d, b) { for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p]; }; return function (d, b) { extendStatics(d, b); function __() { this.constructor = d; } d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __()); }; })(); var __importDefault = (this && this.__importDefault) || function (mod) { return (mod && mod.__esModule) ? mod : { "default": mod }; }; Object.defineProperty(exports, "__esModule", { value: true }); var stream_1 = require("stream"); var Parameters_1 = require("./Parameters"); var ParseRuntime_1 = require("./ParseRuntime"); var bluebird_1 = __importDefault(require("bluebird")); // import { ProcessorFork } from "./ProcessFork"; var ProcessorLocal_1 = require("./ProcessorLocal"); var Result_1 = require("./Result"); var Converter = /** @class */ (function (_super) { __extends(Converter, _super); function Converter(param, options) { if (options === void 0) { options = {}; } var _this = _super.call(this, options) || this; _this.options = options; _this.params = Parameters_1.mergeParams(param); _this.runtime = ParseRuntime_1.initParseRuntime(_this); _this.result = new Result_1.Result(_this); // if (this.params.fork) { // this.processor = new ProcessorFork(this); // } else { _this.processor = new ProcessorLocal_1.ProcessorLocal(_this); // } _this.once("error", function (err) { // console.log("BBB"); setTimeout(function () { _this.result.processError(err); _this.emit("done", err); }, 0); }); _this.once("done", function () { _this.processor.destroy(); }); return _this; } Converter.prototype.preRawData = function (onRawData) { this.runtime.preRawDataHook = onRawData; return this; }; Converter.prototype.preFileLine = function (onFileLine) { this.runtime.preFileLineHook = onFileLine; return this; }; Converter.prototype.subscribe = function (onNext, onError, onCompleted) { this.parseRuntime.subscribe = { onNext: onNext, onError: onError, onCompleted: onCompleted }; return this; }; Converter.prototype.fromFile = function (filePath, options) { var _this = this; var fs = require("fs"); // var rs = null; // this.wrapCallback(cb, function () { // if (rs && rs.destroy) { // rs.destroy(); // } // }); fs.exists(filePath, function (exist) { if (exist) { var rs = fs.createReadStream(filePath, options); rs.pipe(_this); } else { _this.emit('error', new Error("File does not exist. Check to make sure the file path to your csv is correct.")); } }); return this; }; Converter.prototype.fromStream = function (readStream) { readStream.pipe(this); return this; }; Converter.prototype.fromString = function (csvString) { var csv = csvString.toString(); var read = new stream_1.Readable(); var idx = 0; read._read = function (size) { if (idx >= csvString.length) { this.push(null); } else { var str = csvString.substr(idx, size); this.push(str); idx += size; } }; return this.fromStream(read); }; Converter.prototype.then = function (onfulfilled, onrejected) { var _this = this; return new bluebird_1.default(function (resolve, reject) { _this.parseRuntime.then = { onfulfilled: function (value) { if (onfulfilled) { resolve(onfulfilled(value)); } else { resolve(value); } }, onrejected: function (err) { if (onrejected) { resolve(onrejected(err)); } else { reject(err); } } }; }); }; Object.defineProperty(Converter.prototype, "parseParam", { get: function () { return this.params; }, enumerable: true, configurable: true }); Object.defineProperty(Converter.prototype, "parseRuntime", { get: function () { return this.runtime; }, enumerable: true, configurable: true }); Converter.prototype._transform = function (chunk, encoding, cb) { var _this = this; this.processor.process(chunk) .then(function (result) { // console.log(result); if (result.length > 0) { _this.runtime.started = true; return _this.result.processResult(result); } }) .then(function () { _this.emit("drained"); cb(); }, function (error) { _this.runtime.hasError = true; _this.runtime.error = error; _this.emit("error", error); cb(); }); }; Converter.prototype._flush = function (cb) { var _this = this; this.processor.flush() .then(function (data) { if (data.length > 0) { return _this.result.processResult(data); } }) .then(function () { _this.processEnd(cb); }, function (err) { _this.emit("error", err); cb(); }); }; Converter.prototype.processEnd = function (cb) { this.result.endProcess(); this.emit("done"); cb(); }; Object.defineProperty(Converter.prototype, "parsedLineNumber", { get: function () { return this.runtime.parsedLineNumber; }, enumerable: true, configurable: true }); return Converter; }(stream_1.Transform)); exports.Converter = Converter; //# sourceMappingURL=data:application/json;charset=utf-8;base64,{"version":3,"file":"/Users/kxiang/work/projects/csv2json/src/Converter.ts","sources":["/Users/kxiang/work/projects/csv2json/src/Converter.ts"],"names":[],"mappings":";;;;;;;;;;;;;;;AAAA,iCAA+D;AAC/D,2CAA0D;AAC1D,+CAAgE;AAChE,sDAAyB;AAOzB,iDAAiD;AACjD,mDAAkD;AAClD,mCAAkC;AAMlC;IAA+B,6BAAS;IAuFtC,mBAAY,KAA8B,EAAS,OAA8B;QAA9B,wBAAA,EAAA,YAA8B;QAAjF,YACE,kBAAM,OAAO,CAAC,SAuBf;QAxBkD,aAAO,GAAP,OAAO,CAAuB;QAE/E,KAAI,CAAC,MAAM,GAAG,wBAAW,CAAC,KAAK,CAAC,CAAC;QACjC,KAAI,CAAC,OAAO,GAAG,+BAAgB,CAAC,KAAI,CAAC,CAAC;QACtC,KAAI,CAAC,MAAM,GAAG,IAAI,eAAM,CAAC,KAAI,CAAC,CAAC;QAC/B,0BAA0B;QAC1B,8CAA8C;QAC9C,WAAW;QACX,KAAI,CAAC,SAAS,GAAG,IAAI,+BAAc,CAAC,KAAI,CAAC,CAAC;QAC1C,IAAI;QACJ,KAAI,CAAC,IAAI,CAAC,OAAO,EAAE,UAAC,GAAQ;YAC1B,sBAAsB;YAEtB,UAAU,CAAC;gBACT,KAAI,CAAC,MAAM,CAAC,YAAY,CAAC,GAAG,CAAC,CAAC;gBAC9B,KAAI,CAAC,IAAI,CAAC,MAAM,EAAE,GAAG,CAAC,CAAC;YACzB,CAAC,EAAE,CAAC,CAAC,CAAC;QAER,CAAC,CAAC,CAAC;QACH,KAAI,CAAC,IAAI,CAAC,MAAM,EAAE;YAChB,KAAI,CAAC,SAAS,CAAC,OAAO,EAAE,CAAC;QAC3B,CAAC,CAAC,CAAA;QAEF,OAAO,KAAI,CAAC;IACd,CAAC;IA9GD,8BAAU,GAAV,UAAW,SAA6B;QACtC,IAAI,CAAC,OAAO,CAAC,cAAc,GAAG,SAAS,CAAC;QACxC,OAAO,IAAI,CAAC;IACd,CAAC;IACD,+BAAW,GAAX,UAAY,UAA+B;QACzC,IAAI,CAAC,OAAO,CAAC,eAAe,GAAG,UAAU,CAAC;QAC1C,OAAO,IAAI,CAAC;IACd,CAAC;IACD,6BAAS,GAAT,UACE,MAAoE,EACpE,OAAiC,EACjC,WAAwB;QACxB,IAAI,CAAC,YAAY,CAAC,SAAS,GAAG;YAC5B,MAAM,QAAA;YACN,OAAO,SAAA;YACP,WAAW,aAAA;SACZ,CAAA;QACD,OAAO,IAAI,CAAC;IACd,CAAC;IACD,4BAAQ,GAAR,UAAS,QAAgB,EAAE,OAAqD;QAAhF,iBAiBC;QAhBC,IAAM,EAAE,GAAG,OAAO,CAAC,IAAI,CAAC,CAAC;QACzB,iBAAiB;QACjB,sCAAsC;QACtC,4BAA4B;QAC5B,oBAAoB;QACpB,MAAM;QACN,MAAM;QACN,EAAE,CAAC,MAAM,CAAC,QAAQ,EAAE,UAAC,KAAK;YACxB,IAAI,KAAK,EAAE;gBACT,IAAM,EAAE,GAAG,EAAE,CAAC,gBAAgB,CAAC,QAAQ,EAAE,OAAO,CAAC,CAAC;gBAClD,EAAE,CAAC,IAAI,CAAC,KAAI,CAAC,CAAC;aACf;iBAAM;gBACL,KAAI,CAAC,IAAI,CAAC,OAAO,EAAE,IAAI,KAAK,CAAC,+EAA+E,CAAC,CAAC,CAAC;aAChH;QACH,CAAC,CAAC,CAAC;QACH,OAAO,IAAI,CAAC;IACd,CAAC;IACD,8BAAU,GAAV,UAAW,UAAoB;QAC7B,UAAU,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC;QACtB,OAAO,IAAI,CAAC;IACd,CAAC;IACD,8BAAU,GAAV,UAAW,SAAiB;QAC1B,IAAM,GAAG,GAAG,SAAS,CAAC,QAAQ,EAAE,CAAC;QACjC,IAAM,IAAI,GAAG,IAAI,iBAAQ,EAAE,CAAC;QAC5B,IAAI,GAAG,GAAG,CAAC,CAAC;QACZ,IAAI,CAAC,KAAK,GAAG,UAAU,IAAI;YACzB,IAAI,GAAG,IAAI,SAAS,CAAC,MAAM,EAAE;gBAC3B,IAAI,CAAC,IAAI,CAAC,IAAI,CAAC,CAAC;aACjB;iBAAM;gBACL,IAAM,GAAG,GAAG,SAAS,CAAC,MAAM,CAAC,GAAG,EAAE,IAAI,CAAC,CAAC;gBACxC,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,CAAC;gBACf,GAAG,IAAI,IAAI,CAAC;aACb;QACH,CAAC,CAAA;QACD,OAAO,IAAI,CAAC,UAAU,CAAC,IAAI,CAAC,CAAC;IAC/B,CAAC;IACD,wBAAI,GAAJ,UAAyC,WAAgE,EAAE,UAA8D;QAAzK,iBAmBC;QAlBC,OAAO,IAAI,kBAAC,CAAC,UAAC,OAAO,EAAE,MAAM;YAC3B,KAAI,CAAC,YAAY,CAAC,IAAI,GAAG;gBACvB,WAAW,EAAE,UAAC,KAAY;oBACxB,IAAI,WAAW,EAAE;wBACf,OAAO,CAAC,WAAW,CAAC,KAAK,CAAC,CAAC,CAAC;qBAC7B;yBAAM;wBACL,OAAO,CAAC,KAAY,CAAC,CAAC;qBACvB;gBACH,CAAC;gBACD,UAAU,EAAE,UAAC,GAAU;oBACrB,IAAI,UAAU,EAAE;wBACd,OAAO,CAAC,UAAU,CAAC,GAAG,CAAC,CAAC,CAAC;qBAC1B;yBAAM;wBACL,MAAM,CAAC,GAAG,CAAC,CAAC;qBACb;gBACH,CAAC;aACF,CAAA;QACH,CAAC,CAAC,CAAC;IACL,CAAC;IACD,sBAAW,iCAAU;aAArB;YACE,OAAO,IAAI,CAAC,MAAM,CAAC;QACrB,CAAC;;;OAAA;IACD,sBAAW,mCAAY;aAAvB;YACE,OAAO,IAAI,CAAC,OAAO,CAAC;QACtB,CAAC;;;OAAA;IA8BD,8BAAU,GAAV,UAAW,KAAU,EAAE,QAAgB,EAAE,EAAY;QAArD,iBAmBC;QAlBC,IAAI,CAAC,SAAS,CAAC,OAAO,CAAC,KAAK,CAAC;aAC1B,IAAI,CAAC,UAAC,MAAM;YACX,uBAAuB;YACvB,IAAI,MAAM,CAAC,MAAM,GAAG,CAAC,EAAE;gBACrB,KAAI,CAAC,OAAO,CAAC,OAAO,GAAG,IAAI,CAAC;gBAE5B,OAAO,KAAI,CAAC,MAAM,CAAC,aAAa,CAAC,MAAM,CAAC,CAAC;aAC1C;QACH,CAAC,CAAC;aACD,IAAI,CAAC;YACJ,KAAI,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC;YACrB,EAAE,EAAE,CAAC;QACP,CAAC,EAAE,UAAC,KAAK;YACP,KAAI,CAAC,OAAO,CAAC,QAAQ,GAAG,IAAI,CAAC;YAC7B,KAAI,CAAC,OAAO,CAAC,KAAK,GAAG,KAAK,CAAC;YAC3B,KAAI,CAAC,IAAI,CAAC,OAAO,EAAE,KAAK,CAAC,CAAC;YAC1B,EAAE,EAAE,CAAC;QACP,CAAC,CAAC,CAAC;IACP,CAAC;IACD,0BAAM,GAAN,UAAO,EAAY;QAAnB,iBAcC;QAbC,IAAI,CAAC,SAAS,CAAC,KAAK,EAAE;aACnB,IAAI,CAAC,UAAC,IAAI;YACT,IAAI,IAAI,CAAC,MAAM,GAAG,CAAC,EAAE;gBAEnB,OAAO,KAAI,CAAC,MAAM,CAAC,aAAa,CAAC,IAAI,CAAC,CAAC;aACxC;QACH,CAAC,CAAC;aACD,IAAI,CAAC;YACJ,KAAI,CAAC,UAAU,CAAC,EAAE,CAAC,CAAC;QACtB,CAAC,EAAE,UAAC,GAAG;YACL,KAAI,CAAC,IAAI,CAAC,OAAO,EAAE,GAAG,CAAC,CAAC;YACxB,EAAE,EAAE,CAAC;QACP,CAAC,CAAC,CAAA;IACN,CAAC;IACO,8BAAU,GAAlB,UAAmB,EAAE;QACnB,IAAI,CAAC,MAAM,CAAC,UAAU,EAAE,CAAC;QACzB,IAAI,CAAC,IAAI,CAAC,MAAM,CAAC,CAAC;QAClB,EAAE,EAAE,CAAC;IACP,CAAC;IACD,sBAAI,uCAAgB;aAApB;YACE,OAAO,IAAI,CAAC,OAAO,CAAC,gBAAgB,CAAC;QACvC,CAAC;;;OAAA;IACH,gBAAC;AAAD,CAAC,AA3JD,CAA+B,kBAAS,GA2JvC;AA3JY,8BAAS","sourcesContent":["import { Transform, TransformOptions, Readable } from \"stream\";\nimport { CSVParseParam, mergeParams } from \"./Parameters\";\nimport { ParseRuntime, initParseRuntime } from \"./ParseRuntime\";\nimport P from \"bluebird\";\nimport { stringToLines } from \"./fileline\";\nimport { map } from \"lodash/map\";\nimport { RowSplit, RowSplitResult } from \"./rowSplit\";\nimport getEol from \"./getEol\";\nimport lineToJson, { JSONResult } from \"./lineToJson\";\nimport { Processor, ProcessLineResult } from \"./Processor\";\n// import { ProcessorFork } from \"./ProcessFork\";\nimport { ProcessorLocal } from \"./ProcessorLocal\";\nimport { Result } from \"./Result\";\nimport CSVError from \"./CSVError\";\nimport { bufFromString } from \"./util\";\n\n\n\nexport class Converter extends Transform implements PromiseLike<Array<any>> {\n  preRawData(onRawData: PreRawDataCallback): Converter {\n    this.runtime.preRawDataHook = onRawData;\n    return this;\n  }\n  preFileLine(onFileLine: PreFileLineCallback): Converter {\n    this.runtime.preFileLineHook = onFileLine;\n    return this;\n  }\n  subscribe(\n    onNext?: (data: any, lineNumber: number) => void | PromiseLike<void>,\n    onError?: (err: CSVError) => void,\n    onCompleted?: () => void): Converter {\n    this.parseRuntime.subscribe = {\n      onNext,\n      onError,\n      onCompleted\n    }\n    return this;\n  }\n  fromFile(filePath: string, options?: string | CreateReadStreamOption | undefined): Converter {\n    const fs = require(\"fs\");\n    // var rs = null;\n    // this.wrapCallback(cb, function () {\n    //   if (rs && rs.destroy) {\n    //     rs.destroy();\n    //   }\n    // });\n    fs.exists(filePath, (exist) => {\n      if (exist) {\n        const rs = fs.createReadStream(filePath, options);\n        rs.pipe(this);\n      } else {\n        this.emit('error', new Error(\"File does not exist. Check to make sure the file path to your csv is correct.\"));\n      }\n    });\n    return this;\n  }\n  fromStream(readStream: Readable): Converter {\n    readStream.pipe(this);\n    return this;\n  }\n  fromString(csvString: string): Converter {\n    const csv = csvString.toString();\n    const read = new Readable();\n    let idx = 0;\n    read._read = function (size) {\n      if (idx >= csvString.length) {\n        this.push(null);\n      } else {\n        const str = csvString.substr(idx, size);\n        this.push(str);\n        idx += size;\n      }\n    }\n    return this.fromStream(read);\n  }\n  then<TResult1 = any[], TResult2 = never>(onfulfilled?: (value: any[]) => TResult1 | PromiseLike<TResult1>, onrejected?: (reason: any) => TResult2 | PromiseLike<TResult2>): PromiseLike<TResult1 | TResult2> {\n    return new P((resolve, reject) => {\n      this.parseRuntime.then = {\n        onfulfilled: (value: any[]) => {\n          if (onfulfilled) {\n            resolve(onfulfilled(value));\n          } else {\n            resolve(value as any);\n          }\n        },\n        onrejected: (err: Error) => {\n          if (onrejected) {\n            resolve(onrejected(err));\n          } else {\n            reject(err);\n          }\n        }\n      }\n    });\n  }\n  public get parseParam(): CSVParseParam {\n    return this.params;\n  }\n  public get parseRuntime(): ParseRuntime {\n    return this.runtime;\n  }\n  private params: CSVParseParam;\n  private runtime: ParseRuntime;\n  private processor: Processor;\n  private result: Result;\n  constructor(param?: Partial<CSVParseParam>, public options: TransformOptions = {}) {\n    super(options);\n    this.params = mergeParams(param);\n    this.runtime = initParseRuntime(this);\n    this.result = new Result(this);\n    // if (this.params.fork) {\n    //   this.processor = new ProcessorFork(this);\n    // } else {\n    this.processor = new ProcessorLocal(this);\n    // }\n    this.once(\"error\", (err: any) => {\n      // console.log(\"BBB\");\n\n      setTimeout(() => {\n        this.result.processError(err);\n        this.emit(\"done\", err);\n      }, 0);\n\n    });\n    this.once(\"done\", () => {\n      this.processor.destroy();\n    })\n\n    return this;\n  }\n  _transform(chunk: any, encoding: string, cb: Function) {\n    this.processor.process(chunk)\n      .then((result) => {\n        // console.log(result);\n        if (result.length > 0) {\n          this.runtime.started = true;\n\n          return this.result.processResult(result);\n        }\n      })\n      .then(() => {\n        this.emit(\"drained\");\n        cb();\n      }, (error) => {\n        this.runtime.hasError = true;\n        this.runtime.error = error;\n        this.emit(\"error\", error);\n        cb();\n      });\n  }\n  _flush(cb: Function) {\n    this.processor.flush()\n      .then((data) => {\n        if (data.length > 0) {\n\n          return this.result.processResult(data);\n        }\n      })\n      .then(() => {\n        this.processEnd(cb);\n      }, (err) => {\n        this.emit(\"error\", err);\n        cb();\n      })\n  }\n  private processEnd(cb) {\n    this.result.endProcess();\n    this.emit(\"done\");\n    cb();\n  }\n  get parsedLineNumber(): number {\n    return this.runtime.parsedLineNumber;\n  }\n}\nexport interface CreateReadStreamOption {\n  flags?: string;\n  encoding?: string;\n  fd?: number;\n  mode?: number;\n  autoClose?: boolean;\n  start?: number;\n  end?: number;\n  highWaterMark?: number;\n}\nexport type CallBack = (err: Error, data: Array<any>) => void;\n\n\nexport type PreFileLineCallback = (line: string, lineNumber: number) => string | PromiseLike<string>;\nexport type PreRawDataCallback = (csvString: string) => string | PromiseLike<string>;\n"]}