代码如下:
/// <summary>
/// Console entry point: runs the file pipeline to completion, then reports that it is done
/// and waits for console input before returning.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
    Console.WriteLine("开始执行...");

    // Start the pipeline and block this thread until it has finished.
    Handler().Wait();

    // Processing is done: set the console buffer to 120 columns by 20000 rows,
    // print the completion message and wait for input.
    Console.SetBufferSize(120, 20000);
    Console.WriteLine("完事.....");
    Console.Read();
}
/// <summary>
/// Runs the two pipeline stages side by side over a shared queue: a producer that enqueues
/// file names found under a fixed directory, and a consumer that dequeues and reads each file.
/// </summary>
/// <returns>A task that completes once both the producer and the consumer have finished.</returns>
public static async Task Handler()
{
    // BlockingCollection<T> implements IDisposable, so release it when the pipeline is done.
    using (var queue = new BlockingCollection<string>())
    {
        Task producer = Pipeline.ReadFileName(queue, "E:\\程序文件\\C#程序代码\\DeepExtend\\DeepExtend\\");
        Task consumer = Pipeline.LoadFileContent(queue);

        // WhenAll completes only after BOTH tasks have finished (even if one faults),
        // so the queue is never disposed while a stage is still using it.
        await Task.WhenAll(producer, consumer);
    }
}
pipeline.cs
/// <summary>
/// Producer and consumer stages that pass file names through a
/// <see cref="BlockingCollection{T}"/>.
/// </summary>
static class Pipeline
{
    /// <summary>
    /// Starts a background task that enumerates every "*.cs" file under <paramref name="path"/>
    /// (including subdirectories) and tries to add each file name to
    /// <paramref name="BlockingQueue"/>. The queue is always marked as complete for adding when
    /// the task ends, whether it ends normally or with an exception.
    /// </summary>
    /// <param name="BlockingQueue">Queue that receives the file names.</param>
    /// <param name="path">Root directory to search.</param>
    /// <returns>The task performing the enumeration.</returns>
    public static Task ReadFileName(BlockingCollection<string> BlockingQueue, string path)
    {
        return Task.Run(() =>
        {
            try
            {
                foreach (string filename in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories))
                {
                    bool success = false;
                    try
                    {
                        // Non-blocking add: returns false instead of waiting when the item cannot be added.
                        success = BlockingQueue.TryAdd(filename);
                        ConsoleHelper.WriteLine("生产者正在读取文件名....");
                    }
                    catch (OperationCanceledException)
                    {
                        // NOTE(review): kept from the original; the token-less TryAdd overload is not
                        // documented to throw this exception - confirm before removing.
                        Console.WriteLine("出错了...");
                        break;
                    }

                    if (!success)
                    {
                        // The file name was NOT enqueued and is skipped.
                        Console.WriteLine("发生了阻塞....");
                    }
                }
            }
            finally
            {
                // Must run even when the enumeration throws (e.g. missing directory, access denied):
                // otherwise the consumer would wait forever for items that will never arrive.
                BlockingQueue.CompleteAdding();
            }
        });
    }

    /// <summary>
    /// Takes file names from <paramref name="BlockQueue"/> until it is completed and empty,
    /// opens each file and passes every line to <c>ConsoleHelper.WriteLine</c>.
    /// </summary>
    /// <remarks>
    /// Waiting for the next file name blocks the current thread; only the line reads are awaited.
    /// </remarks>
    /// <param name="BlockQueue">Queue that supplies the file names.</param>
    /// <returns>A task that completes when the queue has been drained.</returns>
    public static async Task LoadFileContent(BlockingCollection<string> BlockQueue)
    {
        try
        {
            // The consuming enumeration ends only once the queue is marked complete and is empty,
            // so no additional IsCompleted polling loop is needed around it.
            foreach (string filename in BlockQueue.GetConsumingEnumerable())
            {
                using (FileStream stream = File.OpenRead(filename))
                using (var reader = new StreamReader(stream))
                {
                    string line;
                    while ((line = await reader.ReadLineAsync()) != null)
                    {
                        ConsoleHelper.WriteLine(line);
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // NOTE(review): kept from the original; no cancellation token is passed anywhere
            // in this method - confirm whether this handler can ever be reached.
            Console.WriteLine("出错了...");
        }
    }
}
截图: