![](/media/cache/img/default_profile.jpg.50x50_q85.jpg)
repo time
Dependencies: mbed MAX14720 MAX30205 USBDevice
Diff: HspGuiSourceV301/GuiDLLs/RPCSupport/Streaming/Streaming.cs
- Revision:
- 20:6d2af70c92ab
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/HspGuiSourceV301/GuiDLLs/RPCSupport/Streaming/Streaming.cs Tue Apr 06 06:41:40 2021 +0000 @@ -0,0 +1,391 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.ComponentModel; +using System.Collections; +using System.Globalization; +using System.Threading; + +namespace RPCSupport.Streaming +{ + public class Streaming + { + BackgroundWorker _testWorker; + Pipelines.Pipeline pipeline; + + public int Timestamp; + public int NumberPacketCrcErrors; + public int NumberPacketHeaderErrors; + public ArrayList ArrayListRed = new ArrayList(); + public ArrayList ArrayListIR = new ArrayList(); + public ArrayList ArrayListGreen = new ArrayList(); + public event EventHandler<PartialArrayIntAvailableEventArgs> PartialArrayIntAvailable; + private bool handlersInited = false; + + public void Init(Pipelines.Pipeline pipeline) + { + this.pipeline = pipeline; + + if (handlersInited == false) + { + handlersInited = true; + _testWorker = new BackgroundWorker(); + _testWorker.WorkerSupportsCancellation = true; + _testWorker.ProgressChanged += new ProgressChangedEventHandler(_testWorker_ProgressChanged); + _testWorker.DoWork += new DoWorkEventHandler(_testWorker_DoWork); + _testWorker.RunWorkerCompleted += new RunWorkerCompletedEventHandler(_testWorker_RunWorkerCompleted); + _testWorker.WorkerReportsProgress = true; + } + } + + protected virtual void OnPartialArrayIntAvailable(PartialArrayIntAvailableEventArgs e) + { + EventHandler<PartialArrayIntAvailableEventArgs> handler = PartialArrayIntAvailable; + if (handler != null) + { + handler(this, e); + } + } + + public void Start() + { + ArrayListRed.Clear(); + ArrayListIR.Clear(); + ArrayListGreen.Clear(); + _testWorker.RunWorkerAsync(); + } + public void Stop() + { + _testWorker.CancelAsync(); + } + void _testWorker_ProgressChanged(object sender, ProgressChangedEventArgs e) + { + var data = (System.Tuple<int, int, int[], int[], int[]>)(e.UserState); + int reportID = data.Item1; + int timestamp = data.Item2; + int[] list1_Red___or_X = data.Item3; + int[] list2_IR____or_Y = data.Item4; + int[] list3_Green_or_Z = data.Item5; + + if (list1_Red___or_X != null) + for (int index = 0; index < list1_Red___or_X.Length; index++) + { + ArrayListRed.Add(list1_Red___or_X[index]); + } + if (list2_IR____or_Y != null) + for (int index = 0; index < list2_IR____or_Y.Length; index++) + { + ArrayListIR.Add(list2_IR____or_Y[index]); + } + if (list3_Green_or_Z != null) + for (int index = 0; index < list3_Green_or_Z.Length; index++) + { + ArrayListGreen.Add(list3_Green_or_Z[index]); + } + OnPartialArrayIntAvailable(new PartialArrayIntAvailableEventArgs() + { + reportID = reportID, + timestamp = timestamp, + array1 = (int[])ArrayListRed.ToArray(typeof(int)), + array2 = (int[])ArrayListIR.ToArray(typeof(int)), + array3 = (int[])ArrayListGreen.ToArray(typeof(int)) + }); + ArrayListRed.Clear(); + ArrayListIR.Clear(); + ArrayListGreen.Clear(); + } + const int STREAM_PACKET_HRMULTI = 0x03; + enum ePacketPhase + { + eId, + eTimestamp, + eLength, + eData + } + ePacketPhase packetPhase; + enum ePacketBuildStage + { + eAssemble, + eProcess, + eResync + } + ePacketBuildStage packetBuildStage; + StringBuilder packetStringBuild; + int packetDataIndex; + + class Packet + { + public int id; + public int rawId; + public int timestamp; + public int length; + public int crc; + public int[] data; + + internal string ToLogString() + { + StringBuilder sb = new StringBuilder(); + sb.Append(id.ToString("X2") + " "); + sb.Append(timestamp.ToString("X2") + " "); + sb.Append(length.ToString("X2") + " "); + for (int i = 0; i < length; i++) + { + sb.Append(data[i].ToString("X2") + " "); + } + return sb.ToString(); + } + } + + Queue packetFifo = new Queue(256); + Packet currentPacket; + private void ProcessIncomingStream(Queue incomingFifo, Queue packetFifo) + { + while (incomingFifo.Count != 0) + { + if (packetBuildStage == ePacketBuildStage.eAssemble) + { + char ch = (char)incomingFifo.Dequeue(); + if (ch != ' ') + { + packetStringBuild.Append(ch); + } + else + { + //pipeline.LogStream(packetStringBuild.ToString()); + packetBuildStage = ePacketBuildStage.eProcess; + } + } + if (packetBuildStage == ePacketBuildStage.eResync) + { + char ch = (char)incomingFifo.Dequeue(); + if (ch != ' ') + { + packetStringBuild.Append(ch); + } + else + { + //pipeline.LogStream(packetStringBuild.ToString()); + int val; + int.TryParse(packetStringBuild.ToString(), NumberStyles.HexNumber, CultureInfo.CurrentCulture, out val); + if (PartialArrayIntAvailableEventArgs.ValidatePacketID(val) == true) + { + packetBuildStage = ePacketBuildStage.eProcess; + packetPhase = ePacketPhase.eId; + } + else + { + packetStringBuild.Clear(); + } + } + } + + if (packetBuildStage == ePacketBuildStage.eProcess) + { + int val; + int.TryParse(packetStringBuild.ToString(), NumberStyles.HexNumber, CultureInfo.CurrentCulture, out val); + switch (packetPhase) + { + case ePacketPhase.eId: + currentPacket = new Packet(); + currentPacket.id = val & 0xFF; + currentPacket.rawId = val; + if (PartialArrayIntAvailableEventArgs.ValidatePacketID(val) == false) + { + packetBuildStage = ePacketBuildStage.eResync; + packetPhase = ePacketPhase.eId; + packetStringBuild.Clear(); + NumberPacketHeaderErrors++; + break; + } + packetPhase = ePacketPhase.eTimestamp; + //pipeline.LogStream("Packet ID = " + packetStringBuild.ToString()); + break; + case ePacketPhase.eTimestamp: + currentPacket.timestamp = val; + //if (val != 0x11223344) + //{ + // throw new Exception("timestamp invalid"); + //} + packetPhase = ePacketPhase.eLength; + break; + case ePacketPhase.eLength: + currentPacket.length = val & 0xFFFF; + currentPacket.crc = (val >> 16) & 0xFFFF; + packetDataIndex = 0; + currentPacket.data = new int[currentPacket.length]; + packetPhase = ePacketPhase.eData; + break; + case ePacketPhase.eData: + currentPacket.data[packetDataIndex++] = val; + if (packetDataIndex >= currentPacket.length) + { + // validate the packet before enqueing + UInt32 crc = 0; + crc = 0; + crc = Crc32.crc32Value(crc, (UInt32)currentPacket.rawId); + crc = Crc32.crc32Value(crc, (UInt32)currentPacket.timestamp); + crc = Crc32.crc32Value(crc, (UInt32)currentPacket.length); + crc = Crc32.crc32(crc, currentPacket.data, (UInt32)currentPacket.length); + // if crc checks out then put in queue + if ((crc & 0xFFFF) == currentPacket.crc || (currentPacket.crc == 0)) + { + packetFifo.Enqueue(currentPacket); + } + else + { + NumberPacketCrcErrors++; + packetFifo.Enqueue(currentPacket); + } + pipeline.LogStream(currentPacket.ToLogString()); + packetPhase = ePacketPhase.eId; + } + break; + } + packetStringBuild.Clear(); + if (packetBuildStage != ePacketBuildStage.eResync) + packetBuildStage = ePacketBuildStage.eAssemble; + } + } + } + + bool debugFlag = false; + public bool DebugFlag() + { + return debugFlag; + } + + int packetCount; + Queue incomingFifo = new Queue(256); + private void _testWorker_DoWork(object sender, DoWorkEventArgs e) + { + incomingFifo.Clear(); + packetFifo.Clear(); + packetStringBuild = new StringBuilder(); + packetPhase = ePacketPhase.eId; + packetBuildStage = ePacketBuildStage.eAssemble; + //Thread.Sleep(500); + //pipeline.Discard(); + packetCount = 0; + char[] serialData = new char[128]; + NumberPacketCrcErrors = 0; + NumberPacketHeaderErrors = 0; + while (true) + { + if (_testWorker.CancellationPending) + { + return; // end thread + } + // read from pipe + //Thread.Sleep(1000); + + // if this pipe is not connected then shutdown the thread + if (pipeline.IsConnected() == false) + { + return; + } + int charsRead = pipeline.Read(serialData, 0, serialData.Length); + // get out if there are no longer bytes to be read + if (charsRead == 0) + { + _testWorker.ReportProgress(0, new System.Tuple<int, int, int[], int[], int[]>(PartialArrayIntAvailableEventArgs.PACKET_END_OF_STREAM, 0, null, null, null)); + break; + } + for (int i = 0; i < charsRead; i++) + { + incomingFifo.Enqueue(serialData[i]); + } + debugFlag = true; + ProcessIncomingStream(incomingFifo, packetFifo); + if (packetFifo.Count != 0) + { + packetCount++; + for (int i = 0; i < packetFifo.Count; i++) + { + ArrayList arrayList1 = new ArrayList(); + ArrayList arrayList2 = new ArrayList(); + ArrayList arrayList3 = new ArrayList(); + Packet packet = (Packet)packetFifo.Dequeue(); + if ((packet.id & 0xF0) == 0x10) // check for MAX30101 data + { + int index = 0; + int ledNumber = packet.id & 0x0F; + if (ledNumber == 1) + { + while (index < packet.length) + { + arrayList1.Add(packet.data[index++]); + } + } + if (ledNumber == 2) + { + while (index < packet.length) + { + arrayList1.Add(packet.data[index++]); + arrayList2.Add(packet.data[index++]); + } + } + if (ledNumber == 3) + { + while (index < packet.length) + { + arrayList1.Add(packet.data[index++]); + arrayList2.Add(packet.data[index++]); + arrayList3.Add(packet.data[index++]); + } + } + int[] array1 = (int[])arrayList1.ToArray(typeof(int)); + int[] array2 = (int[])arrayList2.ToArray(typeof(int)); + int[] array3 = (int[])arrayList3.ToArray(typeof(int)); + _testWorker.ReportProgress(0, new System.Tuple<int, int, int[], int[], int[]>(packet.id, packet.timestamp, array1, array2, array3)); + } else + if ((packet.id & 0xF0) == 0x20) // check for LIS2DH data + { + int index = 0; + while (index < packet.length) + { + arrayList1.Add(packet.data[index++]); + arrayList2.Add(packet.data[index++]); + arrayList3.Add(packet.data[index++]); + } + int[] array1 = (int[])arrayList1.ToArray(typeof(int)); + int[] array2 = (int[])arrayList2.ToArray(typeof(int)); + int[] array3 = (int[])arrayList3.ToArray(typeof(int)); + _testWorker.ReportProgress(0, new System.Tuple<int, int, int[], int[], int[]>(packet.id, packet.timestamp, array1, array2, array3)); + } + if ((packet.id & 0xF0) == 0x30) // check for MAX30001 data + { + int index = 0; + while (index < packet.length) + { + arrayList1.Add(packet.data[index++]); + } + int[] array1 = (int[])arrayList1.ToArray(typeof(int)); + int[] array2 = (int[])arrayList2.ToArray(typeof(int)); + int[] array3 = (int[])arrayList3.ToArray(typeof(int)); + _testWorker.ReportProgress(0, new System.Tuple<int, int, int[], int[], int[]>(packet.id, packet.timestamp, array1, array2, array3)); + } + if ((packet.id & 0xF0) == PartialArrayIntAvailableEventArgs.PACKET_BMP280_PRESSURE || // check for BMP280 data + (packet.id & 0xF0) == PartialArrayIntAvailableEventArgs.PACKET_MAX31725_TEMP1 || // check for BMP280 data + (packet.id & 0xF0) == PartialArrayIntAvailableEventArgs.PACKET_MAX31725_TEMP2 // check for BMP280 data + ) // check for BMP280 data + { + int index = 0; + while (index < packet.length) + { + arrayList1.Add(packet.data[index++]); + } + int[] array1 = (int[])arrayList1.ToArray(typeof(int)); + int[] array2 = (int[])arrayList2.ToArray(typeof(int)); + int[] array3 = (int[])arrayList3.ToArray(typeof(int)); + _testWorker.ReportProgress(0, new System.Tuple<int, int, int[], int[], int[]>(packet.id, packet.timestamp, array1, array2, array3)); + } + } + } + } + } + void _testWorker_RunWorkerCompleted(object sender, RunWorkerCompletedEventArgs e) + { + } + } +}