17 using System.Collections.Generic;
18 using System.Diagnostics;
20 using System.IO.Compression;
22 using System.Threading;
23 using System.Threading.Tasks;
29 using Processors = Dictionary<Symbol, List<List<AlgoSeekFuturesProcessor>>>;
// Folder the raw archives are extracted into and the CSVs are read from (built from the constructor's `source` path).
36 private readonly DirectoryInfo _source;
// Folder whose listing supplies the raw archives (built from the constructor's `remote` path).
37 private readonly DirectoryInfo _remote;
// Root output path; packaging works under its "future" subfolder.
38 private readonly
string _destination;
// Resolutions supplied by the caller.
39 private readonly List<Resolution> _resolutions;
// Reference date supplied by the caller; it is included in the start-up log message.
40 private readonly DateTime _referenceDate;
// Optional symbol filter; the constructor parameter defaults to null.
41 private readonly HashSet<string> _symbolFilter;
/// <summary>
/// Creates a converter and captures its configuration.
/// </summary>
/// <param name="resolutions">Resolutions to keep for later processing.</param>
/// <param name="referenceDate">Reference date kept for later processing and logging.</param>
/// <param name="remote">Path of the folder that holds the raw remote files.</param>
/// <param name="source">Path of the local folder the raw files are extracted into.</param>
/// <param name="destination">Root path for the converted output.</param>
/// <param name="symbolFilter">Optional set of symbols; defaults to null.</param>
public AlgoSeekFuturesConverter(
    List<Resolution> resolutions,
    DateTime referenceDate,
    string remote,
    string source,
    string destination,
    HashSet<string> symbolFilter = null)
{
    // Values that are stored exactly as received.
    _resolutions = resolutions;
    _referenceDate = referenceDate;
    _destination = destination;
    _symbolFilter = symbolFilter;

    // The two folder paths are wrapped in DirectoryInfo so FullName is available later.
    _source = new DirectoryInfo(source);
    _remote = new DirectoryInfo(remote);
}
// NOTE(review): the enclosing method's signature is not shown here; the log messages below
// identify it as AlgoSeekFuturesConverter.Convert().
// Flow: copy the raw archives to the temp folder, then for each archive (in parallel) extract it
// if needed, feed every tick to the processors of its symbol, and flush those processors.
67 Log.
Trace(
"AlgoSeekFuturesConverter.Convert(): Copying remote raw data files locally.");
// Keep only .gz/.bz2 files whose name does not contain "option" and copy each one into the
// system temp folder, overwriting an existing copy. files.Count is read below, so the sequence
// is presumably materialised into a collection — TODO confirm.
70 var files = GetFilesInRawFolder()
71 .Where(f => (f.Extension ==
".gz" || f.Extension ==
".bz2") && !f.Name.Contains(
"option"))
72 .Select(remote => remote.CopyTo(Path.Combine(Path.GetTempPath(), remote.Name),
true))
75 Log.
Trace(
"AlgoSeekFuturesConverter.Convert(): Loading {0} AlgoSeekFuturesReader for {1} ", files.Count, _referenceDate);
// Progress counters shared by every parallel iteration; they are advanced with Interlocked below.
78 var totalLinesProcessed = 0L;
79 var totalFiles = files.Count;
80 var totalFilesProcessed = 0;
// DateTime.MinValue is the "not started yet" marker; it is replaced with the current time inside the loop.
81 var start = DateTime.MinValue;
83 var symbolMultipliers = LoadSymbolMultipliers();
// One iteration per copied archive.
86 Parallel.ForEach(files, file =>
// The CSV is looked for in the source folder under the archive's name minus its last extension.
92 var csvFile = Path.Combine(_source.FullName, Path.GetFileNameWithoutExtension(file.Name));
94 Log.
Trace(
"Source File :" + csvFile);
// Extraction is skipped when the CSV is already on disk.
96 if (!File.Exists(csvFile))
// Make sure the directory that will hold the CSV exists before extracting.
99 var csvFileInfo = new FileInfo(csvFile);
100 Directory.CreateDirectory(csvFileInfo.DirectoryName);
102 Log.Trace(
"AlgoSeekFuturesConverter.Convert(): Extracting " + file);
// NOTE(review): the meaning of the -1 argument is defined by Compression.Extract7ZipArchive
// (presumably "no timeout") — confirm.
106 Compression.Extract7ZipArchive(file.FullName, _source.FullName, -1);
// A fresh processor dictionary per file, so each parallel iteration works on its own instance.
110 var processors =
new Processors();
// NOTE(review): `start` is shared by all iterations and no synchronisation is visible around
// this check-then-set — confirm a race on the first timestamp is acceptable.
113 if (start == DateTime.MinValue)
115 start = DateTime.Now;
// NOTE(review): `reader` is created outside the lines shown here. A non-null Current is treated as "has data".
118 if (reader.Current !=
null)
// NOTE(review): `as Tick` yields null for any other type and `tick` is dereferenced immediately —
// confirm the reader only ever produces Tick.
122 var tick = reader.Current as
Tick;
// Special case for "VX" ticks whose bid or ask is at or above 998; the action taken is not shown here.
124 if (tick.Symbol.ID.Symbol ==
"VX" && (
125 tick.BidPrice >= 998m || tick.AskPrice >= 998m))
// Look up the processors for this symbol, creating and caching them the first time it is seen.
131 List<List<AlgoSeekFuturesProcessor>> symbolProcessors;
132 if (!processors.TryGetValue(tick.Symbol, out symbolProcessors))
134 symbolProcessors =
new List<List<AlgoSeekFuturesProcessor>>(3)
141 processors[tick.Symbol] = symbolProcessors;
// The outer list is indexed by the integer value of the tick type; every processor in that slot gets the tick.
145 foreach (var processor
in symbolProcessors[(
int)tick.TickType])
147 processor.Process(tick);
// Log progress once per million ticks, counted across all files.
150 if (Interlocked.Increment(ref totalLinesProcessed) % 1000000m == 0)
// pro = number of processors in this file's dictionary; symbols = number of symbols in it.
152 var pro = (double)processors.Values.SelectMany(p => p.SelectMany(x => x)).Count();
153 var symbols = (double)processors.Keys.Count;
154 Log.
Trace(
"AlgoSeekFuturesConverter.Convert(): Processed {0,3}M ticks( {1}k / sec); Memory in use: {2} MB; Total progress: {3}%, Processor per symbol {4}", Math.Round(totalLinesProcessed / 1000000m, 2), Math.Round(totalLinesProcessed / 1000L / (DateTime.Now - start).TotalSeconds), Process.GetCurrentProcess().WorkingSet64 / (1024 * 1024), 100 * totalFilesProcessed / totalFiles, pro / symbols);
// Loop condition: keep going while the reader can advance.
158 while (reader.MoveNext());
// Flush every processor of this file with DateTime.MaxValue and final = true.
160 Log.
Trace(
"AlgoSeekFuturesConverter.Convert(): Performing final flush to disk... ");
161 Flush(processors, DateTime.MaxValue,
true);
166 GC.WaitForPendingFinalizers();
168 Log.
Trace(
"AlgoSeekFuturesConverter.Convert(): Finished processing file: " + file);
// Atomic increment; the total feeds the progress percentage in the log line above.
169 Interlocked.Increment(ref totalFilesProcessed);
// Failures are logged with the file, message, source and stack trace.
173 Log.
Error(
"Exception caught! File: {0} Err: {1} Source {2} Stack {3}", file, err.Message, err.Source, err.StackTrace);
/// <summary>
/// Lists the remote folder by running a shell listing command with that folder as working
/// directory, and turns each output line into a FileInfo located under the remote folder.
/// </summary>
184 private IEnumerable<FileInfo> GetFilesInRawFolder()
186 var files =
new List<FileInfo>();
// NOTE(review): the command is chosen on OS.IsLinux but the arguments on OS.IsWindows; the pair
// is only consistent if exactly one of the two flags is true — confirm against OS.
188 var command =
OS.
IsLinux ?
"ls" :
"cmd.exe";
189 var arguments =
OS.
IsWindows ?
"/c dir /b /a-d" :
string.Empty;
// No window, no shell, stdout redirected so it can be read below, working directory = remote folder.
191 var processStartInfo =
new ProcessStartInfo(command, arguments)
193 CreateNoWindow =
true,
194 WindowStyle = ProcessWindowStyle.Hidden,
195 UseShellExecute =
false,
196 RedirectStandardOutput =
true,
197 WorkingDirectory = _remote.FullName
200 using (var process =
new Process())
203 process.StartInfo = processStartInfo;
// Read the listing one line at a time until the output stream ends.
206 while (!process.StandardOutput.EndOfStream)
208 var line = process.StandardOutput.ReadLine();
// Each line is combined with the remote folder's full path to form the file's path.
211 files.Add(
new FileInfo(Path.Combine(_remote.FullName, line)));
// Wait for the child process to exit after its output has been read.
214 process.WaitForExit();
/// <summary>
/// Builds a lookup from the price-multiplier CSV: the first column is the key and the third
/// column, converted to decimal, is the value.
/// </summary>
/// <returns>Dictionary mapping the first CSV column to the third column as a decimal.</returns>
225 private Dictionary<string, decimal> LoadSymbolMultipliers()
// Zero-based column positions within each CSV row.
227 const int columnUnderlying = 0;
228 const int columnMultipleFactor = 2;
// The path is relative, so it resolves against the current working directory.
// Rows where either column is null or empty are dropped; ToDictionary throws if a key repeats.
230 return File.ReadAllLines(
"AlgoSeekFuturesConverter/AlgoSeek.US.Futures.PriceMultipliers.1.1.csv")
231 .Select(line => line.ToCsvData())
233 .Where(line => !
string.IsNullOrEmpty(line[columnUnderlying]) &&
234 !
string.IsNullOrEmpty(line[columnMultipleFactor]))
237 .ToDictionary(line => line[columnUnderlying],
238 line => line[columnMultipleFactor].ConvertInvariant<decimal>());
/// <summary>
/// Asks every processor held in <paramref name="processors"/> to flush its buffer.
/// </summary>
/// <param name="processors">Processors grouped per symbol, each symbol holding a list of processor lists.</param>
/// <param name="time">Time passed unchanged to each processor's FlushBuffer call.</param>
/// <param name="final">Flag passed unchanged to each processor's FlushBuffer call.</param>
private void Flush(Processors processors, DateTime time, bool final)
{
    // Walk symbol -> processor list -> processor, in dictionary order.
    foreach (var perSymbol in processors.Values)
    {
        foreach (var bucket in perSymbol)
        {
            foreach (var processor in bucket)
            {
                processor.FlushBuffer(time, final);
            }
        }
    }
}
// NOTE(review): the enclosing method's signature is not shown here; the log message below
// identifies it as AlgoSeekFuturesConverter.Package().
// NOTE(review): `zipper` is not referenced by any line shown here — confirm it is still needed.
254 var zipper =
OS.
IsWindows ?
"C:/Program Files/7-Zip/7z.exe" :
"7z";
256 Log.
Trace(
"AlgoSeekFuturesConverter.Package(): Zipping all files ...");
// Packaging works under the "future" subfolder of the destination; it is created if missing.
258 var destination = Path.Combine(_destination,
"future");
259 Directory.CreateDirectory(destination);
// Find CSVs whose name starts with dateMask anywhere under that folder, grouped by parent directory.
263 Directory.EnumerateFiles(destination, dateMask +
"*.csv", SearchOption.AllDirectories)
264 .GroupBy(x => Directory.GetParent(x).FullName)
// One archive per directory group, processed in parallel.
268 Parallel.ForEach(files, file =>
// file.Key is the directory path, so the archive is created beside that directory as "<dir>.zip".
273 var outputFileName = file.Key +
".zip";
// NOTE(review): this re-lists every *.csv under the directory, not only those matching dateMask — confirm intended.
276 var filesToCompress = Directory.GetFiles(file.Key,
"*.csv", SearchOption.AllDirectories);
// NOTE(review): no `using` is visible around `zip` — confirm the archive is disposed before the directory is deleted.
277 var zip = ZipFile.Open(outputFileName, ZipArchiveMode.Create);
279 foreach (var fileToCompress in filesToCompress)
// Entries are named by file name only, so the directory structure is not kept inside the archive.
282 zip.CreateEntryFromFile(fileToCompress, Path.GetFileName(fileToCompress), CompressionLevel.Optimal);
// Remove the source directory and everything in it.
290 Directory.Delete(file.Key,
true);
// Log the failure message.
292 catch (Exception err)
294 Log.
Error(
"Directory.Delete returned error: " + err.Message);
// Log the group, message, source and stack trace.
297 catch (Exception err)
299 Log.
Error(
"File: {0} Err: {1} Source {2} Stack {3}", file, err.Message, err.Source, err.StackTrace);