ADORe
ADORe is a modular open source software library and toolkit for decision making, planning, control and simulation of automated vehicles
zmqobjectsink.h
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2017-2020 German Aerospace Center (DLR).
3  * Eclipse ADORe, Automated Driving Open Research https://eclipse.org/adore
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License 2.0 which is available at
7  * http://www.eclipse.org/legal/epl-2.0.
8  *
9  * SPDX-License-Identifier: EPL-2.0
10  *
11  * Contributors:
12  * Daniel Heß - initial API and implementation
13  ********************************************************************************/
14 
15 #pragma once
16 #include <zmq.hpp>
17 #include <pthread.h>
18 #include <iostream>
19 #include <string>
20 #include <sstream>
21 #include <list>
22 #include <string>
23 #include <unistd.h>
24 
25 
26 template<typename T>
27 class ZMQObjectSink;
28 
29 template<typename T>
31 {
33  zmq::message_t msg;
34  int count = 0;
35  char* buffer = new char[sizeof(T)];
36  for(;;)
37  {
38  bool flag = s->m_socket->recv(&msg);
39  if(flag)
40  {
41  if(count+msg.size()<sizeof(T))//continue aggregation
42  {
43  //copy message data to buffer
44  memcpy(&buffer[count],msg.data(),msg.size());
45  count += msg.size();
46  }
47  else //insert brake
48  {
49  //finish old buffer and place complete buffer in receive queue
50  int oldrem = sizeof(T)-count;
51  memcpy(&buffer[count],msg.data(),oldrem);
52  s->push_data((T*)(void*)&buffer[0]);
53  //create new buffer and place remaining bytes
54  buffer = new char[sizeof(T)];
55  memcpy(&buffer[0],&(((char*)msg.data())[oldrem]),msg.size()-oldrem);
56  count = msg.size()-oldrem;
57  }
58  }
59  else
60  {
61  usleep(1);
62  }
63  if(s->terminate)
64  {
65  break;
66  }
67  }
68 
69  return 0;
70 }
71 
72 template<typename T>
74 {
75  friend void* pthread_ZMQObjectSink_worker<T>(void* that);
76 private:
77  zmq::socket_t* m_socket;
78  pthread_t m_thread;
79  pthread_mutex_t m_mutex;
80  bool terminate;
81  std::list<T*> buffer;
82  zmq::context_t* m_context;
83  void push_data(T* value)
84  {
85  pthread_mutex_lock(&m_mutex);
86  buffer.push_back(value);
87  pthread_mutex_unlock(&m_mutex);
88  }
89  virtual void initialize(zmq::context_t& context,unsigned int Port)
90  {
91  m_socket = new zmq::socket_t(context, ZMQ_PULL);
92  std::ostringstream address;
93  address<<"tcp://*:"<<Port;
94  m_socket->bind(address.str().c_str());
95  m_mutex = PTHREAD_MUTEX_INITIALIZER;
96  terminate = false;
97  // Has to be int according to ZMQ docs
98  typedef int ZmqRcvBufSizeType;
99  const ZmqRcvBufSizeType msgsize = sizeof(T);
100  //m_socket->setsockopt(ZMQ_RCVBUF,&msgsize,sizeof(ZmqRcvBufSizeType));//hess_da, 01.08.2018: Some version of zmq seems to be incompatible with this command
101  pthread_create(&m_thread,NULL,pthread_ZMQObjectSink_worker<T>,this);
102  }
103 public:
104  ZMQObjectSink(zmq::context_t& context,unsigned int Port)
105  {
106  m_context = &context;
107  initialize(context,Port);
108  }
109  ZMQObjectSink(unsigned int Port)
110  {
111  m_context = new zmq::context_t(1);
112  initialize(*m_context,Port);
113  }
114  virtual ~ZMQObjectSink()
115  {
116  terminate = true;
117  pthread_join(m_thread,NULL);
118  delete m_socket;
119  }
120  bool has_data()
121  {
122  pthread_mutex_lock(&m_mutex);
123  bool value = buffer.size()>0;
124  pthread_mutex_unlock(&m_mutex);
125  return value;
126  }
127  T* pop_data()
128  {
129  pthread_mutex_lock(&m_mutex);
130  T* value = buffer.front();
131  buffer.pop_front();
132  pthread_mutex_unlock(&m_mutex);
133  return value;
134  }
135 };
Definition: zmqobjectsink.h:74
zmq::socket_t * m_socket
Definition: zmqobjectsink.h:77
T * pop_data()
Definition: zmqobjectsink.h:127
pthread_mutex_t m_mutex
Definition: zmqobjectsink.h:79
bool has_data()
Definition: zmqobjectsink.h:120
virtual void initialize(zmq::context_t &context, unsigned int Port)
Definition: zmqobjectsink.h:89
std::list< T * > buffer
Definition: zmqobjectsink.h:81
ZMQObjectSink(zmq::context_t &context, unsigned int Port)
Definition: zmqobjectsink.h:104
bool terminate
Definition: zmqobjectsink.h:80
pthread_t m_thread
Definition: zmqobjectsink.h:78
ZMQObjectSink(unsigned int Port)
Definition: zmqobjectsink.h:109
zmq::context_t * m_context
Definition: zmqobjectsink.h:82
void push_data(T *value)
Definition: zmqobjectsink.h:83
virtual ~ZMQObjectSink()
Definition: zmqobjectsink.h:114
void * pthread_ZMQObjectSink_worker(void *that)
Definition: zmqobjectsink.h:30