diff --git a/CMakeLists.txt b/CMakeLists.txt
index 641df7f..9d6c8a5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -3,8 +3,9 @@ cmake_minimum_required (VERSION 2.8)
Project(library_curation NONE)
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "${CMAKE_HOME_DIRECTORY}/dist")
+set(EMPTY " ")
if (NOT DEFINED REGISTRY)
- set(REGISTRY "")
+ set(REGISTRY "${EMPTY}")
elseif (NOT ${REGISTRY} MATCHES "/$")
set(REGISTRY "${REGISTRY}/")
endif()
@@ -18,7 +19,28 @@ if (NOT DEFINED INGESTION)
set(INGESTION "object,face")
endif()
-if(NOT REGISTRY)
+if (NOT DEFINED IN_SOURCE)
+ set(IN_SOURCE "videos") # Use videos
+ set(STREAM_URL "${EMPTY}")
+endif()
+
+if (NOT DEFINED NCPU)
+ set(NCPU 0)
+endif()
+
+if (NOT IN_SOURCE MATCHES "stream")
+ set(STREAM_URL "${EMPTY}") # Use videos
+endif()
+
+if (NOT DEFINED STREAM_URL)
+ set(STREAM_URL "${EMPTY}") # Use videos
+ if (IN_SOURCE MATCHES "stream")
+ set(IN_SOURCE "videos")
+ endif()
+endif()
+
+
+if(REGISTRY MATCHES "${EMPTY}")
add_custom_target(update ${CMAKE_HOME_DIRECTORY}/script/update-image.sh)
endif()
add_custom_target(dist ${CMAKE_HOME_DIRECTORY}/script/mk-dist.sh)
@@ -27,7 +49,11 @@ file(GLOB dirs "deployment" "*")
list(REMOVE_DUPLICATES dirs)
foreach (dir ${dirs})
if(EXISTS ${dir}/CMakeLists.txt)
- add_subdirectory(${dir})
+ if(${dir} MATCHES "stream" AND IN_SOURCE MATCHES "stream")
+ add_subdirectory(${dir})
+ elseif(NOT ${dir} MATCHES "stream")
+ add_subdirectory(${dir})
+ endif()
endif()
endforeach()
@@ -35,5 +61,6 @@ endforeach()
execute_process(COMMAND printf "\n\nThis script will build third party components licensed under various open source licenses into your container images. The terms under which those components may be used and distributed can be found with the license document that is provided with those components. Please familiarize yourself with those terms to ensure your distribution of those components complies with the terms of those licenses.\n\n")
execute_process(COMMAND printf "PLATFORM=${PLATFORM}, NCURATIONS=${NCURATIONS}, INGESTION=${INGESTION}\n")
+execute_process(COMMAND printf "IN_SOURCE=${IN_SOURCE}, STREAM_URL=${STREAM_URL}, NCPU=${NCPU}\n")
execute_process(COMMAND printf "REGISTRY=${REGISTRY}\n\n")
diff --git a/README.md b/README.md
index d0cd3b2..895a782 100644
--- a/README.md
+++ b/README.md
@@ -1,101 +1,113 @@
-
-This sample implements libraries of video files content analysis, database ingestion, content search and visualization:
-- **Ingest**: Analyze video content and ingest the data into the VDMS.
-- **VDMS**: Store metadata efficiently in a graph-based database.
-- **Visualization**: Visualize content search based on video metadata.
+This sample implements libraries of video files with content analysis, database ingestion, content search, and visualization:
+- **Ingest**: Analyze video content and ingest the data into the VDMS.
+- **VDMS**: Store metadata efficiently in a graph-based database.
+- **Visualization**: Visualize content search based on video metadata.
-**This is a concept sample in active development.**
+**This is a concept sample in active development.**
+
+
+## Software Stacks
-### Software Stacks
+The sample is powered by the following Open Visual Cloud software stacks:
+- **Media Analytics**:
+ - [The GStreamer-based media analytics stack](https://github.com/OpenVisualCloud/Dockerfiles/tree/v21.3/Xeon/ubuntu-18.04/analytics/gst) is used for object, face and emotion detection. The software stack is optimized for [Intel® Xeon® Scalable Processors](https://github.com/OpenVisualCloud/Dockerfiles/tree/v21.3/Xeon/ubuntu-18.04/analytics/gst).
-The sample is powered by the following Open Visual Cloud software stacks:
-- **Media Analytics**:
- - [The GStreamer-based media analytics stack](https://github.com/OpenVisualCloud/Dockerfiles/tree/master/Xeon/ubuntu-16.04/analytics/gst) is used for object, face and emotion detection. The software stack is optimized for [Intel® Xeon® Scalable Processors](https://github.com/OpenVisualCloud/Dockerfiles/tree/master/Xeon/ubuntu-16.04/analytics/gst).
-
- **NGINX Web Service**:
- - [The NGINX/FFmpeg-based web serving stack](https://github.com/OpenVisualCloud/Dockerfiles/tree/master/Xeon/centos-7.6/media/nginx) is used to store and segment video content and serve web services. The software stack is optimized for [Intel Xeon Scalable Processors](https://github.com/OpenVisualCloud/Dockerfiles/tree/master/Xeon/centos-7.6/media/nginx).
+ - [The NGINX/FFmpeg-based web serving stack](https://github.com/OpenVisualCloud/Dockerfiles/tree/v23.1/Xeon/ubuntu-20.04/media/nginx) is used to store and segment video content and serve web services. The software stack is optimized for [Intel Xeon Scalable Processors](https://github.com/OpenVisualCloud/Dockerfiles/tree/v23.1/Xeon/ubuntu-20.04/media/nginx).
+
-### Install Prerequisites:
+### License Obligations
-- **Time Zone**: Check that the timezone setting of your host machine is correctly configured. Timezone is used during build. If you plan to run the sample on a cluster of machines managed by Docker Swarm or Kubernetes, please make sure to synchronize time among the manager/master node and worker nodes.
+- FFmpeg is an open source project licensed under LGPL and GPL. See https://www.ffmpeg.org/legal.html. You are solely responsible for determining if your use of FFmpeg requires any additional licenses. Intel is not responsible for obtaining any such licenses, nor liable for any licensing fees due, in connection with your use of FFmpeg.
+
-- **Build Tools**: Install ```cmake``` and ```m4``` if they are not available on your system.
+- GStreamer is an open source framework licensed under LGPL. See https://gstreamer.freedesktop.org/documentation/frequently-asked-questions/licensing.html?gi-language=c. You are solely responsible for determining if your use of GStreamer requires any additional licenses. Intel is not responsible for obtaining any such licenses, nor liable for any licensing fees due, in connection with your use of GStreamer.
+
-- **Docker Engine**:
- - Install [docker engine](https://docs.docker.com/install).
- - Install [docker compose](https://docs.docker.com/compose/install), if you plan to deploy through docker compose. Version 1.20+ is required.
- - Setup [docker swarm](https://docs.docker.com/engine/swarm), if you plan to deploy through docker swarm. See [Docker Swarm Setup](deployment/docker-swarm/README.md) for additional setup details.
- - Setup [Kubernetes](https://kubernetes.io/docs/setup), if you plan to deploy through Kubernetes. See [Kubernetes Setup](deployment/kubernetes/README.md) for additional setup details.
- - Setup docker proxy as follows if you are behind a firewall:
-```bash
-sudo mkdir -p /etc/systemd/system/docker.service.d
-printf "[Service]\nEnvironment=\"HTTPS_PROXY=$https_proxy\" \"NO_PROXY=$no_proxy\"\n" | sudo tee /etc/systemd/system/docker.service.d/proxy.conf
-sudo systemctl daemon-reload
-sudo systemctl restart docker
-```
+## Install Prerequisites:
+
+- **Time Zone**: Check that the timezone setting of your host machine is correctly configured. Timezone is used during build. If you plan to run the sample on a cluster of machines managed by Docker Swarm or Kubernetes, please make sure to synchronize time among the manager/master node and worker nodes.
+
+- **Build Tools**: Install ```cmake``` and ```m4``` if they are not available on your system.
+
+- **Docker Engine**:
+ - Install [docker engine](https://docs.docker.com/install).
+ - Install [docker compose](https://docs.docker.com/compose/install), if you plan to deploy through docker compose. Version 1.20+ is required.
+ - Setup [docker swarm](https://docs.docker.com/engine/swarm), if you plan to deploy through docker swarm. See [Docker Swarm Setup](deployment/docker-swarm/README.md) for additional setup details.
+ - Setup [Kubernetes](https://kubernetes.io/docs/setup), if you plan to deploy through Kubernetes. See [Kubernetes Setup](deployment/kubernetes/README.md) for additional setup details.
+ - Setup docker proxy as follows if you are behind a firewall:
+ ```bash
+ sudo mkdir -p /etc/systemd/system/docker.service.d
+ printf "[Service]\nEnvironment=\"HTTPS_PROXY=$https_proxy\" \"NO_PROXY=$no_proxy\"\n" | sudo tee /etc/systemd/system/docker.service.d/proxy.conf
+ sudo systemctl daemon-reload
+ sudo systemctl restart docker
+ ```
+
+
-### Build Sample:
+## Build Streaming Sample:
```bash
-mkdir build
-cd build
-cmake ..
-make
+mkdir build
+cd build
+cmake -DSTREAM_URL="udp://localhost:8088" -DIN_SOURCE=stream ..
+make
```
-See also: [Customize Build Process](doc/cmake.md).
+See also [Customize Build Process](doc/cmake.md) for additional options.
+
-### Start/stop Sample:
+## Start/stop Sample:
-Use the following commands to start/stop services via docker-compose:
+Use the following commands to start/stop services via docker-compose:
```bash
-make start_docker_compose
-make stop_docker_compose
+make start_docker_compose
+make stop_docker_compose
```
-Use the following commands to start/stop services via docker swarm:
+Use the following commands to start/stop services via docker swarm:
```bash
make update # optional for private registry
-make start_docker_swarm
-make stop_docker_swarm
+make start_docker_swarm
+make stop_docker_swarm
```
-
-See also: [Docker Swarm Setup](deployment/docker-swarm/README.md).
+See also: [Docker Swarm Setup](deployment/docker-swarm/README.md).
+
Use the following commands to start/stop Kubernetes services:
-```
+```bash
make update # optional for private registry
make start_kubernetes
make stop_kubernetes
```
-See also: [Kubernetes Setup](deployment/kubernetes/README.md).
+See also: [Kubernetes Setup](deployment/kubernetes/README.md).
+
-### Launch Sample UI:
+## Launch Sample UI:
-Launch your browser and browse to ```https://```. The sample UI is similar to the following:
+Launch your browser and browse to ```https://<hostname>:30007```. The sample UI is similar to the following:
----
-
* For Kubernetes/Docker Swarm, ```<hostname>``` is the hostname of the manager/master node.
-* If you see a browser warning of self-signed certificate, please accept it to proceed to the sample UI.
-
+* If you see a browser warning of self-signed certificate, please accept it to proceed to the sample UI.
+
---
-### See Also
+## See Also
-- [Configuration Options](doc/cmake.md)
-- [Docker Swarm Setup](deployment/docker-swarm/README.md)
+- [Configuration Options](doc/cmake.md)
+- [Docker Swarm Setup](deployment/docker-swarm/README.md)
- [Kubernetes Setup](deployment/kubernetes/README.md)
-- [Sample Distribution](doc/dist.md)
-- [Visual Data Management System](https://github.com/intellabs/vdms)
+- [Sample Distribution](doc/dist.md)
+- [Visual Data Management System](https://github.com/intellabs/vdms)
+
diff --git a/deployment/certificate/CMakeLists.txt b/deployment/certificate/CMakeLists.txt
index eccae39..87326c2 100644
--- a/deployment/certificate/CMakeLists.txt
+++ b/deployment/certificate/CMakeLists.txt
@@ -1,2 +1,3 @@
set(service "lcc_certificate")
include("${CMAKE_SOURCE_DIR}/script/service.cmake")
+add_custom_target(sign_certificate ${CMAKE_CURRENT_SOURCE_DIR}/self-sign.sh)
diff --git a/deployment/certificate/Dockerfile b/deployment/certificate/Dockerfile
index 06dfa57..a20c501 100644
--- a/deployment/certificate/Dockerfile
+++ b/deployment/certificate/Dockerfile
@@ -1,6 +1,7 @@
-FROM centos:7.6.1810
-RUN yum install -y -q openssl && rm -rf /var/cache/yum/*
+FROM alpine:3.16
+RUN apk add --no-cache --upgrade openssl bash && rm -rf /var/cache/apk/*
+
####
ARG USER
@@ -8,8 +9,8 @@ ARG GROUP
ARG UID
ARG GID
## must use ; here to ignore user exist status code
-RUN [ ${GID} -gt 0 ] && groupadd -f -g ${GID} ${GROUP}; \
- [ ${UID} -gt 0 ] && useradd -d /home/${USER} -g ${GID} -K UID_MAX=${UID} -K UID_MIN=${UID} ${USER}; \
+RUN if [ ${GID} -gt 0 ]; then addgroup -g ${GID} ${GROUP}; fi; \
+ if [ ${UID} -gt 0 ]; then adduser -h /home/${USER} -G ${GROUP} -u ${UID} ${USER}; fi; \
echo
USER ${UID}
####
diff --git a/deployment/certificate/self-sign.sh b/deployment/certificate/self-sign.sh
index 954eff0..799718d 100755
--- a/deployment/certificate/self-sign.sh
+++ b/deployment/certificate/self-sign.sh
@@ -2,11 +2,10 @@
IMAGE="lcc_certificate"
DIR=$(dirname $(readlink -f "$0"))
-USER="docker"
case "$(cat /proc/1/sched | head -n 1)" in
*self-sign*)
- openssl req -x509 -nodes -days 30 -newkey rsa:4096 -keyout /home/$USER/self.key -out /home/$USER/self.crt << EOL
+ openssl req -x509 -nodes -days 30 -newkey rsa:4096 -keyout /home/self.key -out /home/self.crt << EOL
US
OR
Portland
@@ -16,11 +15,11 @@ Intel Corporation
$1
nobody@intel.com
EOL
- chmod 640 "/home/$USER/self.key"
- chmod 644 "/home/$USER/self.crt"
+ chmod 640 "/home/self.key"
+ chmod 644 "/home/self.crt"
;;
*)
- OPTIONS=("--volume=$DIR:/home/$USER:rw")
- . "$DIR/../../script/shell.sh" /home/$USER/self-sign.sh "$(hostname -f)"
+ OPTIONS=("--volume=$DIR:/home:rw")
+ . "$DIR/../../script/shell.sh" /home/self-sign.sh "$(hostname -f)"
;;
esac
diff --git a/deployment/docker-swarm/.gitignore b/deployment/docker-swarm/.gitignore
index 1120be9..4ca9e5f 100644
--- a/deployment/docker-swarm/.gitignore
+++ b/deployment/docker-swarm/.gitignore
@@ -1 +1,2 @@
docker-compose.yml
+docker-compose_stream.yml
diff --git a/deployment/docker-swarm/CMakeLists.txt b/deployment/docker-swarm/CMakeLists.txt
index 9507718..2fc2432 100644
--- a/deployment/docker-swarm/CMakeLists.txt
+++ b/deployment/docker-swarm/CMakeLists.txt
@@ -7,3 +7,5 @@ if (NOT PLATFORM STREQUAL "VCAC-A")
include("${CMAKE_SOURCE_DIR}/script/service.cmake")
include("${CMAKE_SOURCE_DIR}/script/deployment.cmake")
endif()
+
+add_dependencies(start_${service} sign_certificate)
\ No newline at end of file
diff --git a/deployment/docker-swarm/build.sh b/deployment/docker-swarm/build.sh
index bea9fe6..6dfcbdd 100755
--- a/deployment/docker-swarm/build.sh
+++ b/deployment/docker-swarm/build.sh
@@ -1,11 +1,23 @@
#!/bin/bash -e
DIR=$(dirname $(readlink -f "$0"))
-NCURATIONS=${2:-1}
+PLATFORM="${1:-Xeon}"
+NCURATIONS="$2"
INGESTION="$3"
-REGISTRY="$4"
+IN_SOURCE="$4"
+STREAM_URL="$5"
+NCPU="$6"
+REGISTRY="$7"
-if test -f "${DIR}/docker-compose.yml.m4"; then
- echo "Generating docker-compose.yml"
- m4 -DREGISTRY_PREFIX=$REGISTRY -DINGESTION="$INGESTION" -DNCURATIONS=${NCURATIONS} -I "${DIR}" "${DIR}/docker-compose.yml.m4" > "${DIR}/docker-compose.yml"
+echo "Generating templates with PLATFORM=${PLATFORM},NCURATIONS=${NCURATIONS},INGESTION=${INGESTION},IN_SOURCE=${IN_SOURCE},STREAM_URL=${STREAM_URL},NCPU=${NCPU},HOSTIP=${HOSTIP}"
+if [[ $IN_SOURCE == *"videos"* ]]; then
+ if test -f "${DIR}/docker-compose.yml.m4"; then
+ echo "Generating docker-compose.yml"
+ m4 -DREGISTRY_PREFIX=$REGISTRY -DINGESTION="$INGESTION" -DNCURATIONS="${NCURATIONS}" -DIN_SOURCE="${IN_SOURCE}" -DSTREAM_URL="${STREAM_URL}" -DNCPU="${NCPU}" -I "${DIR}" "${DIR}/docker-compose.yml.m4" > "${DIR}/docker-compose.yml"
+ fi
+else
+ if test -f "${DIR}/docker-compose_stream.yml.m4"; then
+ echo "Generating docker-compose.yml"
+ m4 -DREGISTRY_PREFIX=$REGISTRY -DINGESTION="$INGESTION" -DNCURATIONS="${NCURATIONS}" -DIN_SOURCE="${IN_SOURCE}" -DSTREAM_URL="${STREAM_URL}" -DNCPU="${NCPU}" -I "${DIR}" "${DIR}/docker-compose_stream.yml.m4" > "${DIR}/docker-compose.yml"
+ fi
fi
diff --git a/deployment/docker-swarm/docker-compose_stream.yml.m4 b/deployment/docker-swarm/docker-compose_stream.yml.m4
new file mode 100644
index 0000000..2cff75c
--- /dev/null
+++ b/deployment/docker-swarm/docker-compose_stream.yml.m4
@@ -0,0 +1,18 @@
+
+version: "3.7"
+
+services:
+
+include(zookeeper.m4)
+include(kafka.m4)
+include(frontend.m4)
+include(vdms.m4)
+include(stream.m4)
+include(video_stream.m4)
+include(ingest.m4)
+include(secret.m4)
+include(network.m4)
+
+
+volumes:
+ stream-content:
diff --git a/deployment/docker-swarm/frontend.m4 b/deployment/docker-swarm/frontend.m4
index fd28f69..279bc2f 100644
--- a/deployment/docker-swarm/frontend.m4
+++ b/deployment/docker-swarm/frontend.m4
@@ -1,9 +1,9 @@
frontend-service:
- image: defn(`REGISTRY_PREFIX')lcc_frontend:latest
+ image: defn(`REGISTRY_PREFIX')lcc_frontend:stream
ports:
- target: 8443
- published: 443
+ published: 30007
protocol: tcp
mode: host
environment:
@@ -11,19 +11,19 @@
VDHOST: "http://video-service:8080"
no_proxy: "video-service,${no_proxy}"
NO_PROXY: "video-service,${NO_PROXY}"
- volumes:
- - /etc/localtime:/etc/localtime:ro
secrets:
- source: self_crt
- target: self.crt
+ target: /var/run/secrets/self.crt
uid: ${USER_ID}
gid: ${GROUP_ID}
mode: 0444
- source: self_key
- target: self.key
+ target: /var/run/secrets/self.key
uid: ${USER_ID}
gid: ${GROUP_ID}
mode: 0440
+ volumes:
+ - /etc/localtime:/etc/localtime:ro
networks:
- appnet
restart: always
diff --git a/deployment/docker-swarm/ingest.m4 b/deployment/docker-swarm/ingest.m4
index 98e4254..194bff5 100644
--- a/deployment/docker-swarm/ingest.m4
+++ b/deployment/docker-swarm/ingest.m4
@@ -1,11 +1,17 @@
ingest:
- image: defn(`REGISTRY_PREFIX')lcc_ingest:latest
+ image: defn(`REGISTRY_PREFIX')lcc_ingest:stream
environment:
KKHOST: "kafka-service:9092"
VDHOST: "http://video-service:8080"
DBHOST: "vdms-service"
ZKHOST: "zookeeper-service:2181"
+ `IN_SOURCE': "defn(`IN_SOURCE')"
+ `NCPU': "defn(`NCPU')"
+ http_proxy: "${http_proxy}"
+ HTTP_PROXY: "${HTTP_PROXY}"
+ https_proxy: "${https_proxy}"
+ HTTPS_PROXY: "${HTTPS_PROXY}"
no_proxy: "video-service,${no_proxy}"
NO_PROXY: "video-service,${NO_PROXY}"
volumes:
diff --git a/deployment/docker-swarm/start.sh b/deployment/docker-swarm/start.sh
index 9ac0ef2..00b9e28 100755
--- a/deployment/docker-swarm/start.sh
+++ b/deployment/docker-swarm/start.sh
@@ -3,8 +3,6 @@
DIR=$(dirname $(readlink -f "$0"))
yml="$DIR/docker-compose.yml"
-export USER_ID="$(id -u)"
-export GROUP_ID="$(id -g)"
case "$1" in
docker_compose)
dcv="$(docker-compose --version | cut -f3 -d' ' | cut -f1 -d',')"
@@ -22,13 +20,11 @@ docker_compose)
docker volume prune -f; echo
docker network prune -f; echo
- "$DIR/../certificate/self-sign.sh"
shift
. "$DIR/build.sh"
docker-compose -f "$yml" -p lcc --compatibility up
;;
*)
- "$DIR/../certificate/self-sign.sh"
shift
. "$DIR/build.sh"
docker stack deploy -c "$yml" lcc
diff --git a/deployment/docker-swarm/stop.sh b/deployment/docker-swarm/stop.sh
index 84ec4f9..07c4d9e 100755
--- a/deployment/docker-swarm/stop.sh
+++ b/deployment/docker-swarm/stop.sh
@@ -24,3 +24,7 @@ docker_compose)
done
;;
esac
+
+docker container prune -f; echo
+docker volume prune -f; echo
+docker network prune -f; echo
diff --git a/deployment/docker-swarm/stream.m4 b/deployment/docker-swarm/stream.m4
new file mode 100644
index 0000000..5e2b96e
--- /dev/null
+++ b/deployment/docker-swarm/stream.m4
@@ -0,0 +1,26 @@
+
+ stream-service:
+ image: defn(`REGISTRY_PREFIX')lcc_stream:stream
+ ports:
+ - target: 8088
+ published: 30009
+ protocol: udp
+ mode: host
+ environment:
+ KKHOST: "kafka-service:9092"
+ VDHOST: "http://video-service:8080"
+ DBHOST: "vdms-service"
+ ZKHOST: "zookeeper-service:2181"
+ `STREAM_URL': "defn(`STREAM_URL')"
+ http_proxy: "${http_proxy}"
+ HTTP_PROXY: "${HTTP_PROXY}"
+ https_proxy: "${https_proxy}"
+ HTTPS_PROXY: "${HTTPS_PROXY}"
+ no_proxy: "video-service,${no_proxy}"
+ NO_PROXY: "video-service,${NO_PROXY}"
+ volumes:
+ - /etc/localtime:/etc/localtime:ro
+ - stream-content:/var/www/mp4
+ networks:
+ - appnet
+ restart: always
diff --git a/deployment/docker-swarm/vdms.m4 b/deployment/docker-swarm/vdms.m4
index c6a2e1d..6c5983c 100644
--- a/deployment/docker-swarm/vdms.m4
+++ b/deployment/docker-swarm/vdms.m4
@@ -1,7 +1,12 @@
vdms-service:
- image: intellabs/vdms:base
- command: ["/bin/sh","-c","cd /vdms;vdms"]
+ image: intellabs/vdms:latest
+ command: ["/bin/sh","-c","cd /vdms/build;./vdms"]
+ ports:
+ - target: 55555
+ published: 55555
+ protocol: tcp
+ mode: host
volumes:
- /etc/localtime:/etc/localtime:ro
networks:
diff --git a/deployment/docker-swarm/video.m4 b/deployment/docker-swarm/video.m4
index b31904f..12b983c 100644
--- a/deployment/docker-swarm/video.m4
+++ b/deployment/docker-swarm/video.m4
@@ -1,11 +1,19 @@
video-service:
- image: defn(`REGISTRY_PREFIX')lcc_video:latest
+ image: defn(`REGISTRY_PREFIX')lcc_video:stream
environment:
RETENTION_MINS: "60"
CLEANUP_INTERVAL: "10m"
KKHOST: "kafka-service:9092"
+ ZKHOST: "zookeeper-service:2181"
`INGESTION': "defn(`INGESTION')"
+ `IN_SOURCE': "defn(`IN_SOURCE')"
+ http_proxy: "${http_proxy}"
+ HTTP_PROXY: "${HTTP_PROXY}"
+ https_proxy: "${https_proxy}"
+ HTTPS_PROXY: "${HTTPS_PROXY}"
+ no_proxy: "${no_proxy}"
+ NO_PROXY: "${NO_PROXY}"
volumes:
- /etc/localtime:/etc/localtime:ro
networks:
diff --git a/deployment/docker-swarm/video_stream.m4 b/deployment/docker-swarm/video_stream.m4
new file mode 100644
index 0000000..11ad2e4
--- /dev/null
+++ b/deployment/docker-swarm/video_stream.m4
@@ -0,0 +1,23 @@
+
+ video-service:
+ image: defn(`REGISTRY_PREFIX')lcc_video:stream
+ environment:
+ RETENTION_MINS: "60"
+ CLEANUP_INTERVAL: "10m"
+ KKHOST: "kafka-service:9092"
+ SHOST: "http://stream-service:8080"
+ ZKHOST: "zookeeper-service:2181"
+ `INGESTION': "defn(`INGESTION')"
+ `IN_SOURCE': "defn(`IN_SOURCE')"
+ http_proxy: "${http_proxy}"
+ HTTP_PROXY: "${HTTP_PROXY}"
+ https_proxy: "${https_proxy}"
+ HTTPS_PROXY: "${HTTPS_PROXY}"
+ no_proxy: "stream-service,${no_proxy}"
+ NO_PROXY: "stream-service,${NO_PROXY}"
+ volumes:
+ - /etc/localtime:/etc/localtime:ro
+ - stream-content:/var/www/streams
+ networks:
+ - appnet
+ restart: always
diff --git a/deployment/kubernetes/build.sh b/deployment/kubernetes/build.sh
index ce80a58..9a017e7 100755
--- a/deployment/kubernetes/build.sh
+++ b/deployment/kubernetes/build.sh
@@ -1,13 +1,19 @@
#!/bin/bash -e
DIR=$(dirname $(readlink -f "$0"))
+PLATFORM="${1:-Xeon}"
NCURATIONS=${2:-1}
INGESTION="$3"
-REGISTRY="$4"
+IN_SOURCE="$4"
+STREAM_URL="$5"
+NCPU=$6
+REGISTRY="$7"
HOSTIP=$(ip route get 8.8.8.8 | awk '/ src /{split(substr($0,index($0," src ")),f);print f[2];exit}')
+echo "Generating templates with PLATFORM=${PLATFORM},NCURATIONS=${NCURATIONS},INGESTION=${INGESTION},IN_SOURCE=${IN_SOURCE},STREAM_URL=${STREAM_URL},NCPU=${NCPU},HOSTIP=${HOSTIP}"
find "${DIR}" -maxdepth 1 -mindepth 1 -name "*.yaml" -exec rm -rf "{}" \;
for template in $(find "${DIR}" -maxdepth 1 -mindepth 1 -name "*.yaml.m4" -print); do
yaml=${template/.m4/}
- m4 -DREGISTRY_PREFIX=$REGISTRY -DINGESTION="$INGESTION" -DNCURATIONS=$NCURATIONS -DHOSTIP=$HOSTIP -DUSERID=$(id -u) -DGROUPID=$(id -g) -I "${DIR}" "${template}" > "${yaml}"
+ m4 -DREGISTRY_PREFIX=${REGISTRY} -DPLATFORM=${PLATFORM} -DNCURATIONS=${NCURATIONS} -DINGESTION="${INGESTION}" -DIN_SOURCE=${IN_SOURCE} -DSTREAM_URL=${STREAM_URL} -DNCPU=${NCPU} -DUSERID=$(id -u) -DGROUPID=$(id -g) -DHOSTIP=${HOSTIP} -I "${DIR}" "${template}" > "${yaml}"
+ # m4 -DREGISTRY_PREFIX=$REGISTRY -DINGESTION="$INGESTION" -DNCURATIONS=$NCURATIONS -DIN_SOURCE=$IN_SOURCE -DSTREAM_URL=$STREAM_URL -I "${DIR}" "${template}" > "${yaml}"
done
diff --git a/deployment/kubernetes/frontend.yaml.m4 b/deployment/kubernetes/frontend.yaml.m4
index a7f4215..0e35f25 100644
--- a/deployment/kubernetes/frontend.yaml.m4
+++ b/deployment/kubernetes/frontend.yaml.m4
@@ -1,4 +1,3 @@
-
apiVersion: v1
kind: Service
metadata:
@@ -6,17 +5,16 @@ metadata:
labels:
app: frontend
spec:
+ type: NodePort
ports:
- port: 443
targetPort: 8443
+ nodePort: 30007
name: https
- externalIPs:
- - defn(`HOSTIP')
selector:
app: frontend
-
---
-
+# Part of this could be Jobs rather than Deployments.
apiVersion: apps/v1
kind: Deployment
metadata:
@@ -36,19 +34,22 @@ spec:
enableServiceLinks: false
containers:
- name: frontend
- image: defn(`REGISTRY_PREFIX')lcc_frontend:latest
+ image: defn(`REGISTRY_PREFIX')lcc_frontend:stream
imagePullPolicy: IfNotPresent
ports:
- containerPort: 8443
+ envFrom:
+ - configMapRef:
+ name: proxy-config
env:
- name: DBHOST
value: "vdms-service"
- name: VDHOST
value: "http://video-service:8080"
- name: NO_PROXY
- value: "video-service,${NO_PROXY}"
+ value: "video-service,$(NO_PROXY)"
- name: no_proxy
- value: "video-service,${NO_PROXY}"
+ value: "video-service,$(no_proxy)"
volumeMounts:
- mountPath: /etc/localtime
name: timezone
@@ -56,6 +57,8 @@ spec:
- mountPath: /var/run/secrets
name: self-signed-certificate
readOnly: true
+ imagePullSecrets:
+ - name:
volumes:
- name: timezone
hostPath:
diff --git a/deployment/kubernetes/ingest.yaml.m4 b/deployment/kubernetes/ingest.yaml.m4
index 3052aaf..c5d4f31 100644
--- a/deployment/kubernetes/ingest.yaml.m4
+++ b/deployment/kubernetes/ingest.yaml.m4
@@ -18,8 +18,11 @@ spec:
enableServiceLinks: false
containers:
- name: ingest
- image: defn(`REGISTRY_PREFIX')lcc_ingest:latest
+ image: defn(`REGISTRY_PREFIX')lcc_ingest:stream
imagePullPolicy: IfNotPresent
+ resources:
+ requests:
+ cpu: defn(`NCPU')
env:
- name: KKHOST
value: "kafka-service:9092"
@@ -29,6 +32,10 @@ spec:
value: "vdms-service"
- name: ZKHOST
value: "zookeeper-service:2181"
+ - name: `IN_SOURCE'
+ value: "defn(`IN_SOURCE')"
+ - name: `NCPU'
+ value: "defn(`NCPU')"
- name: NO_PROXY
value: "video-service,${NO_PROXY}"
- name: no_proxy
@@ -37,8 +44,14 @@ spec:
- mountPath: /etc/localtime
name: timezone
readOnly: true
+ - mountPath: /var/run/secrets
+ name: self-signed-certificate
+ readOnly: true
volumes:
- name: timezone
hostPath:
path: /etc/localtime
type: File
+ - name: self-signed-certificate
+ secret:
+ secretName: self-signed-certificate
\ No newline at end of file
diff --git a/deployment/kubernetes/kafka.yaml.m4 b/deployment/kubernetes/kafka.yaml.m4
index 9f071c8..c5b9b20 100644
--- a/deployment/kubernetes/kafka.yaml.m4
+++ b/deployment/kubernetes/kafka.yaml.m4
@@ -62,4 +62,3 @@ spec:
value: "-Xmx256m -Xms256m"
- name: "KAFKA_LOG4J_ROOT_LOGLEVEL"
value: "ERROR"
-
diff --git a/deployment/kubernetes/start.sh b/deployment/kubernetes/start.sh
index eb72bb5..1175f6d 100755
--- a/deployment/kubernetes/start.sh
+++ b/deployment/kubernetes/start.sh
@@ -1,19 +1,42 @@
#!/bin/bash -e
DIR=$(dirname $(readlink -f "$0"))
+PLATFORM="${1:-Xeon}"
+NCURATIONS="$2"
+INGESTION="$3"
+IN_SOURCE="$4"
+STREAM_URL="$5"
+NCPU="$6"
+REGISTRY="$7"
shift
. "$DIR/build.sh"
function create_secret {
kubectl create secret generic self-signed-certificate "--from-file=${DIR}/../certificate/self.crt" "--from-file=${DIR}/../certificate/self.key"
+ kubectl create configmap proxy-config \
+ --from-literal=http_proxy="$http_proxy" \
+ --from-literal=HTTP_PROXY="$http_proxy" \
+ --from-literal=https_proxy="$https_proxy" \
+    --from-literal=HTTPS_PROXY="$https_proxy" \
+ --from-literal=no_proxy="$no_proxy" \
+ --from-literal=NO_PROXY="$no_proxy"
}
# create secrets
"$DIR/../certificate/self-sign.sh"
create_secret 2>/dev/null || (kubectl delete secret self-signed-certificate; create_secret)
+# Choose video yaml based on IN_SOURCE
+if [ "$IN_SOURCE" == "stream" ]; then
+ skip_name="video.yaml"
+else
+ skip_name="video_stream.yaml"
+fi
+
for yaml in $(find "$DIR" -maxdepth 1 -name "*.yaml" -print); do
- kubectl apply -f "$yaml"
+ if [[ $yaml != *"$skip_name"* ]]; then
+ kubectl apply -f "$yaml"
+ fi
done
diff --git a/deployment/kubernetes/stop.sh b/deployment/kubernetes/stop.sh
index 40d83f3..ee8e23c 100755
--- a/deployment/kubernetes/stop.sh
+++ b/deployment/kubernetes/stop.sh
@@ -6,3 +6,4 @@ for yaml in $(find "${DIR}" -maxdepth 1 -name "*.yaml" -print); do
kubectl delete -f "$yaml" --ignore-not-found=true 2>/dev/null || echo -n ""
done
kubectl delete secret self-signed-certificate 2> /dev/null || echo -n ""
+kubectl delete configmap proxy-config 2> /dev/null || echo -n ""
diff --git a/deployment/kubernetes/vdms.yaml.m4 b/deployment/kubernetes/vdms.yaml.m4
index 49801bb..6fd9ae5 100644
--- a/deployment/kubernetes/vdms.yaml.m4
+++ b/deployment/kubernetes/vdms.yaml.m4
@@ -1,4 +1,3 @@
-
apiVersion: v1
kind: Service
metadata:
@@ -8,13 +7,30 @@ metadata:
spec:
ports:
- port: 55555
+ targetPort: 55555
+ protocol: TCP
+ name: vdms
+ selector:
+ app: vdms
+---
+# TODO: temporary fix!
+apiVersion: v1
+kind: Service
+metadata:
+ name: vdms-service-nodeport
+ labels:
+ app: vdms
+spec:
+ type: NodePort
+ ports:
+ - port: 55555
+ targetPort: 55555
+ nodePort: 30008
protocol: TCP
name: vdms
selector:
app: vdms
-
---
-
apiVersion: apps/v1
kind: Deployment
metadata:
@@ -34,9 +50,9 @@ spec:
enableServiceLinks: false
containers:
- name: vdms
- image: intellabs/vdms:base
+ image: intellabs/vdms:latest
imagePullPolicy: IfNotPresent
- command: ["/bin/sh","-c","cd /vdms;vdms"]
+ command: ["/bin/sh","-c","cd /vdms/build;./vdms"]
ports:
- containerPort: 55555
volumeMounts:
diff --git a/deployment/kubernetes/video.yaml.m4 b/deployment/kubernetes/video.yaml.m4
index 487c6fa..b48b1a6 100644
--- a/deployment/kubernetes/video.yaml.m4
+++ b/deployment/kubernetes/video.yaml.m4
@@ -1,4 +1,3 @@
-
apiVersion: v1
kind: Service
metadata:
@@ -12,9 +11,8 @@ spec:
name: http
selector:
app: video
-
---
-
+# TODO: temporary fix!
apiVersion: apps/v1
kind: Deployment
metadata:
@@ -34,10 +32,13 @@ spec:
enableServiceLinks: false
containers:
- name: video
- image: defn(`REGISTRY_PREFIX')lcc_video:latest
+ image: defn(`REGISTRY_PREFIX')lcc_video:stream
imagePullPolicy: IfNotPresent
ports:
- containerPort: 8080
+ envFrom:
+ - configMapRef:
+ name: proxy-config
env:
- name: RETENTION_MINS
value: "60"
@@ -45,14 +46,24 @@ spec:
value: "10m"
- name: KKHOST
value: "kafka-service:9092"
+ - name: ZKHOST
+ value: "zookeeper-service:2181"
- name: `INGESTION'
value: "defn(`INGESTION')"
+ - name: `IN_SOURCE'
+ value: "defn(`IN_SOURCE')"
+ - name: NO_PROXY
+ value: "stream-service,$(NO_PROXY)"
+ - name: no_proxy
+ value: "stream-service,$(no_proxy)"
volumeMounts:
- mountPath: /etc/localtime
name: timezone
readOnly: true
+ imagePullSecrets:
+ - name:
volumes:
- name: timezone
hostPath:
path: /etc/localtime
- type: File
+ type: File
\ No newline at end of file
diff --git a/deployment/kubernetes/video_stream.yaml.m4 b/deployment/kubernetes/video_stream.yaml.m4
new file mode 100644
index 0000000..0f1e5f6
--- /dev/null
+++ b/deployment/kubernetes/video_stream.yaml.m4
@@ -0,0 +1,126 @@
+apiVersion: v1
+kind: Service
+metadata:
+ name: video-service
+ labels:
+ app: video
+spec:
+ ports:
+ - port: 8080
+ targetPort: 8080
+ name: http
+ selector:
+ app: video
+---
+# TODO: temporary fix!
+apiVersion: v1
+kind: Service
+metadata:
+ name: video-service-nodeport
+ labels:
+ app: video
+spec:
+ type: NodePort
+ ports:
+ - port: 8088
+ targetPort: 8088
+ nodePort: 30009
+ protocol: UDP
+ name: udp
+ selector:
+ app: video
+---
+# Should be a Pod, not a Deployment.
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: video
+ labels:
+ app: video
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: video
+ template:
+ metadata:
+ labels:
+ app: video
+ spec:
+ enableServiceLinks: false
+ containers:
+ - name: stream
+ image: defn(`REGISTRY_PREFIX')lcc_stream:stream
+ imagePullPolicy: IfNotPresent
+ ports:
+ - containerPort: 8080
+ - containerPort: 8088
+ protocol: UDP
+ envFrom:
+ - configMapRef:
+ name: proxy-config
+ env:
+ - name: KKHOST
+ value: "kafka-service:9092"
+ - name: ZKHOST
+ value: "zookeeper-service:2181"
+ - name: VDHOST
+ value: "http://video-service:8080"
+ - name: DBHOST
+ value: "vdms-service"
+ - name: `STREAM_URL'
+ value: "defn(`STREAM_URL')"
+ - name: NO_PROXY
+ value: "video-service,$(NO_PROXY)"
+ - name: no_proxy
+ value: "video-service,$(no_proxy)"
+ volumeMounts:
+ - mountPath: /etc/localtime
+ name: timezone
+ readOnly: true
+ - mountPath: /var/www/mp4
+ name: stream-content
+ readOnly: false
+ - name: video
+ image: defn(`REGISTRY_PREFIX')lcc_video:stream
+ imagePullPolicy: IfNotPresent
+ ports:
+ - containerPort: 8080
+ envFrom:
+ - configMapRef:
+ name: proxy-config
+ env:
+ - name: RETENTION_MINS
+ value: "60"
+ - name: CLEANUP_INTERVAL
+ value: "10m"
+ - name: KKHOST
+ value: "kafka-service:9092"
+ - name: ZKHOST
+ value: "zookeeper-service:2181"
+ - name: SHOST
+ value: "http://stream-service:8080"
+ - name: `INGESTION'
+ value: "defn(`INGESTION')"
+ - name: `IN_SOURCE'
+ value: "defn(`IN_SOURCE')"
+ - name: NO_PROXY
+ value: "stream-service,$(NO_PROXY)"
+ - name: no_proxy
+ value: "stream-service,$(no_proxy)"
+ volumeMounts:
+ - mountPath: /etc/localtime
+ name: timezone
+ readOnly: true
+ - mountPath: /var/www/streams
+ name: stream-content
+ readOnly: false
+ imagePullSecrets:
+ - name:
+ volumes:
+ - name: timezone
+ hostPath:
+ path: /etc/localtime
+ type: File
+ - name: stream-content
+ emptyDir: {}
diff --git a/doc/arch.png b/doc/arch.png
index 727895c..1837c12 100644
Binary files a/doc/arch.png and b/doc/arch.png differ
diff --git a/doc/cmake.md b/doc/cmake.md
index 732d58b..242fedc 100644
--- a/doc/cmake.md
+++ b/doc/cmake.md
@@ -1,28 +1,68 @@
-### CMake Options:
+## CMake Options:
-Use the following definitions to customize the building process:
-- **PLATFORM**: Specify the target platform: `Xeon`
-- **NCURATIONS**: Specify the number of curation processes running in the background.
-- **INGESTION**: Specify the ingestion mode: `face` and/or `object`. Use comma as the deliminator to specify more than 1 ingestion mode.
+Use the following definitions to customize the building process:
+- **PLATFORM**: Specify the target platform: `Xeon`
+- **NCURATIONS**: Specify the number of curation processes running in the background.
+- **INGESTION**: Specify the ingestion mode: `face` and/or `object`. Use a comma as the delimiter to specify more than one ingestion mode.
+- **IN_SOURCE**: Specify the input video source: `videos` and/or `stream`. Use a comma as the delimiter to specify more than one source.
+- **STREAM_URL**: Specify the URL of the streaming source. If using a webcam stream, please specify `"udp://localhost:8088"`.
+- **REGISTRY**: Name of the private registry to which images are pushed. If a registry secret is available, update the `imagePullSecrets` field in [frontend.yaml.m4](../deployment/kubernetes/frontend.yaml.m4), [video_stream.yaml.m4](../deployment/kubernetes/video_stream.yaml.m4), and/or [video.yaml.m4](../deployment/kubernetes/video.yaml.m4) for Kubernetes. `docker login` may be necessary; see the example after this list.
+
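+For illustration only (the registry name `my-registry.example.com/sample` is hypothetical), a private-registry build might look like:
+```bash
+docker login my-registry.example.com
+mkdir build
+cd build
+cmake -DREGISTRY=my-registry.example.com/sample ..
+make
+make update # optional for private registry
+```
+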
-### Examples:
+***Optimizations for sharing the host with other applications:***
+The following optimizations are helpful if you run other applications on the same host.
+- [Assigning CPU resources](https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/) is helpful in this case. In this sample, we specify a CPU request for the ingest container by including the `resources:requests` field in the container manifest. Remove the following from the ingest container configuration in [ingest.yaml.m4](../deployment/kubernetes/ingest.yaml.m4) to disable this feature, or modify it as needed.
+  ```yaml
+ resources:
+ requests:
+ cpu: "1"
+ ```
+- **NCPU**: Use `NCPU` in your cmake command to specify the number of CPU cores for ingestion. The ingest pool will run on randomly selected CPUs, similar to `taskset` on Linux; see the sketch after this list.
+
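+As a sketch (the value `4` below is an arbitrary example), the ingest pool can be given four CPU cores with:
+```bash
+cd build
+cmake -DNCPU=4 ..
+make
+```
+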
+## Examples:
+### Use videos
+This sample uses a list of ten videos from Pexels. Please accept the license when prompted. Use the following command to build the sample:
+```bash
+mkdir build
+cd build
+cmake ..
+make
+```
+
+### Stream from webcam
+Build the sample:
+```bash
+mkdir build
+cd build
+cmake -DSTREAM_URL="udp://localhost:8088" -DIN_SOURCE=stream ..
+make
+```
+Start the sample using your preferred method, then use FFmpeg to capture your webcam locally and send it via UDP to the host machine (`<hostname>`) on UDP port 30009. A sample command is the following:
+```bash
+ffmpeg -re -f dshow -rtbufsize 100M -i video="HP HD Camera" -vcodec libx264 -crf 28 -threads 1 -s 640x360 -f mpegts -flush_packets 0 udp://<hostname>:30009?pkt_size=18800
+```
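+
+The `dshow` input above assumes a Windows host; on Linux, a roughly equivalent capture (assuming the webcam is exposed as `/dev/video0`) might be:
+```bash
+ffmpeg -re -f v4l2 -i /dev/video0 -vcodec libx264 -crf 28 -threads 1 -s 640x360 -f mpegts -flush_packets 0 udp://<hostname>:30009?pkt_size=18800
+```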
+
+### Stream video using URL
+Use the following command to stream the [face-demographics-walking Sample video](https://github.com/intel-iot-devkit/sample-videos) from [Intel's IoT Libraries & Code Samples](https://github.com/intel-iot-devkit):
```
cd build
-cmake -DPLATFORM=Xeon ..
+cmake -DSTREAM_URL="https://github.com/intel-iot-devkit/sample-videos/raw/master/face-demographics-walking.mp4" -DIN_SOURCE=stream ..
+make
```
-### Make Commands:
-- **build**: Build the sample (docker) images.
-- **update**: Distribute the sample images to worker nodes.
-- **dist**: Create the sample distribution package.
-- **start/stop_docker_compose**: Start/stop the sample orchestrated by docker-compose.
-- **start/stop_docker_swarm**: Start/stop the sample orchestrated by docker swarm.
-- **start/stop_kubernetes**: Start/stop the sample orchestrated by Kubernetes.
+## Make Commands:
+
+- **build**: Build the sample (docker) images.
+- **update**: Distribute the sample images to worker nodes.
+- **dist**: Create the sample distribution package.
+- **start/stop_docker_compose**: Start/stop the sample orchestrated by docker-compose.
+- **start/stop_docker_swarm**: Start/stop the sample orchestrated by docker swarm.
+- **start/stop_kubernetes**: Start/stop the sample orchestrated by Kubernetes.
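+
+For instance, assuming the `build` directory created in the examples above, a typical docker-compose flow using these targets might be:
+```bash
+cd build
+make # build the sample images
+make start_docker_compose # start the services
+make stop_docker_compose # stop the services when done
+```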
-### See Also:
+## See Also:
-- [Sample Distribution](dist.md)
+- [Sample Distribution](dist.md)
diff --git a/frontend/Dockerfile b/frontend/Dockerfile
index e4197eb..569abbc 100644
--- a/frontend/Dockerfile
+++ b/frontend/Dockerfile
@@ -1,13 +1,13 @@
-FROM openvisualcloud/xeon-centos76-media-nginx:20.7
+FROM openvisualcloud/xeon-ubuntu2004-media-nginx:23.1
-RUN yum install -y -q epel-release && yum install -y -q python36-tornado python36-requests python36-ply python36-pip python36-psutil && rm -rf /var/cache/yum/*
-RUN pip3 install vdms
+RUN apt-get update && apt-get install -y -q --no-install-recommends python3-tornado python3-requests python3-ply python3-pip python3-psutil && rm -rf /var/lib/apt/lists/*
+RUN pip3 install 'protobuf<=3.20' vdms
+COPY *.py /home/
COPY *.conf /etc/nginx/
-COPY *.py /home/
COPY html /var/www/html
-CMD ["/bin/bash","-c","/home/frontend.py&/usr/local/sbin/nginx"]
+CMD ["/bin/bash","-c","/home/frontend.py&/usr/local/sbin/nginx"]
EXPOSE 8080
####
@@ -16,10 +16,11 @@ ARG GROUP
ARG UID
ARG GID
## must use ; here to ignore user exist status code
-RUN [ ${GID} -gt 0 ] && groupadd -f -g ${GID} ${GROUP}; \
- [ ${UID} -gt 0 ] && useradd -d /home/${USER} -g ${GID} -K UID_MAX=${UID} -K UID_MIN=${UID} ${USER}; \
+RUN if [ ${GID} -gt 0 ]; then groupadd -f -g ${GID} ${GROUP}; fi; \
+ if [ ${UID} -gt 0 ]; then useradd -d /home/${USER} -g ${GID} -K UID_MAX=${UID} -K UID_MIN=${UID} ${USER}; fi; \
touch /var/run/nginx.pid && \
- mkdir -p /var/log/nginx /var/lib/nginx /var/www/cache /var/www/thumbnail && \
- chown -R ${UID}.${GID} /var/run/nginx.pid /var/www /var/log/nginx /var/lib/nginx
+ mkdir -p /var/log/nginx /var/lib/nginx /var/www/cache /var/www/gen /var/www/mp4 /var/www/logwatch && \
+ chown -R ${UID}.${GID} /var/run/nginx.pid /var/log/nginx /var/lib/nginx /var/www /etc/nginx/nginx.conf
+VOLUME ["/var/www"]
USER ${UID}
####
diff --git a/frontend/nginx.conf b/frontend/nginx.conf
index cfb869e..efb4708 100644
--- a/frontend/nginx.conf
+++ b/frontend/nginx.conf
@@ -22,8 +22,8 @@ http {
client_body_timeout 10s;
client_header_timeout 10s;
- ssl_certificate /run/secrets/self.crt;
- ssl_certificate_key /run/secrets/self.key;
+ ssl_certificate /var/run/secrets/self.crt;
+ ssl_certificate_key /var/run/secrets/self.key;
ssl_protocols TLSv1.2;
ssl_prefer_server_ciphers on;
@@ -33,7 +33,7 @@ http {
ssl_session_tickets off;
ssl_stapling off;
ssl_stapling_verify off;
-
+
location / {
#limit_req zone=req1;
#limit_conn addr1 50000;
diff --git a/frontend/search.py b/frontend/search.py
index 1d1e3f1..eba4bd8 100755
--- a/frontend/search.py
+++ b/frontend/search.py
@@ -10,6 +10,7 @@
import json
import vdms
import time
+import traceback
dbhost=os.environ["DBHOST"]
vdhost=os.environ["VDHOST"]
@@ -34,13 +35,13 @@ def _value(self, query1, key):
for kv in query1["params"]:
if kv["name"]==key: return kv["value"]
return None
-
- def _construct_queries(self, queries, ref):
+
+ def _construct_single_query(self, query1, ref):
q_bbox={
"FindBoundingBox": {
"_ref": ref,
"results": {
- "list": [ "frameID", "objectID", "_coordinates" ],
+ "list": [ "frameID", "objectID", "video_name", "_coordinates" ],
},
},
}
@@ -50,51 +51,50 @@ def _construct_queries(self, queries, ref):
"ref2": ref,
"results": {
"list": [ "frameID", "video_name" ],
- },
+ },
},
}
- for query1 in queries:
- if query1["name"]=="video":
- name=self._value(query1, "Video Name")
- if name!="*" and name!="":
- q_conn["FindConnection"].update({
- "constraints": {
- "video_name": [ "==", name ],
- },
- })
- if query1["name"]=="object":
- q_bbox["FindBoundingBox"].update({
+ if query1["name"]=="video":
+ name=self._value(query1, "Video Name")
+ if name!="*" and name!="":
+ q_conn["FindConnection"].update({
"constraints": {
- "objectID": [ "==", self._value(query1, "Object List") ],
+ "video_name": [ "==", name ],
},
})
- if query1["name"]=="person":
- constraints={
- "age": [
- ">=", self._value(query1, "Age Min"),
- "<=", self._value(query1, "Age Max"),
- ],
- "objectID": [
- "==", "face"
- ],
- }
- if "age" not in q_bbox["FindBoundingBox"]["results"]["list"]:
- q_bbox["FindBoundingBox"]["results"]["list"].append("age")
+ if query1["name"]=="object":
+ q_bbox["FindBoundingBox"].update({
+ "constraints": {
+ "objectID": [ "==", self._value(query1, "Object List") ],
+ },
+ })
+ if query1["name"]=="person":
+ constraints={
+ "age": [
+ ">=", self._value(query1, "Age Min"),
+ "<=", self._value(query1, "Age Max"),
+ ],
+ "objectID": [
+ "==", "face"
+ ],
+ }
+ if "age" not in q_bbox["FindBoundingBox"]["results"]["list"]:
+ q_bbox["FindBoundingBox"]["results"]["list"].append("age")
- emotion=self._value(query1, "Emotion List")
- if emotion!="skip":
- constraints["emotion"]=[ "==", emotion ]
- if "emotion" not in q_bbox["FindBoundingBox"]["results"]["list"]:
- q_bbox["FindBoundingBox"]["results"]["list"].append("emotion")
+ emotion=self._value(query1, "Emotion List")
+ if emotion!="skip":
+ constraints["emotion"]=[ "==", emotion ]
+ if "emotion" not in q_bbox["FindBoundingBox"]["results"]["list"]:
+ q_bbox["FindBoundingBox"]["results"]["list"].append("emotion")
- gender=self._value(query1, "Gender")
- if gender!="skip":
- constraints["gender"]=[ "==", gender ]
- if "gender" not in q_bbox["FindBoundingBox"]["results"]["list"]:
- q_bbox["FindBoundingBox"]["results"]["list"].append("gender")
+ gender=self._value(query1, "Gender")
+ if gender!="skip":
+ constraints["gender"]=[ "==", gender ]
+ if "gender" not in q_bbox["FindBoundingBox"]["results"]["list"]:
+ q_bbox["FindBoundingBox"]["results"]["list"].append("gender")
- q_bbox["FindBoundingBox"].update({ "constraints": constraints })
+ q_bbox["FindBoundingBox"].update({ "constraints": constraints })
return [q_bbox, q_conn]
@@ -102,12 +102,12 @@ def _decode_response(self, response):
clips={}
for i in range(0,len(response)-1,2):
if response[i+1]["FindConnection"]["status"]==0 and response[i]["FindBoundingBox"]["status"]==0:
- connections=response[i+1]["FindConnection"]["connections"]
+ # connections=response[i+1]["FindConnection"]["connections"]
bboxes=response[i]["FindBoundingBox"]["entities"]
- if len(connections)!=len(bboxes): continue
+ # if len(connections)!=len(bboxes): continue
- for j in range(0,len(connections)):
- stream=connections[j]["video_name"]
+ for j in range(0,len(bboxes)):
+ stream=bboxes[j]["video_name"]
if stream not in clips:
r=get(vdhost+"/api/info",params={"video":stream}).json()
clips[stream]={
@@ -128,10 +128,10 @@ def _decode_response(self, response):
seg1=[max(ts-segmin,0),min(ts+segmin,stream1["duration"])]
stream1["segs"]=merge_iv(stream1["segs"], seg1)
- if ts not in stream1["frames"]:
- stream1["frames"][ts]={
- "time": ts,
- "objects": []
+ if ts not in stream1["frames"]:
+ stream1["frames"][ts]={
+ "time": ts,
+ "objects": []
}
if "objectID" in bboxes[j]:
@@ -178,71 +178,152 @@ def _decode_response(self, response):
print(segs, flush=True)
return segs
- @run_on_executor
- def _search(self, queries, size):
- vdms_queries=[]
- for query1 in queries:
- vdms_queries.extend(self._construct_queries(query1, len(vdms_queries)+1))
+ def find_common_elements(self,list_a, list_b):
+ set_a = set(list_a)
+ set_b = set(list_b)
+
+ common_elements = set_a.intersection(set_b)
+ if len(common_elements) > 0:
+ return list(common_elements)
+ else:
+ return list()
+
+ def get_details_from_BB(self, response):
+ bb_entities = response[0]["FindBoundingBox"]["entities"]
+ con_entity = response[1]["FindConnection"]["connections"][0]
+
+ bb_info = {}
+ for j in range(0,len(bb_entities)):
+ video_name=bb_entities[j]["video_name"]
+ frameID = bb_entities[j]["frameID"]
+ objectID = bb_entities[j]["objectID"]
+ con_entity["video_name"] = video_name
+ con_entity["frameID"] = frameID
+
+ if video_name not in bb_info:
+ bb_info[video_name] = {frameID: [[bb_entities[j], con_entity]]}
+
+ elif frameID not in bb_info[video_name]:
+ bb_info[video_name][frameID] = [[bb_entities[j], con_entity]]
+
+ else:
+ bb_info[video_name][frameID].append([bb_entities[j], con_entity])
+ return bb_info
+
+ def intersect_reponses(self, responses):
+        # Find the response with the fewest returned elements
+ prev_info = self.get_details_from_BB(responses[0])
+ responses_info = []
+ for ridx in range(1, len(responses)):
+ if prev_info == {}:
+ break
+
+ response_info = self.get_details_from_BB(responses[ridx])
+
+ prev_videos = prev_info.keys()
+ cur_videos = response_info.keys()
+ valid_videos = self.find_common_elements(prev_videos, cur_videos)
+
+ if len(valid_videos) == 0:
+ response_info = {}
+ break
+
+ for vid in set(prev_videos).difference(set(valid_videos)):
+ del prev_info[vid]
+
+ for vid in set(cur_videos).difference(set(valid_videos)):
+ del response_info[vid]
+
+ for vid in valid_videos:
+ prev_frames = prev_info[vid].keys()
+ cur_frames = response_info[vid].keys()
+ valid_frames = self.find_common_elements(prev_frames, cur_frames)
+
+ if len(valid_frames) == 0:
+ del response_info[vid]
+ continue
+
+ for frame in set(prev_frames).difference(set(valid_frames)):
+ del prev_info[vid][frame]
+
+ for frame in set(cur_frames).difference(set(valid_frames)):
+ del response_info[vid][frame]
+
+ for frame in valid_frames:
+ response_info[vid][frame].extend(prev_info[vid][frame])
+
+ responses_info.append(response_info)
+ prev_info = response_info.copy()
+
+ print("responses_info: ", flush=True)
+ print(prev_info, flush=True)
- print(vdms_queries, flush=True)
- vdms_response, vdms_array =self._vdms.query(vdms_queries)
- print(vdms_response, flush=True)
+ new_connections=[]
+ new_bboxes=[]
+ for vid in prev_info.keys():
+ for frame in prev_info[vid].keys():
+ bb_info = prev_info[vid][frame]
+ for j in range(0,len(bb_info)):
+ new_bboxes.append(bb_info[j][0])
+ new_connections.append(bb_info[j][1])
- return self._decode_response(vdms_response)
-
+ final_response = responses[0]
+ print("Number bboxes: ", len(new_bboxes))
+ final_response[0]["FindBoundingBox"]["entities"] = new_bboxes
+ final_response[0]["FindBoundingBox"]['returned'] = len(new_bboxes)
+ final_response[1]["FindConnection"]["connections"] = new_connections
+ final_response[1]["FindConnection"]['returned'] = len(new_connections)
+ return final_response
+
+ def one_shot_query(self, queries):
+ vdms_response = []
+ ref = 1
+ print("Queries: ", flush=True)
+ for query1 in queries: # Query per line in Gui
+ responses = []
+ for q in query1: #Queries on a single line (one icon)
+ # print(f"Icon query: {q}", flush=True)
+
+ # BB & Connection query for each icon
+ vdms_query = self._construct_single_query(q, ref)
+
+ print("vdms_query:", flush=True)
+ print(vdms_query, flush=True)
+
+ response, _ =self._vdms.query(vdms_query)
+
+ responses.append(response)
+ ref += 1
+
+ if len(responses) > 1 and "FindBoundingBox" in responses[0][0]:
+ # And operation; multiple on one line
+ # Find common response/frames for all icons on line
+ final_response = self.intersect_reponses(responses)
+ vdms_response.extend(final_response)
+ else:
+ # Single query
+ vdms_response.extend(responses[0])
+
+ return vdms_response
+
+ @run_on_executor
+ def _search(self, queries, size):
try:
- return [{
- "thumbnail": "images/segment.svg",
- "stream": "video/mock.mp4",
- "time": 0.01, # segment time
- "offset": 0, # segment offset time, usually zero
- "duration": 5.0,
- "fps": 30,
- "width": 1024,
- "height": 1024,
- "frames": [{
- "time": 0.05, # seconds
- "objects": [{
- "detection" : {
- "bounding_box" : {
- "x_max" : 0.37810274958610535,
- "x_min" : 0.2963270843029022,
- "y_max" : 0.8861181139945984,
- "y_min" : 0.784187376499176
- },
- "confidence" : 0.9999198913574219,
- "label" : "vehicle",
- "label_id" : 2
- }
- }],
- },{
- "time": 0.06, # seconds
- "objects": [{
- "detection" : {
- "bounding_box" : {
- "x_max" : 0.37810274958610535,
- "x_min" : 0.2963270843029022,
- "y_max" : 0.8861181139945984,
- "y_min" : 0.784187376499176
- },
- "confidence" : 0.9999198913574219,
- "label" : "vehicle",
- "label_id" : 2
- }
- }],
- }],
- "properties": [], # additional name/value pairs to show in table
- }]
+ vdms_response = self.one_shot_query(queries)
except Exception as e:
- return str(e)
+ vdms_response = []
+ print("Exception: "+str(e)+"\n"+traceback.format_exc(), flush=True)
+ # print("VDMS response:")
+ # print(vdms_response, flush=True)
+ segs = self._decode_response(vdms_response)
+ return segs
@gen.coroutine
def get(self):
queries=json.loads(unquote(str(self.get_argument("queries"))))
size=int(self.get_argument("size"))
- print("queries",flush=True)
- print(queries,flush=True)
-
+ # print("queries",flush=True)
+ # print(queries,flush=True)
r=yield self._search(queries, size)
if isinstance(r, str):
self.set_status(400, str(r))
@@ -251,3 +332,65 @@ def get(self):
self.write({"response":r})
self.set_status(200, 'OK')
self.finish()
+
+
+ # def intersect_reponses2(self, responses):
+ # # Find reponse with least number of returned elements
+ # responses_info = []
+ # for response in responses:
+ # bboxes=response[0]["FindBoundingBox"]["entities"]
+
+ # response_info = {}
+ # for j in range(0,len(bboxes)):
+ # video_name=bboxes[j]["video_name"]
+ # frameID = bboxes[j]["frameID"]
+ # if video_name not in response_info:
+ # response_info[video_name] = [frameID]
+ # else:
+ # response_info[video_name].append(frameID)
+ # responses_info.append(response_info)
+
+ # print("responses_info: ", flush=True)
+ # print(responses_info, flush=True)
+
+ # # List of videos in all reponses
+ # valid_videos = responses_info[0].keys()
+ # for idx in range(1, len(responses_info)):
+ # valid_videos = self.find_common_elements(valid_videos, responses_info[idx].keys())
+
+ # if valid_videos is None:
+ # return []
+
+ # combined_response = {}
+ # for video in valid_videos:
+ # valid_frames = responses_info[0][video]
+ # for idx in range(1, len(responses_info)):
+ # valid_frames = self.find_common_elements(valid_frames, responses_info[idx][video])
+ # if valid_frames:
+ # combined_response[video] = valid_frames
+
+ # if len(combined_response.keys()) == 0:
+ # return []
+
+ # new_connections=[]
+ # new_bboxes=[]
+ # for ridx, response in enumerate(responses):
+ # bboxes=response[0]["FindBoundingBox"]["entities"]
+ # connections=response[1]["FindConnection"]["connections"]
+
+ # for j in range(0,len(bboxes)):
+ # video_name=bboxes[j]["video_name"]
+ # frameID = bboxes[j]["frameID"]
+ # if video_name in combined_response and frameID in combined_response[video_name]:
+ # connections[j]["video_name"] = video_name
+ # connections[j]["frameID"] = frameID
+ # new_connections.append(connections[j])
+ # new_bboxes.append(bboxes[j])
+
+ # final_response = responses[0]
+ # print("Number bboxes: ", len(new_bboxes))
+ # final_response[0]["FindBoundingBox"]["entities"] = new_bboxes
+ # final_response[0]["FindBoundingBox"]['returned'] = len(new_bboxes)
+ # final_response[1]["FindConnection"]["connections"] = new_connections
+ # final_response[1]["FindConnection"]['returned'] = len(new_connections)
+ # return final_response
\ No newline at end of file
diff --git a/frontend/setting.py b/frontend/setting.py
index b258b79..d4ae978 100755
--- a/frontend/setting.py
+++ b/frontend/setting.py
@@ -57,23 +57,20 @@ def _settings(self):
"name": "Object List",
"type": "list",
"values": [
- "person", "bicycle", "car", "motorbike","aeroplane",
- "bus","train", "truck", "boat", "traffic light",
- "fire hydrant", "stop sign", "parking meter", "bench",
- "bird", "cat", "dog", "horse", "sheep", "cow",
- "elephant", "bear", "zebra", "giraffe", "backpack",
- "umbrella", "handbag", "tie", "suitcase", "frisbee",
- "skis", "snowboard", "sports ball", "kite",
- "baseball bat", "baseball glove", "skateboard",
- "surfboard", "tennis racket", "bottle", "wine glass",
- "cup", "fork", "knife", "spoon", "bowl", "banana",
- "apple", "sandwich", "orange", "broccoli", "carrot",
- "hot dog", "pizza", "donut", "cake", "chair", "sofa",
- "pottedplant", "bed", "diningtable", "toilet",
- "tvmonitor", "laptop", "mouse", "remote", "keyboard",
- "cell phone", "microwave", "oven", "toaster", "sink",
- "refrigerator", "book", "clock", "vase", "scissors",
- "teddy bear", "hair drier", "toothbrush"
+ "aeroplane", "apple", "backpack", "banana", "baseball bat",
+ "baseball glove", "bear", "bed", "bench", "bicycle", "bird",
+ "boat", "book", "bottle", "bowl", "broccoli", "bus", "cake",
+ "car", "carrot", "cat", "cell phone", "chair", "clock", "cow",
+ "cup", "diningtable", "dog", "donut", "elephant", "fire hydrant",
+ "fork", "frisbee", "giraffe", "hair drier", "handbag", "horse",
+ "hot dog", "keyboard", "kite", "knife", "laptop", "microwave",
+ "motorbike", "mouse", "orange", "oven", "parking meter", "person",
+ "pizza", "pottedplant", "refrigerator", "remote", "sandwich",
+ "scissors", "sheep", "sink", "skateboard", "skis", "snowboard",
+ "sofa", "spoon", "sports ball", "stop sign", "suitcase",
+ "surfboard", "teddy bear", "tennis racket", "tie", "toaster",
+ "toilet", "toothbrush", "traffic light", "train", "truck",
+ "tvmonitor", "umbrella", "vase", "wine glass", "zebra"
],
"value": "person",
}],
diff --git a/ingest/Dockerfile b/ingest/Dockerfile
index 7e68183..6ce40f0 100644
--- a/ingest/Dockerfile
+++ b/ingest/Dockerfile
@@ -1,8 +1,8 @@
-FROM openvisualcloud/xeon-ubuntu1804-analytics-dev:20.7 as build
+FROM openvisualcloud/xeon-ubuntu1804-analytics-dev:21.3 as build
ENV PYTHONPATH=
-RUN DEBIAN_FRONTEND=noninteractive apt-get update && apt-get install -y -q libgoogle-glog-dev libgflags-dev libgtest-dev libjsoncpp-dev cmake git scons libprotobuf-dev protobuf-compiler
+RUN DEBIAN_FRONTEND=noninteractive apt-get update && apt-get install -y -q build-essential libgoogle-glog-dev libgflags-dev libgtest-dev libjsoncpp-dev cmake git scons libprotobuf-dev protobuf-compiler
# build gtest
RUN cd /usr/src/gtest && cmake . && make -j$(nproc) && mv libgtest* /usr/lib
@@ -17,13 +17,17 @@ RUN cd /opt/vdms_converter && scons -j$(nproc)
# metaData_extract
COPY gstreamer_gva /opt/gstreamer_gva
-RUN cd /opt/gstreamer_gva && scons --metaData -Q VDMS_ROOT_PATH=/opt/vdms/ GST_GVA_PLUGIN_ROOT_PATH=/usr/local/lib/x86_64-linux-gnu/gstreamer-1.0
+RUN cd /opt/gstreamer_gva && scons --metaData -Q VDMS_ROOT_PATH=/opt/vdms/ GST_GVA_PLUGIN_ROOT_PATH=/usr/local/lib/gstreamer-1.0
# final image
-FROM openvisualcloud/xeon-ubuntu1804-analytics-gst:20.7
+FROM openvisualcloud/xeon-ubuntu1804-analytics-gst:21.3
+ENV LD_LIBRARY_PATH /usr/local/lib:${LD_LIBRARY_PATH}
# prerequisite
-RUN DEBIAN_FRONTEND=noninteractive apt-get update && apt-get install -y -q libjsoncpp1 python3-pip wget libdc1394-22 libprotobuf10 && rm -rf /var/lib/apt/lists/* && pip3 install 'kafka-python>=1.4.7' 'kazoo>=2.6.1'
+RUN DEBIAN_FRONTEND=noninteractive apt-get update && \
+ apt-get install -y -q libjsoncpp1 python3-pip wget libdc1394-22 libprotobuf10 && \
+ rm -rf /var/lib/apt/lists/* && \
+ pip3 install 'kafka-python>=1.4.7' 'kazoo>=2.6.1' psutil
COPY --from=build /opt/vdms_converter/libvdms_converter.so /usr/local/lib/libvdms_converter.so
COPY --from=build /opt/vdms/client/cpp/libvdms-client.so /usr/local/lib/libvdms-client.so
@@ -37,4 +41,3 @@ COPY openvino_models /opt/openvino_models
# ingest script
COPY *.py /home/
CMD ["/home/ingest.py"]
-
diff --git a/ingest/gstreamer_gva/metaData_extract/SConscript b/ingest/gstreamer_gva/metaData_extract/SConscript
index 91d8ca9..d216e90 100644
--- a/ingest/gstreamer_gva/metaData_extract/SConscript
+++ b/ingest/gstreamer_gva/metaData_extract/SConscript
@@ -27,13 +27,13 @@ env.Append(
],
LIBPATH = [
"/usr/lib/x86_64-linux-gnu",
+ "/usr/local/lib/gstreamer-1.0",
"/usr/local/lib",
"/usr/lib",
"/opt/vdms_converter",
GST_GVA_PLUGIN_ROOT_PATH + "/build/intel64/Release/lib",
VDMS_ROOT_PATH + "/utils",
VDMS_ROOT_PATH + "/client/cpp",
- "/usr/local/lib/x86_64-linux-gnu/gstreamer-1.0"
],
LIBS = [
"vdms_converter",
@@ -58,6 +58,5 @@ env.Append(
]
)
-# "cpu_extension",
env.Program("metaData_extract", source_files)
diff --git a/ingest/gstreamer_gva/metaData_extract/src/metaData_extract.cpp b/ingest/gstreamer_gva/metaData_extract/src/metaData_extract.cpp
index cb91c64..abf3c4a 100644
--- a/ingest/gstreamer_gva/metaData_extract/src/metaData_extract.cpp
+++ b/ingest/gstreamer_gva/metaData_extract/src/metaData_extract.cpp
@@ -126,7 +126,7 @@ void ParseYOLOV3Output(GVA::Tensor &tensor_yolo, int image_width, int image_heig
throw std::runtime_error("Invalid output size");
}
int side_square = side * side;
-
+
// const float *output_blob = (const float *)RegionYolo->data;
 std::vector<float> output_blob = tensor_yolo.data();
for (int i = 0; i < side_square; ++i) {
@@ -164,20 +164,20 @@ void DrawObjects(std::vector &objects, cv::Mat &frame, VDMSConv
 std::string video_id, int frameID, bool streamVid, bool noDisplay, std::vector<std::string> objIDArr){
std::sort(objects.begin(), objects.end());
for (size_t i = 0; i < objects.size(); ++i){
- if (objects[i].confidence == 0)
+ if (objects[i].confidence == 0)
continue;
- for (size_t j = i + 1; j < objects.size(); ++j)
+ for (size_t j = i + 1; j < objects.size(); ++j)
if (IntersectionOverUnion(objects[i], objects[j]) >= 0.4)
objects[j].confidence = 0;
- }
+ }
// Drawing boxes
for (const auto &object : objects) {
- if (object.confidence < 0)
+ if (object.confidence < 0)
continue;
guint label = object.class_id;
float confidence = object.confidence;
-
- if (confidence > 0 && std::find(objIDArr.begin(), objIDArr.end(), coco_labels[label]) != objIDArr.end())
+
+ if (confidence > 0 && std::find(objIDArr.begin(), objIDArr.end(), coco_labels[label]) != objIDArr.end())
{
// Drawing only objects when >confidence_threshold probability
// Do not draw when noDisplay is TRUE
@@ -193,11 +193,11 @@ void DrawObjects(std::vector &objects, cv::Mat &frame, VDMSConv
std::string metadata_str = std::to_string(frameID) + ","
+ std::to_string(object.xmin) + ","
- + std::to_string(object.ymin) + ","
+ + std::to_string(object.ymin) + ","
+ std::to_string(object.xmax - object.xmin) + ","
+ std::to_string(object.ymax - object.ymin) + ","
+ coco_labels[label];
- if(converter != NULL)
+ if(converter != NULL)
{
bbox_obj bbox;
bbox.props[VDMS_OBJID] = coco_labels[label];
@@ -206,13 +206,13 @@ void DrawObjects(std::vector &objects, cv::Mat &frame, VDMSConv
bbox.y = object.ymin;
bbox.w = object.xmax - object.xmin;
bbox.h = object.ymax - object.ymin;
- bbox.props[VDMS_FRAMEID] = frameID;
- if(converter->upload_data(bbox, frameID, video_id, streamVid) != 0)
+ bbox.props["video_name"] = video_id;
+ if(converter->upload_data(bbox, frameID, video_id, streamVid) != 0)
{
std::cout << "Upload Failed" << std::endl;
}
}
- if(outFD != NULL)
+ if(outFD != NULL)
{
outFD[0] << metadata_str << std::endl;
outFD->flush();
@@ -229,14 +229,14 @@ static GstPadProbeReturn pad_probe_callback_face(GstPad *pad, GstPadProbeInfo *i
VDMSConverter *converter = args->converter;
std::ofstream *outFD = args->outFD;
int *frameID = args->frameID;
-
+
auto buffer = GST_PAD_PROBE_INFO_BUFFER(info);
-
+
// Making a buffer writable can fail (for example if it cannot be copied and is used more than once)
// buffer = gst_buffer_make_writable(buffer);
if (buffer == NULL)
return GST_PAD_PROBE_OK;
-
+
// Get width and height
GstCaps *caps = gst_pad_get_current_caps(pad);
if (!caps)
@@ -245,13 +245,13 @@ static GstPadProbeReturn pad_probe_callback_face(GstPad *pad, GstPadProbeInfo *i
GVA::VideoFrame video_frame(buffer, caps);
gint width = video_frame.video_info()->width;
gint height = video_frame.video_info()->height;
-
+
// Map buffer and create OpenCV image
GstMapInfo map;
- if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
+ if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
return GST_PAD_PROBE_OK;
cv::Mat mat(height, width, CV_8UC4, map.data);
-
+
// Iterate detected objects and all attributes
for (GVA::RegionOfInterest &roi : video_frame.regions()) {
auto meta = roi.rect(); //roi.meta();
@@ -263,17 +263,17 @@ static GstPadProbeReturn pad_probe_callback_face(GstPad *pad, GstPadProbeInfo *i
for (auto tensor : roi.tensors()) {
std::string model_name = tensor.model_name();
std::string layer_name = tensor.layer_name();
- std::vector<float> data = tensor.data();
+ std::vector<float> data = tensor.data();
if (model_name.find("gender") != std::string::npos && layer_name.find("prob") != std::string::npos) {
label += (data[1] > 0.5) ? " M " : " F ";
gender = (data[1] > 0.5) ? "male" : "female";
}
-
+
if (layer_name.find("age") != std::string::npos) {
label += std::to_string((int)(data[0] * 100));
age = std::to_string((int)(data[0] * 100));
}
-
+
if (model_name.find("EmoNet") != std::string::npos) {
 static const std::vector<std::string> emotionsDesc = {"neutral", "happy", "sad", "surprise", "anger"};
int index = max_element(begin(data), end(data)) - begin(data);
@@ -284,7 +284,7 @@ static GstPadProbeReturn pad_probe_callback_face(GstPad *pad, GstPadProbeInfo *i
if (!label.empty()) {
std::string metadata_str = std::to_string(frameID[0]) + ","
+ std::to_string(meta.x) + ","
- + std::to_string(meta.y) + ","
+ + std::to_string(meta.y) + ","
+ std::to_string(meta.w) + ","
+ std::to_string(meta.h) + ","
+ age + ","
@@ -297,7 +297,8 @@ static GstPadProbeReturn pad_probe_callback_face(GstPad *pad, GstPadProbeInfo *i
bbox.y = meta.y;
bbox.w = meta.w;
bbox.h = meta.h;
- if(converter->upload_data(bbox, frameID[0], args->video_id, args->streamVid) != 0)
+ bbox.props["video_name"] = args->video_id;
+ if(converter->upload_data(bbox, frameID[0], args->video_id, args->streamVid) != 0)
std::cout << "Upload Failed" << std::endl;
}
if(outFD != NULL) {
@@ -307,7 +308,7 @@ static GstPadProbeReturn pad_probe_callback_face(GstPad *pad, GstPadProbeInfo *i
std::cout << metadata_str << std::endl;
}
}
-
+
// Release the memory previously mapped with gst_buffer_map
gst_buffer_unmap(buffer, &map);
// Unref a GstCaps and and free all its structures and the structures' values
@@ -325,12 +326,12 @@ static GstPadProbeReturn pad_probe_callback_obj(GstPad *pad, GstPadProbeInfo *in
std::ofstream *outFD = args->outFD;
int *frameID = args->frameID;
gdouble threshold = args->threshold;
-
+
auto buffer = GST_PAD_PROBE_INFO_BUFFER(info);
-
+
if (buffer == NULL)
return GST_PAD_PROBE_OK;
-
+
GstCaps *caps = gst_pad_get_current_caps(pad);
if (!caps)
throw std::runtime_error("Can't get current caps");
@@ -338,13 +339,13 @@ static GstPadProbeReturn pad_probe_callback_obj(GstPad *pad, GstPadProbeInfo *in
GVA::VideoFrame video_frame(buffer, caps);
gint width = video_frame.video_info()->width;
gint height = video_frame.video_info()->height;
-
+
// Map buffer and create OpenCV image
GstMapInfo map;
if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
- return GST_PAD_PROBE_OK;
+ return GST_PAD_PROBE_OK;
cv::Mat mat(height, width, CV_8UC4, map.data);
-
+
// Parse and draw outputs
std::vector objects;
for (GVA::Tensor &tensor : video_frame.tensors()) {
@@ -352,10 +353,10 @@ static GstPadProbeReturn pad_probe_callback_obj(GstPad *pad, GstPadProbeInfo *in
ParseYOLOV3Output(tensor, width, height, objects, threshold);
}
}
-
+
DrawObjects(objects, mat, converter, outFD, args->video_id, frameID[0], args->streamVid, args->noDisplay, args->objIDArr);
GST_PAD_PROBE_INFO_DATA(info) = buffer;
-
+
gst_buffer_unmap(buffer, &map);
gst_caps_unref(caps);
frameID[0]++;
@@ -364,18 +365,18 @@ static GstPadProbeReturn pad_probe_callback_obj(GstPad *pad, GstPadProbeInfo *in
int parse_args(
- int argc,
- char* argv[],
+ int argc,
+ char* argv[],
GOptionEntry opt_entries[],
gchar** metaDataType,
gchar** input_file,
- const std::string env_models_path,
+ const std::string env_models_path,
gchar const** detection_model,
gboolean* streamVid,
gchar const** classification_models,
 const std::vector<std::string> default_classification_model_names,
gchar const** model_precision) {
-
+
// Parse arguments
GOptionContext *context = g_option_context_new(" ");
g_option_context_add_main_entries(context, opt_entries, " ");
@@ -389,12 +390,12 @@ int parse_args(
g_print("Please specify input file:\n%s\n", g_option_context_get_help(context, TRUE, NULL));
return 1;
}
-
+
if(!metaDataType[0]) {
g_print("Please Specify metadata to query for:\n%s\n", g_option_context_get_help(context, TRUE, NULL));
return 1;
}
-
+
if(strcmp(metaDataType[0], "object") != 0 && strcmp(metaDataType[0], "face") != 0) {
g_print("Incorrect type of metadata to query for:\n%s\n", g_option_context_get_help(context, TRUE, NULL));
return 1;
@@ -408,10 +409,10 @@ int parse_args(
g_print("Video is too large to upload rerun without -v option:\n%s\n", g_option_context_get_help(context, TRUE, NULL));
return -1;
}
-
- if (env_models_path.empty())
+
+ if (env_models_path.empty())
throw std::runtime_error("Enviroment variable MODELS_PATH is not set");
-
+
if(!detection_model[0]) {
if(strcmp(metaDataType[0], "object") == 0){
 std::vector<std::string> default_detection_model_names = {"yolo_v3.xml"};
@@ -426,18 +427,20 @@ int parse_args(
detection_model[0] = g_strdup(model_paths["face-detection-adas-0001.xml"].c_str());
}
}
-
+
if(classification_models[0] == NULL) {
 std::map<std::string, std::string> model_paths =
FindModels(SplitString(env_models_path), default_classification_model_names, model_precision[0]);
std::string classification_models_str = model_paths["age-gender-recognition-retail-0013.xml"] + "," +
model_paths["emotions-recognition-retail-0003.xml"];
classification_models[0] = g_strdup(classification_models_str.c_str());
- }
+ }
return 0;
}
+
int main(int argc, char *argv[]) {
+ // std::cout << "[metaData_extract DEBUG] PREPARE WORKLOAD" << std::endl;
std::string env_models_path = "";
const char *temp_model = getenv("MODELS_PATH");
const char *temp_cvsdk = getenv("INTEL_CVSDK_DIR");
@@ -447,7 +450,7 @@ int main(int argc, char *argv[]) {
else if(temp_cvsdk != NULL){
env_models_path = std::string() + temp_cvsdk + "/deployment_tools/intel_models/";
}
-
+
 const std::vector<std::string> default_classification_model_names = {
"age-gender-recognition-retail-0013.xml", "emotions-recognition-retail-0003.xml"
};
@@ -461,6 +464,7 @@ int main(int argc, char *argv[]) {
gchar const *device = "CPU";
gchar const *model_precision = "FP32";
gint batch_size = 1;
+ gint frame_interval = 1;
gdouble threshold = 0.7;
gboolean no_display = FALSE;
int frameID = 0;
@@ -469,12 +473,12 @@ int main(int argc, char *argv[]) {
gboolean streamVid = FALSE;
gboolean logVDMSTransX = FALSE;
gchar* objIDListFN = NULL;
-
+
static GOptionEntry opt_entries[] = {
{"input_file", 'i', 0, G_OPTION_ARG_STRING, &input_file, "Path to input video file", NULL},
{"precision", 'p', 0, G_OPTION_ARG_STRING, &model_precision, "Models precision. Default: FP32", NULL},
{"detection", 'm', 0, G_OPTION_ARG_STRING, &detection_model, "Path to detection model file", NULL},
- {"classification", 'c', 0, G_OPTION_ARG_STRING, &classification_models, "Path to classification models as ',' separated list", NULL},
+ {"classification", 'c', 0, G_OPTION_ARG_STRING, &classification_models, "Path to classification models as ',' separated list", NULL},
{"extension", 'e', 0, G_OPTION_ARG_STRING, &extension, "Path to custom layers extension library", NULL},
{"device", 'd', 0, G_OPTION_ARG_STRING, &device, "Device to run inference", NULL},
{"batch", 'b', 0, G_OPTION_ARG_INT, &batch_size, "Batch size", NULL},
@@ -488,10 +492,10 @@ int main(int argc, char *argv[]) {
{"logVDMSTransX", 'l', 0, G_OPTION_ARG_NONE, &logVDMSTransX, "Flag to dump VDMS transactions to the console", NULL},
{}
};
-
+
if(parse_args(
- argc,
- argv,
+ argc,
+ argv,
opt_entries,
&metaDataType,
&input_file,
@@ -503,8 +507,8 @@ int main(int argc, char *argv[]) {
&model_precision) != 0) {
return 1;
}
-
-
+
+
// Get classification models
std::stringstream ss(classification_models);
 std::vector<std::string> res;
@@ -517,7 +521,7 @@ int main(int argc, char *argv[]) {
classification_model_2 = res[1].c_str();
 std::vector<std::string> objIDArr = VDMSConverter::crtObjIDArr(objIDListFN, coco_labels);
- if(objIDArr.empty())
+ if(objIDArr.empty())
{
std::cout << "Object ID List is empty" << std::endl;
return 1;
@@ -526,12 +530,12 @@ int main(int argc, char *argv[]) {
std::string vdms_ip_addr_str(vdms_ip_addr);
converter = new VDMSConverter(vdms_ip_addr_str, (bool)logVDMSTransX);
std::string input_file_str(input_file);
- if(converter->upload_video(input_file_str, streamVid) == -1)
+ if(converter->upload_video(input_file_str, streamVid) == -1)
{
std::cout << "Add Video failed" << std::endl;
return 1;
- }
-
+ }
+
// set probe callback
pad_probe_CB_args args;
std::ofstream *outFD = (outFN != NULL) ? new std::ofstream(outFN) : NULL;
@@ -543,28 +547,28 @@ int main(int argc, char *argv[]) {
args.streamVid = (bool)streamVid;
args.noDisplay = (bool)no_display;
args.objIDArr = objIDArr;
-
+
// Build the pipeline
GstElement *gstBin = NULL;
GstPad *pad = NULL;
GstElement *pipeline = NULL;
-
+
if(strcmp(metaDataType, "object") == 0)
{
gchar const* preprocess_pipeline = "decodebin ! videoconvert n-threads=4 ! videoscale n-threads=4 ";
gchar const* capfilter = "video/x-raw,format=BGRA";
gchar const* sink = no_display ? "identity signal-handoffs=false ! fakesink sync=false"
- : "autovideosink";
+ : "autovideosink";
gchar* launch_str = g_strdup_printf("filesrc location=%s ! %s ! capsfilter caps=\"%s\" ! "
- "gvainference name=gvadetect model=%s device=%s batch-size=%d ! queue ! "
+ "gvainference name=gvadetect model=%s device=%s batch-size=%d inference-interval=%d ! queue ! "
"videoconvert n-threads=4 ! %s ",
- input_file, preprocess_pipeline, capfilter, detection_model, device, batch_size, sink);
-
+ input_file, preprocess_pipeline, capfilter, detection_model, device, batch_size, frame_interval, sink);
+
g_print("PIPELINE: %s \n", launch_str);
pipeline = gst_parse_launch(launch_str, NULL);
- g_free(launch_str);
-
+ g_free(launch_str);
+
// Set Probe callback
gstBin = gst_bin_get_by_name(GST_BIN(pipeline), "gvadetect");
pad = gst_element_get_static_pad(gstBin, "src");
@@ -574,58 +578,57 @@ int main(int argc, char *argv[]) {
gchar const* preprocess_pipeline = "decodebin ! videoconvert n-threads=4 ! videoscale n-threads=4 ";
gchar const* capfilter = "video/x-raw,format=BGRA";
gchar const *sink = no_display ? "identity signal-handoffs=false ! fakesink sync=false"
- : "autovideosink";
+ : "autovideosink";
gchar* launch_str = g_strdup_printf("filesrc location=%s ! %s ! capsfilter caps=\"%s\" ! "
- "gvadetect model=%s device=%s batch-size=%d ! queue ! "
+ "gvadetect model=%s device=%s batch-size=%d inference-interval=%d ! queue ! "
"gvaclassify model=%s device=%s batch-size=%d ! queue ! "
"gvaclassify model=%s device=%s batch-size=%d ! queue ! "
"gvawatermark name=gvawatermark ! videoconvert n-threads=4 ! %s",
input_file, preprocess_pipeline, capfilter,
- detection_model, device, batch_size,
+ detection_model, device, batch_size, frame_interval,
classification_model_1, device, batch_size,
classification_model_2, device, batch_size, sink);
// }
-
g_print("PIPELINE: %s \n", launch_str);
pipeline = gst_parse_launch(launch_str, NULL);
g_free(launch_str);
-
+
// set probe callback
gstBin = gst_bin_get_by_name(GST_BIN(pipeline), "gvawatermark");
pad = gst_element_get_static_pad(gstBin, "src");
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, pad_probe_callback_face, &args, NULL);
}
gst_object_unref(pad);
-
+
// Start playing
gst_element_set_state(pipeline, GST_STATE_PLAYING);
-
+
// Wait until error or EOS
- GstBus *bus = gst_element_get_bus(pipeline);
- int ret_code = 0;
- GstMessage *msg = gst_bus_poll(bus, (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS), -1);
+ GstBus *bus = gst_element_get_bus(pipeline);
+ int ret_code = 0;
+ GstMessage *msg = gst_bus_poll(bus, (GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS), -1);
if (msg && GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) {
GError *err = NULL;
- gchar *dbg_info = NULL;
+ gchar *dbg_info = NULL;
gst_message_parse_error(msg, &err, &dbg_info);
g_printerr("ERROR from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
- g_printerr("Debugging info: %s\n", (dbg_info) ? dbg_info : "none");
+ g_printerr("Debugging info: %s\n", (dbg_info) ? dbg_info : "none");
g_error_free(err);
g_free(dbg_info);
ret_code = -1;
}
-
+
if (msg)
gst_message_unref(msg);
-
+
// Free resources
gst_object_unref(bus);
gst_element_set_state(pipeline, GST_STATE_NULL);
gst_object_unref(pipeline);
-
+
if(outFD)
outFD->close();
-
+
return ret_code;
-}
+}
\ No newline at end of file
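The object and face hunks above thread the new frame_interval value into the gvainference/gvadetect launch strings as inference-interval, so detection runs only on every Nth decoded frame instead of on all of them. The sketch below is illustrative only (not part of the patch): it drives a comparable pipeline from Python, assuming DL Streamer's gvadetect element is installed, with placeholder file and model paths.

#!/usr/bin/env python3
# Illustrative sketch: an equivalent pipeline driven from Python, showing the effect
# of inference-interval. Paths below are placeholders, not values from this repo.
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

Gst.init(None)

# inference-interval=2 runs the detector on every 2nd frame only, reducing CPU load.
launch = (
    "filesrc location=/tmp/sample.mp4 ! decodebin ! videoconvert ! "
    "gvadetect model=/opt/openvino_models/yolo_v3.xml device=CPU batch-size=1 inference-interval=2 ! "
    "fakesink sync=false"
)
pipeline = Gst.parse_launch(launch)
pipeline.set_state(Gst.State.PLAYING)

# Wait for end-of-stream or an error, then shut the pipeline down.
bus = pipeline.get_bus()
bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.ERROR | Gst.MessageType.EOS)
pipeline.set_state(Gst.State.NULL)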
diff --git a/ingest/ingest.py b/ingest/ingest.py
index eff43e8..77c3548 100755
--- a/ingest/ingest.py
+++ b/ingest/ingest.py
@@ -7,14 +7,55 @@
import socket
import time
import os
+import logging
+import kafka.errors as Errors
+
+num_cpus=int(os.environ["NCPU"])
+if num_cpus > 0:
+ from multiprocessing import cpu_count
+ import random
+ import psutil
+ cpu_nums = list(range(psutil.cpu_count()))
+ random.shuffle(cpu_nums)
+ proc = psutil.Process(os.getpid())
+ proc.cpu_affinity(cpu_nums[:num_cpus])
+
+
+# SETUP LOGGER
+logger = logging.getLogger(__name__)
+fmt_str = "[%(filename)s:line %(lineno)d] %(levelname)s:: %(message)s"
+formatter = logging.Formatter(fmt_str)
+
+stream_handler = logging.StreamHandler()
+stream_handler.setLevel(logging.INFO)
+stream_handler.setFormatter(formatter)
+logger.setLevel(logging.INFO)
+logger.addHandler(stream_handler)
+logger.info('INGEST POOL')
topic="video_curation_sched"
groupid="curaters"
clientid=socket.gethostname()
+producer_topic="ingest_sched"
+
kkhost=os.environ["KKHOST"]
vdhost=os.environ["VDHOST"]
dbhost=os.environ["DBHOST"]
+in_source=os.environ["IN_SOURCE"]
+
+def send_producer_msg(producer, topic, strvalue):
+ try:
+ producer.send(topic=topic, value=str.encode(strvalue))
+ producer.flush()
+ except Errors.KafkaTimeoutError as kte:
+ logger.exception("[KAFKA PRODUCER TIMEOUT ERROR]: %s", kte)
+ logger.error(traceback.format_exc())
+ except Errors.KafkaError as ke:
+ logger.exception("[KAFKA PRODUCER ERROR]: %s", ke)
+ logger.error(traceback.format_exc())
+ except Exception as e:
+ logger.exception("[EXCEPTION]: %s", e)
+ logger.error(traceback.format_exc())
while True:
try:
@@ -28,17 +69,19 @@
if not zk.processed():
if zk.process_start():
- print("Processing "+clip_name+":"+mode+"...", flush=True)
+ logger.info("Processing {}: {}...".format(clip_name, mode))
+
while True:
- print("Downloading "+clip_name, flush=True)
- sts=call(["/usr/bin/wget","-O",clip_name,vdhost+"/mp4/"+clip_name])
- if sts==0: break
- time.sleep(1)
+ logger.info("Downloading "+clip_name)
+ sts=call(["/usr/bin/wget","-O",clip_name,vdhost+"/mp4/"+clip_name])
+ if sts==0: break
+ time.sleep(0.5)
call(["/opt/gstreamer_gva/metaData_extract","-i",clip_name,"-n","-x",mode,"-a",dbhost,"-l"])
+
os.remove(clip_name)
zk.process_end()
zk.close()
except:
- print(traceback.format_exc(), flush=True)
+ logger.error(traceback.format_exc())
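The new preamble in ingest.py pins each worker to NCPU randomly chosen cores via psutil, so several ingest containers can share a host without contending for the same CPUs. Below is a standalone sketch of that logic (assuming psutil is installed, as the Dockerfile change above ensures; the NCPU default here exists only for the example).

#!/usr/bin/env python3
# Standalone sketch of the CPU-pinning preamble added to ingest.py.
import os
import random
import psutil

num_cpus = int(os.environ.get("NCPU", "2"))    # example default; ingest.py reads NCPU from the environment
if num_cpus > 0:
    cores = list(range(psutil.cpu_count()))
    random.shuffle(cores)                      # spread workers across different cores
    proc = psutil.Process(os.getpid())
    proc.cpu_affinity(cores[:num_cpus])        # restrict this process to num_cpus cores
    print("pinned to cores:", proc.cpu_affinity())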
diff --git a/ingest/vdms_converter/SConstruct b/ingest/vdms_converter/SConstruct
index c813319..72a01aa 100644
--- a/ingest/vdms_converter/SConstruct
+++ b/ingest/vdms_converter/SConstruct
@@ -9,7 +9,7 @@ env = Environment(
"/usr/include",
"/usr/include/jsoncpp",
"/usr/local/include/opencv4",
- VDMS_ROOT_PATH,
+ VDMS_ROOT_PATH,
VDMS_ROOT_PATH + "/client/cpp",
VDMS_ROOT_PATH + "/utils/include",
],
diff --git a/ingest/zkstate.py b/ingest/zkstate.py
index c21edf8..b1986f1 100755
--- a/ingest/zkstate.py
+++ b/ingest/zkstate.py
@@ -23,7 +23,7 @@ def __init__(self, path, name=None):
def processed(self):
return self._zk.exists(self._path+"/"+self._name+"complete")
-
+
def process_start(self):
if self.processed(): return False
try:
diff --git a/script/build.sh b/script/build.sh
index ad83b2d..90e54b5 100644
--- a/script/build.sh
+++ b/script/build.sh
@@ -7,14 +7,15 @@ fi
PLATFORM="${1:-Xeon}"
FRAMEWORK="gst"
-REGISTRY="$4"
+IN_SOURCE="$4"
+REGISTRY="$7"
USER="docker"
GROUP="docker"
build_docker() {
docker_file="$1"
shift
- image_name="$1"
+ image_name="$1:stream"
shift
if test -f "$docker_file.m4"; then
m4 -I "$(dirname $docker_file)" "$docker_file.m4" > "$docker_file"
@@ -22,7 +23,7 @@ build_docker() {
(cd "$DIR"; docker build --network host --file="$docker_file" "$@" -t "$image_name" "$DIR" $(env | cut -f1 -d= | grep -E '_(proxy|REPO|VER)$' | sed 's/^/--build-arg /') --build-arg USER=${USER} --build-arg GROUP=${GROUP} --build-arg UID=$(id -u) --build-arg GID=$(id -g))
# if REGISTRY is specified, push image to the private registry
- if [ -n "$REGISTRY" ]; then
+ if [ "$REGISTRY" != " " ]; then
docker tag "$image_name" "$REGISTRY$image_name"
docker push "$REGISTRY$image_name"
fi
diff --git a/script/deployment.cmake b/script/deployment.cmake
index 2cba464..fc53cca 100644
--- a/script/deployment.cmake
+++ b/script/deployment.cmake
@@ -1,2 +1,2 @@
-add_custom_target(start_${service} "${CMAKE_CURRENT_SOURCE_DIR}/start.sh" "${service}" "${PLATFORM}" "${NCURATIONS}" "${INGESTION}" "${REGISTRY}")
+add_custom_target(start_${service} "${CMAKE_CURRENT_SOURCE_DIR}/start.sh" "${service}" "${PLATFORM}" "${NCURATIONS}" "${INGESTION}" "${IN_SOURCE}" "${STREAM_URL}" "${NCPU}" "${REGISTRY}")
add_custom_target(stop_${service} "${CMAKE_CURRENT_SOURCE_DIR}/stop.sh" "${service}")
diff --git a/script/mk-dist.sh b/script/mk-dist.sh
index 94c4e3e..bad8220 100755
--- a/script/mk-dist.sh
+++ b/script/mk-dist.sh
@@ -13,7 +13,7 @@ case "$0" in
rm -rf "$DIR/../dist"
if test -e "$YML"; then
mkdir -p "$DIR/../dist/dist"
- for image in `awk -v 'labels=*' -f "$DIR/scan-yaml.awk" "$YML"` smtc_certificate:latest; do
+ for image in `awk -v 'labels=*' -f "$DIR/scan-yaml.awk" "$YML"` lcc_certificate:stream; do
imagefile=${image//\//-}
imagefile=${imagefile//:/-}
echo "archiving $image => $imagefile"
diff --git a/script/service.cmake b/script/service.cmake
index ccd0d10..9e167e7 100644
--- a/script/service.cmake
+++ b/script/service.cmake
@@ -1,3 +1,3 @@
if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/build.sh")
- add_custom_target(build_${service} ALL "${CMAKE_CURRENT_SOURCE_DIR}/build.sh" "${PLATFORM}" "${NCURATIONS}" "${INGESTION}" "${REGISTRY}")
+ add_custom_target(build_${service} ALL "${CMAKE_CURRENT_SOURCE_DIR}/build.sh" "${PLATFORM}" "${NCURATIONS}" "${INGESTION}" "${IN_SOURCE}" "${STREAM_URL}" "${NCPU}" "${REGISTRY}")
endif()
diff --git a/script/shell.sh b/script/shell.sh
index 48d0041..3fb7440 100644
--- a/script/shell.sh
+++ b/script/shell.sh
@@ -7,10 +7,10 @@ fi
pid="$(docker ps -f ancestor=$IMAGE --format='{{.ID}}' | head -n 1)"
if [ -n "$pid" ] && [ "$#" -le "1" ]; then
- echo "bash into running container...$IMAGE"
+ echo "bash into running container...$IMAGE:stream"
docker exec -it $pid ${*-/bin/bash}
else
- echo "bash into new container...$IMAGE"
+ echo "bash into new container...$IMAGE:stream"
args=("$@")
- docker run --rm ${OPTIONS[@]} $(env | cut -f1 -d= | grep -E '_(proxy|REPO|VER)$' | sed 's/^/-e /') --entrypoint ${1:-/bin/bash} -it "${IMAGE}" ${args[@]:1}
+ docker run --rm ${OPTIONS[@]} $(env | cut -f1 -d= | grep -E '_(proxy|REPO|VER)$' | sed 's/^/-e /') --entrypoint ${1:-/bin/bash} -it "${IMAGE}:stream" ${args[@]:1}
fi
diff --git a/start_app.sh b/start_app.sh
new file mode 100755
index 0000000..c8700fa
--- /dev/null
+++ b/start_app.sh
@@ -0,0 +1,120 @@
+#!/bin/bash -e
+#######################################################################################################################
+# This script runs the Curation application
+#######################################################################################################################
+# DEFAULT VARIABLES
+INGESTION="object,face"
+EXP_TYPE=compose
+REGISTRY=None
+NCPU=0
+NCURATIONS=1
+IN_SOURCE=stream
+SOURCE="-DSTREAM_URL="udp://localhost:8088" -DIN_SOURCE=${IN_SOURCE}"
+
+DIR=$(dirname $(readlink -f "$0"))
+BUILD_DIR=$DIR/build
+
+LONG_LIST=(
+ "ingestion"
+ "type"
+ "registry"
+ "ncurations"
+ "ncpu"
+ "source"
+)
+
+OPTS=$(getopt \
+ --longoptions "$(printf "%s:," "${LONG_LIST[@]}")" \
+ --name "$(basename "$0")" \
+ --options "hi:t:r:n:c:s:" \
+ -- "$@"
+)
+
+eval set -- $OPTS
+
+if [ -d "$BUILD_DIR" ]; then
+ rm -rf $BUILD_DIR
+fi
+
+mkdir -p $BUILD_DIR
+
+#######################################################################################################################
+# GET SCRIPT OPTIONS
+script_usage()
+{
+ cat <=1.4.7' inotify
COPY *.py /home/
COPY cleanup.sh /home/
COPY manage.sh /home/
COPY *.conf /etc/nginx/
-COPY archive/*.mp4 /var/www/mp4/
-CMD ["/home/manage.sh"]
+COPY archive /var/www/mp4
+RUN rm /var/www/mp4/.gitignore || true
+CMD ["/bin/bash","-c","/home/manage.sh"]
####
ARG USER
@@ -16,10 +21,10 @@ ARG GROUP
ARG UID
ARG GID
## must use ; here to ignore user exist status code
-RUN [ ${GID} -gt 0 ] && groupadd -f -g ${GID} ${GROUP}; \
- [ ${UID} -gt 0 ] && useradd -d /home/${USER} -g ${GID} -K UID_MAX=${UID} -K UID_MIN=${UID} ${USER}; \
+RUN if [ ${GID} -gt 0 ]; then groupadd -f -g ${GID} ${GROUP}; fi; \
+ if [ ${UID} -gt 0 ]; then useradd -d /home/${USER} -g ${GID} -K UID_MAX=${UID} -K UID_MIN=${UID} ${USER}; fi; \
touch /var/run/nginx.pid && \
- mkdir -p /var/log/nginx /var/lib/nginx /var/www/cache /var/www/gen /var/www/mp4 && \
+ mkdir -p /var/log/nginx /var/lib/nginx /var/www/cache /var/www/gen /var/www/mp4 /var/www/streams && \
chown -R ${UID}.${GID} /var/run/nginx.pid /var/log/nginx /var/lib/nginx /var/www /etc/nginx/nginx.conf
VOLUME ["/var/www"]
USER ${UID}
diff --git a/video/archive/.gitignore b/video/archive/.gitignore
new file mode 100644
index 0000000..0baf103
--- /dev/null
+++ b/video/archive/.gitignore
@@ -0,0 +1,5 @@
+# Ignore everything in this directory
+*
+
+# Except this file
+!.gitignore
\ No newline at end of file
diff --git a/video/build.sh b/video/build.sh
index daca2fb..f5bae68 100755
--- a/video/build.sh
+++ b/video/build.sh
@@ -2,6 +2,9 @@
IMAGE="lcc_video"
DIR=$(dirname $(readlink -f "$0"))
+IN_SOURCE="$4"
-"$DIR/download.sh"
+if [[ $IN_SOURCE == *"videos"* ]]; then
+ "$DIR/download.sh"
+fi
. "$DIR/../script/build.sh"
diff --git a/video/download.sh b/video/download.sh
index 4f6a152..e8d09b9 100755
--- a/video/download.sh
+++ b/video/download.sh
@@ -1,10 +1,13 @@
#!/bin/bash -e
DIR=$(dirname $(readlink -f "$0"))
+mkdir -p "${DIR}/archive" "${DIR}/log/"
-CLIPS=($(cat "$DIR"/streamlist.txt))
+CLIPS=($(cat "${DIR}"/streamlist.txt))
+
+# LOGFILE="${DIR}/log/video_storage_local_download_metrics.csv"
+# echo "clip_name,type,Elapsed_time_s,File_size" | tee ${LOGFILE}
-mkdir -p "$DIR/archive"
for clip in "${CLIPS[@]}"; do
url=$(echo "$clip" | cut -f1 -d',')
clip_name=$(echo "$clip" | cut -f2 -d',')
@@ -17,7 +20,12 @@ for clip in "${CLIPS[@]}"; do
read reply
fi
if test "$reply" = "accept"; then
+ # start_time=$(date +%s.%3N)
wget -U "XXX YYY" -O "$DIR/archive/$clip_mp4" "$url"
+ # end_time=$(date +%s.%3N)
+ # file_size="$(du -h $DIR/archive/$clip_mp4 | cut -f 1)"
+ # total_time=$(echo "$end_time - $start_time" | bc);
+ # echo "${clip_mp4},download video,${total_time},${file_size}" >> ${LOGFILE}
else
echo "Skipping..."
fi
diff --git a/video/manage.sh b/video/manage.sh
index 1eb4051..0f7be27 100755
--- a/video/manage.sh
+++ b/video/manage.sh
@@ -1,19 +1,8 @@
#!/bin/bash -e
-# ingest video list into Kafka
-cd /var/www/mp4
-ls -1 | awk -v ingest=$INGESTION 'BEGIN{split(ingest,modes,",")}{for (i in modes) print modes[i]","$0}' > /tmp/videolist.txt
-
-while true; do
- sout="$(cat /tmp/videolist.txt | kafkacat -P -D '\n' -b $KKHOST -t video_curation_sched -p -1 -T -X partition.assignment.strategy=roundrobin 2>&1 || echo)"
- case "$sout" in
- *ERROR*)
- echo "$sout"
- sleep 1s
- continue;;
- esac
- break
-done
+# Watch directory
+# ./watch-new-clips.sh &
+python3 /home/watch_and_notify.py /var/www/streams &
# run tornado
exec /home/manage.py
diff --git a/video/segment.py b/video/segment.py
index c2bc27e..61011a9 100755
--- a/video/segment.py
+++ b/video/segment.py
@@ -22,6 +22,7 @@ def _gen_segment(self, video, start, end):
output=video.replace(".mp4","-"+start+".mp4")
if not os.path.exists(self._genpath+"/"+output):
call(["/usr/local/bin/ffmpeg","-ss",start,"-i",self._mp4path+"/"+video,"-to",end,"-c","copy",self._genpath+"/"+output])
+ # call(["/usr/local/bin/ffmpeg","-i",self._mp4path+"/"+video,"-ss",start,"-t",end,"-c","copy",self._genpath+"/"+output])
return output
def _format(self, time):
diff --git a/video/watch_and_notify.py b/video/watch_and_notify.py
new file mode 100644
index 0000000..a5cd309
--- /dev/null
+++ b/video/watch_and_notify.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+from inotify.adapters import Inotify
+from kafka import KafkaProducer
+from shutil import copyfile
+import socket
+import datetime
+import logging
+import kafka.errors as Errors
+import traceback
+
+
+# SETUP LOGGER
+logger = logging.getLogger(__name__)
+fmt_str = "[%(filename)s:line %(lineno)d] %(levelname)s:: %(message)s"
+formatter = logging.Formatter(fmt_str)
+
+stream_handler = logging.StreamHandler()
+stream_handler.setLevel(logging.INFO)
+stream_handler.setFormatter(formatter)
+logger.setLevel(logging.INFO)
+logger.addHandler(stream_handler)
+logger.info('Video: Watch & Notify')
+
+topic="video_curation_sched"
+clientid=socket.gethostname()
+
+kkhost=os.environ["KKHOST"]
+ingestion=os.environ["INGESTION"]
+in_source=os.environ["IN_SOURCE"]
+video_store_dir="/var/www/mp4"
+
+# logger.info("[VIDEO_CLIP_MSG LOG],timestamp,mode,filename")
+
+def send_producer_msg(producer, topic, strvalue):
+ try:
+ ingest, filename = strvalue.split(",")
+ future = producer.send(topic=topic, value=str.encode(strvalue))
+ producer.flush()
+
+ record_metadata = future.get(timeout=10)
+ # logger.info("[VIDEO_CLIP_MSG LOG],{},{},{}".format( datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ingest, filename))
+ # logger.debug("[KAFKA PRODUCER MSG] Message to Ingest Pool sent")
+ # logger.debug("[KAFKA PRODUCER TOPIC] {}".format(record_metadata.topic))
+ # logger.debug("[KAFKA PRODUCER PARTITION] {}".format(record_metadata.partition))
+ # logger.debug("[KAFKA PRODUCER OFFSET] {}\n\n".format(record_metadata.offset))
+ except Errors.KafkaTimeoutError as kte:
+ logger.exception("[KAFKA PRODUCER TIMEOUT ERROR]: %s", kte)
+ logger.error(traceback.format_exc())
+ except Errors.KafkaError as ke:
+ logger.exception("[KAFKA PRODUCER ERROR]: %s", ke)
+ logger.error(traceback.format_exc())
+ except Exception as e:
+ logger.exception("[EXCEPTION]: %s", e)
+ logger.error(traceback.format_exc())
+
+def main(watch_folder=os.getcwd()):
+ producer = KafkaProducer(bootstrap_servers=kkhost,
+ client_id=clientid, api_version=(0,10))
+ # logger.debug("[KAFKA PRODUCER (Video)] bootstrap_servers: {}\tclient_id: {}".format(kkhost, clientid))
+
+ if "videos" in in_source:
+ for filename in os.listdir(video_store_dir):
+ if filename.endswith(".mp4"):
+ for ingest in ingestion.split(','):
+ strvalue = "{},{}".format(ingest, filename)
+ send_producer_msg(producer, topic, strvalue)
+
+ with open('/var/www/mp4/last_video.log', 'w') as f:
+ f.write("{},{},extract metadata".format(ingest, filename))
+
+ if "stream" in in_source:
+ i = Inotify()
+ i.add_watch(watch_folder)
+
+ for event in i.event_gen(yield_nones=False):
+ (_, type_names, path, filename) = event
+ target_dir = os.path.join(video_store_dir, filename)
+
+ # on file write completion, we publish to topic
+ if 'IN_CLOSE_WRITE' in type_names and not os.path.exists(target_dir):
+ for ingest in ingestion.split(','):
+ strvalue = "{},{}".format(ingest, filename)
+ send_producer_msg(producer, topic, strvalue)
+ copyfile(os.path.join(path, filename), target_dir)
+ with open('/var/www/mp4/last_video.log', 'w') as f:
+ f.write("{},{},extract metadata".format(ingest, filename))
+
+if __name__ == '__main__':
+ if len(sys.argv) == 2:
+ main(sys.argv[1])
+ else:
+ main()
\ No newline at end of file
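To check that watch_and_notify.py actually schedules work, the video_curation_sched topic can be tailed with the kafka-python consumer that is already a dependency of the ingest image. A minimal sketch, assuming KKHOST points at the broker (the defaults shown here are illustrative):

#!/usr/bin/env python3
# Illustrative consumer: print the "<mode>,<clip>.mp4" records published by watch_and_notify.py.
import os
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "video_curation_sched",
    bootstrap_servers=os.environ.get("KKHOST", "localhost:9092"),
    auto_offset_reset="earliest",
    api_version=(0, 10),
)
for record in consumer:
    print(record.value.decode())   # e.g. "object,clip-0001.mp4"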