21 using System.Globalization;
23 using System.Threading.Tasks;
26 using System.Collections.Generic;
47 private readonly
Symbol _symbol;
48 private readonly
bool _mapSymbol;
49 private readonly
string _dataDirectory;
77 _mapSymbol = mapSymbol;
88 throw new NotImplementedException(
"Sorry this security type is not yet supported by the LEAN data writer: " + _securityType);
104 _dataDirectory = dataDirectory;
105 _resolution = resolution;
106 _securityType = securityType;
107 _tickType = tickType;
108 if (writePolicy ==
null)
114 _writePolicy = writePolicy.Value;
123 public void Write(IEnumerable<BaseData> source)
125 var lastTime = DateTime.MinValue;
126 var outputFile =
string.Empty;
128 var currentFileData =
new List<TimedLine>();
129 var writeTasks =
new Queue<Task>();
131 foreach (var data
in source)
134 if (data.Time < lastTime)
throw new Exception(
"The data must be pre-sorted from oldest to newest");
138 if (data.Time.Date != lastTime.Date)
140 var mappedSymbol = GetMappedSymbol(data.Time, data.Symbol);
142 var latestOutputFile = GetZipOutputFileName(_dataDirectory, data.Time, mappedSymbol);
143 var latestSymbol = mappedSymbol;
144 if (outputFile.IsNullOrEmpty() || outputFile != latestOutputFile)
146 if (!currentFileData.IsNullOrEmpty())
149 var file = outputFile;
150 var fileData = currentFileData;
151 var fileSymbol = symbol;
152 writeTasks.Enqueue(Task.Run(() =>
154 WriteFile(file, fileData, fileSymbol);
159 currentFileData =
new List<TimedLine>();
160 outputFile = latestOutputFile;
161 symbol = latestSymbol;
167 currentFileData.Add(
new TimedLine(data.Time, line));
170 lastTime = data.Time;
174 if (!currentFileData.IsNullOrEmpty())
177 WriteFile(outputFile, currentFileData, symbol);
181 while (writeTasks.Count > 0)
183 var task = writeTasks.Dequeue();
197 if (symbols.Count == 0)
199 throw new ArgumentException(
"DownloadAndSave(): The symbol list cannot be empty.");
204 throw new ArgumentException(
"DownloadAndSave(): The tick type must be Trade or Quote.");
207 if (symbols.Any(x => x.SecurityType != _securityType))
209 throw new ArgumentException($
"DownloadAndSave(): All symbols must have {_securityType} security type.");
212 if (symbols.DistinctBy(x => x.ID.Symbol).Count() > 1)
214 throw new ArgumentException(
"DownloadAndSave(): All symbols must have the same root ticker.");
221 var ticker = symbols.First().ID.Symbol;
222 var market = symbols.First().ID.Market;
224 var canonicalSymbol =
Symbol.
Create(ticker, _securityType, market);
226 var exchangeHours = marketHoursDatabase.GetExchangeHours(canonicalSymbol.ID.Market, canonicalSymbol, _securityType);
227 var dataTimeZone = marketHoursDatabase.GetDataTimeZone(canonicalSymbol.ID.Market, canonicalSymbol, _securityType);
229 foreach (var symbol
in symbols)
246 var history = brokerage.GetHistory(historyRequest)?
251 x.Time = x.Time.ConvertTo(exchangeHours.TimeZone, dataTimeZone);
262 var writer =
new LeanDataWriter(_resolution, symbol, _dataDirectory, _tickType);
263 writer.Write(history);
270 private bool TryLoadFile(
string fileName,
string entryName, DateTime date, out SortedDictionary<DateTime, string> rows)
272 rows =
new SortedDictionary<DateTime, string>();
274 using (var stream = _dataCacheProvider.Fetch($
"{fileName}#{entryName}"))
281 using (var reader =
new StreamReader(stream))
284 while ((line = reader.ReadLine()) !=
null)
305 private void WriteFile(
string filePath, List<TimedLine> data, Symbol symbol)
307 filePath = FileExtension.ToNormalizedPath(filePath);
308 if (data ==
null || data.Count == 0)
315 _keySynchronizer.
Execute(filePath, singleExecution:
false, () =>
317 var date = data[0].Time;
322 var fileExists =
File.Exists(filePath);
327 Directory.CreateDirectory(Path.GetDirectoryName(filePath));
332 string finalData =
null;
335 var streamWriter = new ZipStreamWriter(filePath, entryName);
336 foreach (var tuple in data)
338 streamWriter.WriteLine(tuple.Line);
340 streamWriter.DisposeSafely();
342 else if (_writePolicy ==
WritePolicy.Merge && fileExists && TryLoadFile(filePath, entryName, date, out var rows))
345 foreach (var timedLine
in data)
347 rows[timedLine.Time] = timedLine.Line;
351 finalData =
string.Join(
"\n", rows.Values);
356 finalData =
string.Join(
"\n", data.Select(x => x.Line));
359 if (finalData !=
null)
361 var bytes = Encoding.UTF8.GetBytes(finalData);
362 _dataCacheProvider.Store($
"{filePath}#{entryName}", bytes);
367 var from = data[0].Time.Date.ToString(DateFormat.EightCharacter, CultureInfo.InvariantCulture);
368 var to = data[data.Count - 1].Time.Date.ToString(DateFormat.EightCharacter, CultureInfo.InvariantCulture);
369 Log.Debug($
"LeanDataWriter.Write({symbol.ID}): Appended: {filePath} @ {entryName} {from}->{to}");
381 private string GetZipOutputFileName(
string baseDirectory, DateTime time, Symbol symbol)
389 private Symbol GetMappedSymbol(DateTime time, Symbol symbol)
395 if (symbol.RequiresMapping())
398 var mapFile = mapFileResolver.ResolveMapFile(symbol);
399 var mappedTicker = mapFile.GetMappedSymbol(time);
400 if(!
string.IsNullOrEmpty(mappedTicker))
403 symbol = symbol.UpdateMappedSymbol(mappedTicker);
410 private class TimedLine
412 public string Line {
get; }
413 public DateTime Time {
get; }
414 public TimedLine(DateTime time,
string line)