In the Post Process tab, click Edit.
Then select the External Application option.
Then specify the IP address of the target machine (either server or edge device) that contains Post processing Python code:
There will be two modes:
- Binary mode: The data communication between the target machine and the ANSVIS will be in binary format. Here is the protocol.
- Text mode: The format of the data is human readable JSON format.
The binary mode will be faster and recommended for real-time applications.
Select either Text Mode or Binary Mode, specify the Port Number (the default is 10016), and hit Test Connection to test the communication connection.
Text Mode
Select Text Mode, specify the Port Number, and then click on the Test Connection button.
In the Test Connection Dialog, user can select if he/she wants to send image(s). Sample image(s) can be added by selecting image(s) in the specified location.
The data sent to the Post-Processing Python application is in the JSON format and will be in the following form.
{"DetectionDataPerRule":[{"DetectionData":[{"Name":"Motor Bike Detection","ModelClassID":{"ModelID":646,"Class":2},"DeploymentGroupID":132,"CameraID":667,"TrackID":833,"Detections":[{"Time":"2023-05-11T00:16:34.466Z","BoundingBox":{"Left":323,"Top":247,"Right":374,"Bottom":402},"Score":0.630180273931362,"Image":85,"ModelClassID":{"ModelID":646,"Class":2}},{"Time":"2023-05-11T00:16:34.466Z","BoundingBox":{"Left":331,"Top":200,"Right":347,"Bottom":297},"Score":0.744244955742642,"Image":85,"ModelClassID":{"ModelID":646,"Class":2}},{"Time":"2023-05-11T00:16:34.466Z","BoundingBox":{"Left":126,"Top":172,"Right":234,"Bottom":244},"Score":0.820398603955675,"Image":85,"ModelClassID":{"ModelID":646,"Class":2}}],"Result":"1.9s","ExtraVisionResult":""}]}],"Images":[{"ImageID":85,"Image":"/9j/4AAQSk"}]}\r\n
In this example, we demonstrate that the Post-Processing Python code will get the data from ANSVIS and modify the BoundingBox information, then send back to ANSVIS.
Here is the code:
import socket
import json
class Buffer:
def __init__(self,sock):
self.sock = sock
self.buffer = b''
def get_line(self):
while b'\r\n' not in self.buffer:
data = self.sock.recv(1024)
if not data: # socket closed
return None
self.buffer += data
line,sep,self.buffer = self.buffer.partition(b'\r\n')
return line.decode()
# Server information
server_address = '192.168.1.3'
server_port = 10016
# Create a TCP/IP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# Bind the socket to a specific address and port
client_socket.bind((server_address, server_port))
# Listen for incoming connections
client_socket.listen(1)
print('Waiting for a server connection...')
# Accept a client connection
connection, address = client_socket.accept()
print('Connected to server at {}:{}'.format(address[0], address[1]))
# Receive data from the server
b = Buffer(connection)
while True:
line = b.get_line()
if line is None:
break
data = json.loads(line)
print('Received data from server:', data)
# Access the first DetectionData item and its BoundingBox dictionary
detection_data_per_rule = data['DetectionDataPerRule'][0]
detection_data = detection_data_per_rule['DetectionData'][0]
bounding_box = detection_data['Detections'][0]['BoundingBox']
# Change the values of Left, Top, Right, and Bottom
bounding_box['Left'] = 200
bounding_box['Top'] = 150
bounding_box['Right'] = 300
bounding_box['Bottom'] = 250
# Convert the modified data back to JSON
modified_json_data = json.dumps(data)
response = modified_json_data +'\r\n' # Example: just send back exactly the server sent
# Send the response back to the server
connection.sendall(response.encode('utf-8'))
print('Sent response to server:', response)
except ConnectionRefusedError:
print('Connection to the server was refused.')
finally:
# Close the client socket
client_socket.close()
Binary Mode
To operate in the Binary Mode, please select the Binary Mode, then click Test Connection.
In the Test Connection dialog, the user can test sending an image and follow the protocol shown at the bottom of the dialog.
Assuming that we want to create a Python code that modifies the bounding box information.
The Python code would be:
import socket
import struct
import cv2
import json
def convert_data_to_ints(data, big_endian=True):
int_count = len(data) // 4 # Assuming uint is 4 bytes long !!!
fmt = ">" if big_endian else "<"
fmt += "I" * int_count
return struct.unpack(fmt, data[:int_count * 4])
def bytes_to_int(bytes):
result = 0
for b in bytes:
result = result * 256 + int(b)
return result
def int_to_bytes(value, length):
result = []
for i in range(0, length):
result.append(value >> (i * 8) & 0xff)
result.reverse()
return result
# Global variables
totalSize =0
jsonTextSize=0;
jsonData =b''
jsonTextData=''
modifiedJsonTextData =''
modifiedJsonData=b''
modifiedJsonDataLenght =0
noImages =0
imageSize=0
imageId =0
imageData =b''
# Set up a TCP/IP server
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Bind the socket to server address and port 10016
server_address = ("192.168.1.3", 10016)
tcp_socket.bind(server_address)
tcp_socket.listen(1)
while True:
print("Waiting for connection")
connection, client = tcp_socket.accept()
try:
print("Connected to client IP: {}".format(client))
while True:
data = connection.recv(4) # Read first 4 bytes (Total Size)
if data != b'':
totalSize = convert_data_to_ints(data)[0]
print("Raw total size: {}\n".format(data))
if totalSize>14:
print("Total Size: {}".format(totalSize))
data = connection.recv(4) # Read next 4 bytes (JSONText Size)
print("Raw JSON text size: {}\n".format(data))
jsonTextSize = convert_data_to_ints(data)[0]
print("JSON TextData size: {}".format(jsonTextSize))
jsonData = connection.recv(jsonTextSize) # Read JSON content
print("Raw JSON data: {}\n".format(jsonData))
jsonTextData = json.loads(jsonData)
print("JSON Text data: {}".format(jsonTextData))
# Modify results
# Access the first DetectionData item and its BoundingBox dictionary
detection_data = jsonTextData[0]['DetectionData'][0]
bounding_box = detection_data['Detections'][0]['BoundingBox']
# Change the values of Left, Top, Right, and Bottom
bounding_box['Left'] = 200
bounding_box['Top'] = 150
bounding_box['Right'] = 300
bounding_box['Bottom'] = 250
# Convert the modified data back to JSON
modifiedJsonTextData = json.dumps(jsonTextData) +'\r\n'
print('Modified data:')
print(modifiedJsonTextData)
# Convert JSON data to bytes
modifiedJsonData = modifiedJsonTextData.encode('utf-8')
# Get the size of JSON data in bytes
modifiedJsonDataLenght = len(modifiedJsonData)
print('Modified JSON Data Length:')
print(modifiedJsonDataLenght)
data = connection.recv(1) # Read next 1 byte for number of Images (Image Size)
if data != b'':
noImages = bytes_to_int(data)
print("Image Size: {}".format(noImages))
else:
noImages=0
if noImages>=1:
# Should use for loop if the noImages >0
data = connection.recv(4) # Read next 4 bytes (Image Size)
imageSize = convert_data_to_ints(data)[0]
print("Image Size: {}".format(imageSize))
data = connection.recv(4) # Read next 4 bytes (Image Id)
imageId = convert_data_to_ints(data)[0]
print("Image Size: {}".format(imageId))
imageData = connection.recv(imageSize-4) # Read Image JPG
f = open('data.jpg', 'w+b')
f.write(imageData) # Write to HDD
f.close()
# Send back results to ANSVIS
# For more information, please see protocol
# We now send back (assume that we do not want to send images back)
# 4 bytes: Total Size
# 1 byte: trigger signal. If trigger =1 that means we want to override trigger signal in ANSVIS
# jsonText_bytes: For modified Json in binary
# 1 bytes: Number of images
print('Sending modified results back to ANSVIS...')
totalSize = 4 + 1+modifiedJsonDataLenght +1
triggerOut =0
total_bytes = struct.pack('!I', totalSize)
triggerout_byte = bytes([triggerOut])
jsonText_bytes = struct.pack('!I', modifiedJsonDataLenght)
value_byte = bytes([noImages])
response = total_bytes+triggerout_byte+jsonText_bytes+modifiedJsonData+value_byte
connection.sendall(response)
else: # no image or result detected
# We want to send data back to ANSVIS
# For more information, please see protocol
# We now send back
# 4 bytes: Total Size
# 1 byte: trigger signal. If trigger =1 that means we want to override trigger signal in ANSVIS
# jsonText_bytes: For modified Json in binary
# 1 bytes: Number of images
print('Sending modified results back to ANSVIS...')
totalSize = 4 + 1+modifiedJsonDataLenght +1
triggerOut =0
total_bytes = struct.pack('!I', totalSize)
triggerout_byte = bytes([triggerOut])
jsonText_bytes = struct.pack('!I', modifiedJsonDataLenght)
value_byte = bytes([noImages])
response = total_bytes+triggerout_byte+jsonText_bytes+modifiedJsonData+value_byte
connection.sendall(response)
break
else:
data = connection.recv(totalSize)
if not totalSize:
break
else:
break
finally:
connection.close()
Click Add to add the Post-Processing
Run the system and see the Python Post Processing works