22 using System.Collections.Concurrent;
23 using System.Collections.Generic;
24 using System.Globalization;
27 using System.Threading;
28 using System.Threading.Tasks;
30 using DateTime = System.DateTime;
// Folder that Run() checks first for each ticker's "{ticker}.zip" daily file.
48 private readonly DirectoryInfo _dailyDataFolder;
// Folder that Run() creates and then writes one "<date>.csv" coarse file per date into.
49 private readonly DirectoryInfo _destinationFolder;
// Market value handed to every SecurityIdentifierContext built in PopulateSidContex().
52 private readonly
string _market;
// Optional text file; when it exists Run() reads it line by line into the ticker exclusion set.
53 private readonly FileInfo _blackListedTickersFile;
// NOTE(review): the enclosing method header and several statements are elided in this excerpt
// (dataProvider, mapFileProvider, factorFileProvider and generator are created elsewhere).

// Relative file name: FileInfo resolves it against the process' current working directory.
63 var blackListedTickersFile =
new FileInfo(
"blacklisted-tickers.txt");
// Reads the "reserved-words-prefix" setting; "quantconnect-" is presumably the fallback
// used when the key is absent - confirm against Config.Get.
64 var reservedWordPrefix =
Config.
Get(
"reserved-words-prefix",
"quantconnect-");
// The map file provider is initialized first because the factor file provider takes it as an input.
67 mapFileProvider.Initialize(dataProvider);
69 factorFileProvider.Initialize(mapFileProvider, dataProvider);
// Both out results of Run() are discarded here; only its boolean result is propagated.
72 return generator.Run(out _, out _);
// NOTE(review): constructor fragment - the declaration line and some parameters/assignments
// (e.g. market, factorFileProvider, mapFileProvider) are elided in this excerpt.
// dailyDataFolder:        first location searched for "{ticker}.zip" daily files.
// destinationFolder:      where the generated per-date csv files are written.
// blackListedTickersFile: optional file of tickers to exclude.
// reservedWordsPrefix:    not used by any line visible here - TODO confirm where it is consumed.
// debugEnabled:           optional, defaults to false; copied to Log.DebuggingEnabled.
87 DirectoryInfo dailyDataFolder,
88 DirectoryInfo destinationFolder,
90 FileInfo blackListedTickersFile,
91 string reservedWordsPrefix,
94 bool debugEnabled =
false)
96 _blackListedTickersFile = blackListedTickersFile;
98 _factorFileProvider = factorFileProvider;
99 _mapFileProvider = mapFileProvider;
100 _destinationFolder = destinationFolder;
101 _dailyDataFolder = dailyDataFolder;
// Log.DebuggingEnabled is set through the type rather than an instance, so constructing this
// object presumably changes debug logging for the whole process - confirm against Log.
103 Log.DebuggingEnabled = debugEnabled;
/// <summary>
/// Builds coarse fundamental rows for every security yielded by PopulateSidContex(), groups them
/// by trade bar time, and writes one sorted csv file per date into the destination folder.
/// </summary>
/// <param name="coarsePerSecurity">Receives, per security identifier, the coarse rows generated
/// for it; securities that produced no rows are not added.</param>
/// <param name="dates">Receives the distinct trade bar times that were written, ascending;
/// initialized to an empty array before any processing starts.</param>
/// <returns>A success flag. NOTE(review): the return statements and the try/catch around the
/// body are elided in this excerpt; the final Log.Error(e, ...) call suggests failures are logged
/// and reported through the return value - confirm.</returns>
110 public bool Run(out ConcurrentDictionary<
SecurityIdentifier, List<CoarseFundamental>> coarsePerSecurity, out DateTime[] dates)
112 var startTime = DateTime.UtcNow;
// NOTE(review): this message is tagged ProcessDailyFolder() although the method is Run().
114 Log.Trace($
"CoarseUniverseGeneratorProgram.ProcessDailyFolder(): Processing: {_dailyDataFolder.FullName}");
// Counters below are updated with Interlocked from inside the parallel loops.
// (filesRead, used further down, is declared on a line elided from this excerpt.)
116 var symbolsProcessed = 0;
118 var dailyFilesNotFound = 0;
119 var coarseFilesGenerated = 0;
// 'result' aliases the out parameter so it can be captured by the lambda below
// (out parameters cannot be captured directly).
123 var result = coarsePerSecurity =
new();
124 dates = Array.Empty<DateTime>();
// The exclusion set stays empty when the blacklist file does not exist.
126 var blackListedTickers =
new HashSet<string>();
127 if (_blackListedTickersFile.Exists)
// Lines are taken verbatim (no trimming or case folding) and ToHashSet() uses the default
// string comparer, so later Contains() checks are exact matches.
129 blackListedTickers = File.ReadAllLines(_blackListedTickersFile.FullName).ToHashSet();
// mapFileResolver is obtained on a line elided from this excerpt.
132 var securityIdentifierContexts = PopulateSidContex(mapFileResolver, blackListedTickers);
// Cache of parsed daily bars keyed by ticker, shared by all worker threads.
133 var dailyPricesByTicker =
new ConcurrentDictionary<string, List<TradeBar>>();
// Coarse rows grouped by trade bar time; each entry becomes one output file.
134 var outputCoarseContent =
new ConcurrentDictionary<DateTime, List<CoarseFundamental>>();
// Caps the first loop at half the logical processors, with a floor of one.
136 var parallelOptions =
new ParallelOptions { MaxDegreeOfParallelism = Math.Max(1, Environment.ProcessorCount / 2) };
139 Parallel.ForEach(securityIdentifierContexts, parallelOptions, sidContext =>
141 var coarseForSecurity =
new List<CoarseFundamental>();
142 var symbol =
new Symbol(sidContext.SID, sidContext.LastTicker);
143 var symbolCount = Interlocked.Increment(ref symbolsProcessed);
144 Log.Debug($
"CoarseUniverseGeneratorProgram.Run(): Processing {symbol} with tickers: '{string.Join(",
", sidContext.Tickers)}'");
145 var factorFile = _factorFileProvider.Get(symbol);
// Step 1: make sure daily bars for every ticker of this security are loaded into the cache.
148 foreach (var ticker
in sidContext.Tickers)
150 var pathFile = Path.Combine(_dailyDataFolder.FullName, $
"{ticker}.zip");
151 var dailyFile =
new FileInfo(pathFile);
152 if (!dailyFile.Exists)
154 Log.Debug($
"CoarseUniverseGeneratorProgram.Run(): {dailyFile.FullName} not found, looking for daily data in data folder");
// Fallback location: <Globals.DataFolder>/equity/usa/daily/<ticker>.zip
156 dailyFile =
new FileInfo(Path.Combine(
Globals.
DataFolder,
"equity",
"usa",
"daily", $
"{ticker}.zip"));
157 if (!dailyFile.Exists)
159 Log.Error($
"CoarseUniverseGeneratorProgram.Run(): {dailyFile} not found!");
160 Interlocked.Increment(ref dailyFilesNotFound);
// NOTE(review): what happens after a missing file (e.g. a 'continue') is elided here.
// NOTE(review): ContainsKey followed by AddOrUpdate is check-then-act; if two securities
// processed in parallel share a ticker, both may parse the same file and filesRead would be
// incremented twice. The two-argument AddOrUpdate is not a BCL ConcurrentDictionary overload,
// presumably a project extension - confirm its semantics.
165 if (!dailyPricesByTicker.ContainsKey(ticker))
167 dailyPricesByTicker.AddOrUpdate(ticker, ParseDailyFile(dailyFile));
168 Interlocked.Increment(ref filesRead);
// Step 2: walk the map file rows from the last one down to index 1. Row i supplies the ticker
// and the end date; row i - 1 supplies the start date. Row 0 is only ever used as a start date.
173 for (
int mapFileRowIndex = sidContext.MapFileRows.Length - 1; mapFileRowIndex >= 1; mapFileRowIndex--)
// Ticker is lower-cased for the cache lookup; presumably sidContext.Tickers (the cache keys)
// are lower case as well - verify.
175 var ticker = sidContext.MapFileRows[mapFileRowIndex].Item2.ToLowerInvariant();
176 var endDate = sidContext.MapFileRows[mapFileRowIndex].Item1;
177 var startDate = sidContext.MapFileRows[mapFileRowIndex - 1].Item1;
178 List<TradeBar> tickerDailyData;
179 if (!dailyPricesByTicker.TryGetValue(ticker, out tickerDailyData))
181 Log.Error($
"CoarseUniverseGeneratorProgram.Run(): Daily data for ticker {ticker.ToUpperInvariant()} not found!");
// NOTE(review): the handling after this error is elided; tickerDailyData is null on this path,
// so the loop below must be skipped for it - confirm.
// The date filter is inclusive at both ends, so a bar dated exactly on a row boundary
// satisfies the ranges of both adjacent rows.
186 foreach (var tradeBar
in tickerDailyData.Where(tb => tb.Time >= startDate && tb.Time <= endDate))
// 'as' yields null when factorFile is not a CorporateFactorProvider.
188 var coarseFundamental = GenerateFactorFileRow(ticker, sidContext, factorFile as
CorporateFactorProvider, tradeBar);
189 coarseForSecurity.Add(coarseFundamental);
// Adds a new single-item list for this time or appends to the existing one.
// NOTE(review): the update lambda's surrounding lines are elided; the list is shared between
// threads, so the Add below needs synchronization - confirm it is inside a lock.
191 outputCoarseContent.AddOrUpdate(tradeBar.Time,
192 new List<CoarseFundamental> { coarseFundamental },
197 list.Add(coarseFundamental);
// Only securities that produced at least one row are reported through the out parameter.
204 if(coarseForSecurity.Count > 0)
206 result[sidContext.SID] = coarseForSecurity;
// Progress trace every 1000 symbols.
208 if (symbolCount % 1000 == 0)
210 var elapsed = DateTime.UtcNow - startTime;
211 Log.Trace($
"CoarseUniverseGeneratorProgram.Run(): Processed {symbolCount} in {elapsed:g} at {symbolCount / elapsed.TotalMinutes:F2} symbols/minute ");
// Step 3: write one csv per date. This loop does not pass parallelOptions, so it runs with the
// default degree of parallelism.
215 _destinationFolder.Create();
216 var startWriting = DateTime.UtcNow;
217 Parallel.ForEach(outputCoarseContent, coarseByDate =>
219 var filename = $
"{coarseByDate.Key.ToString(DateFormat.EightCharacter, CultureInfo.InvariantCulture)}.csv";
220 var filePath = Path.Combine(_destinationFolder.FullName, filename);
221 Log.Debug($
"CoarseUniverseGeneratorProgram.Run(): Saving {filename} with {coarseByDate.Value.Count} entries.");
// Rows are serialized first and then sorted as strings with the default comparer.
222 File.WriteAllLines(filePath, coarseByDate.Value.Select(x =>
CoarseFundamental.
ToRow(x)).OrderBy(cr => cr));
223 var filesCount = Interlocked.Increment(ref coarseFilesGenerated);
// Progress trace every 1000 files.
224 if (filesCount % 1000 == 0)
226 var elapsed = DateTime.UtcNow - startWriting;
227 Log.Trace($
"CoarseUniverseGeneratorProgram.Run(): Processed {filesCount} in {elapsed:g} at {filesCount / elapsed.TotalSeconds:F2} files/second ");
// Report the written dates in ascending order.
231 dates = outputCoarseContent.Keys.OrderBy(x => x).ToArray();
// Summary trace; further summary lines between these fragments are elided in this excerpt.
232 Log.Trace($
"\n\nTotal of {coarseFilesGenerated} coarse files generated in {DateTime.UtcNow - startTime:g}:\n" +
233 $
"\t => {filesRead} daily data files read.\n");
// Failure path: 'e' is the exception caught by a handler whose declaration is elided.
237 Log.Error(e, $
"CoarseUniverseGeneratorProgram.Run(): FAILED!");
// NOTE(review): body fragment - the method header and the lookup that produces factorFileRow are
// elided in this excerpt. Presumably this is GenerateFactorFileRow, the helper Run() calls with
// (ticker, sidContext, factorFile, tradeBar) - confirm.
256 var date = tradeBar.
Time;
// Close * Volume is multiplied as written, converted to double, then truncated toward zero.
258 var dollarVolume = Math.Truncate((
double)(tradeBar.
Close * tradeBar.
Volume));
// Both factors fall back to 1 when factorFileRow is null.
259 var priceFactor = factorFileRow?.
PriceFactor.Normalize() ?? 1m;
260 var splitFactor = factorFileRow?.SplitFactor.Normalize() ?? 1m;
261 var hasFundamentalData = CheckFundamentalData(date, sidContext.SID);
// Other initializer members (original lines 265-266 and 268) are elided in this excerpt.
264 return new CoarseFundamentalSource
267 Value = tradeBar.
Close.Normalize(),
// decimal.ToInt64 discards any fractional part of the volume.
269 VolumeSetter = decimal.ToInt64(tradeBar.
Volume),
270 DollarVolumeSetter = dollarVolume,
271 PriceFactorSetter = priceFactor,
272 SplitFactorSetter = splitFactor,
273 HasFundamentalDataSetter = hasFundamentalData
/// <summary>
/// Reports whether the fundamental service returns a non-empty string for the given date and
/// security when queried with HasFundamentalSource.
/// </summary>
/// <param name="date">Date to query.</param>
/// <param name="sid">Security identifier to query.</param>
/// <returns>True when the returned string is neither null nor empty; false otherwise.</returns>
283 private static bool CheckFundamentalData(DateTime date, SecurityIdentifier sid)
// HasFundamentalSource is defined outside this excerpt.
285 return !
string.IsNullOrEmpty(
FundamentalService.Get<
string>(date, sid, HasFundamentalSource));
/// <summary>
/// Reads a zipped daily data file and converts its records into trade bars.
/// </summary>
/// <param name="dailyFile">Zip file to open and read.</param>
/// <returns>The list that parsed bars are appended to. NOTE(review): the return statement is
/// elided in this excerpt - presumably 'output' is returned.</returns>
293 private static List<TradeBar> ParseDailyFile(FileInfo dailyFile)
// Open/High/Low/Close are multiplied by 1/10000 below; presumably the file stores prices as
// integers scaled by 10000 - confirm against the data format.
295 var scaleFactor = 1 / 10000m;
297 var output =
new List<TradeBar>();
// Both the file stream and the unzipped reader are disposed by the using statements.
298 using (var fileStream = dailyFile.OpenRead())
299 using (var stream = Compression.UnzipStreamToStreamReader(fileStream))
301 while (!stream.EndOfStream)
// Fields are consumed from the reader in this order: time, open, high, low, close, volume.
// (The 'var tradeBar = new TradeBar' line that opens this initializer is elided.)
305 Time = stream.GetDateTime(),
306 Open = stream.GetDecimal() * scaleFactor,
307 High = stream.GetDecimal() * scaleFactor,
308 Low = stream.GetDecimal() * scaleFactor,
309 Close = stream.GetDecimal() * scaleFactor,
// Volume is taken as read, without scaling.
310 Volume = stream.GetDecimal()
312 output.Add(tradeBar);
/// <summary>
/// Lazily yields a SecurityIdentifierContext for each map file in the resolver, checking each
/// map file's last mapped symbol against the exclusion set.
/// </summary>
/// <param name="mapFileResolver">Source of map files to enumerate.</param>
/// <param name="exclusions">Symbols to exclude; matched with the set's own comparer.</param>
/// <returns>A deferred sequence: because this is an iterator, nothing (including the trace
/// below) runs until the caller starts enumerating.</returns>
325 private IEnumerable<SecurityIdentifierContext> PopulateSidContex(
MapFileResolver mapFileResolver, HashSet<string> exclusions)
327 Log.Trace(
"CoarseUniverseGeneratorProgram.PopulateSidContex(): Generating SID context from QuantQuote's map files.");
328 foreach (var mapFile
in mapFileResolver)
// NOTE(review): the statement guarded by this check is elided in this excerpt; presumably it
// skips the map file (continue) - confirm.
330 if (exclusions.Contains(mapFile.Last().MappedSymbol))
335 yield
return new SecurityIdentifierContext(mapFile, _market);